StopWorkerOnMessageLimitListener example

EOF
            )
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

        $io = new SymfonyStyle($input$output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
        $io->comment('Quit this command with CONTROL-C.');
        if (!$output->isVeryVerbose()) {
            $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
        }

        $failureTransportName = $input->getOption('transport');
        if (self::DEFAULT_TRANSPORT_OPTION === $failureTransportName) {
            $this->printWarningAvailableFailureTransports($io$this->getGlobalFailureReceiverName());
        }
        
$bus->expects($this->exactly(2))
            ->method('dispatch')
            ->willReturnCallback(function D$envelope) use (&$envelopes) {
                return $envelopes[] = $envelope;
            });

        $dispatcher = new class() implements EventDispatcherInterface {
            private StopWorkerOnMessageLimitListener $listener;

            public function __construct()
            {
                $this->listener = new StopWorkerOnMessageLimitListener(2);
            }

            public function dispatch(object $event): object
            {
                if ($event instanceof WorkerRunningEvent) {
                    $this->listener->onWorkerRunning($event);
                }

                return $event;
            }
        };

        
if (null !== $this->resetServicesListener && !$input->getOption('no-reset')) {
            $this->eventDispatcher->addSubscriber($this->resetServicesListener);
        }

        $stopsWhen = [];
        if (null !== $limit = $input->getOption('limit')) {
            if (!is_numeric($limit) || 0 >= $limit) {
                throw new InvalidOptionException(sprintf('Option "limit" must be a positive integer, "%s" passed.', $limit));
            }

            $stopsWhen[] = "processed {$limit} messages";
            $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit$this->logger));
        }

        if ($failureLimit = $input->getOption('failure-limit')) {
            $stopsWhen[] = "reached {$failureLimit} failed messages";
            $this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit$this->logger));
        }

        if ($memoryLimit = $input->getOption('memory-limit')) {
            $stopsWhen[] = "exceeded {$memoryLimit} of memory";
            $this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit)$this->logger));
        }

        
$dispatcher = new EventDispatcher();
        $bus = new MessageBus([
            new FailedMessageProcessingMiddleware(),
            new SendMessageMiddleware($senderLocator),
            new HandleMessageMiddleware($handlerLocator),
        ]);
        $dispatcher->addSubscriber(new AddErrorDetailsStampListener());
        $dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator$retryStrategyLocator));

        $dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($sendersLocatorFailureTransport));
        $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

        $runWorker = function Dstring $transportName) use ($transports$bus$dispatcher): ?\Throwable {
            $throwable = null;
            $failedListener = function DWorkerMessageFailedEvent $event) use (&$throwable) {
                $throwable = $event->getThrowable();
            };
            $dispatcher->addListener(WorkerMessageFailedEvent::class$failedListener);

            $worker = new Worker([$transportName => $transports[$transportName]]$bus$dispatcher);

            $worker->run();

            
class StopWorkerOnMessageLimitListenerTest extends TestCase
{
    /** * @dataProvider countProvider */
    public function testWorkerStopsWhenMaximumCountExceeded(int $max, bool $shouldStop)
    {
        $worker = $this->createMock(Worker::class);
        $worker->expects($shouldStop ? $this->atLeastOnce() : $this->never())->method('stop');
        $event = new WorkerRunningEvent($worker, false);

        $maximumCountListener = new StopWorkerOnMessageLimitListener($max);
        // simulate three messages processed         $maximumCountListener->onWorkerRunning($event);
        $maximumCountListener->onWorkerRunning($event);
        $maximumCountListener->onWorkerRunning($event);
    }

    public static function countProvider(): iterable
    {
        yield [1, true];
        yield [2, true];
        yield [3, true];
        
Home | Imprint | This part of the site doesn't use cookies.