getMessageBus example

/** @var BusNameStamp|null $busNameStamp */
        $busNameStamp = $envelope->last(BusNameStamp::class);

        if (null === $busNameStamp) {
            if (null === $this->fallbackBus) {
                throw new InvalidArgumentException('Envelope is missing a BusNameStamp and no fallback message bus is configured on RoutableMessageBus.');
            }

            return $this->fallbackBus->dispatch($envelope$stamps);
        }

        return $this->getMessageBus($busNameStamp->getBusName())->dispatch($envelope$stamps);
    }

    /** * @internal */
    public function getMessageBus(string $busName): MessageBusInterface
    {
        if (!$this->busLocator->has($busName)) {
            throw new InvalidArgumentException(sprintf('Bus named "%s" does not exist.', $busName));
        }

        
$last = array_pop($stopsWhen);
            $stopsWhen = ($stopsWhen ? implode(', ', $stopsWhen).' or ' : '').$last;
            $io->comment("The worker will automatically exit once it has {$stopsWhen}.");
        }

        $io->comment('Quit the worker with CONTROL-C.');

        if (OutputInterface::VERBOSITY_VERBOSE > $output->getVerbosity()) {
            $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
        }

        $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;

        $worker = new Worker($receivers$bus$this->eventDispatcher, $this->logger, $rateLimiters);
        $options = [
            'sleep' => $input->getOption('sleep') * 1000000,
        ];
        if ($queues = $input->getOption('queues')) {
            $options['queues'] = $queues;
        }
        $worker->run($options);

        return 0;
    }
Home | Imprint | This part of the site doesn't use cookies.