Worker example

public function dispatch(object $event): object
            {
                // Anything that is not a WorkerRunningEvent passes straight through untouched.
                if (!$event instanceof WorkerRunningEvent) {
                    return $event;
                }

                // Forward worker-running events to the wrapped listener, then hand the event back.
                $this->listener->onWorkerRunning($event);

                return $event;
            }
        };

        $worker = new Worker(['transport' => $receiver]$bus$dispatcher, clock: new MockClock());
        $worker->run();

        $this->assertSame($apiMessage$envelopes[0]->getMessage());
        $this->assertSame($ipaMessage$envelopes[1]->getMessage());
        $this->assertCount(1, $envelopes[0]->all(ReceivedStamp::class));
        $this->assertCount(1, $envelopes[0]->all(ConsumedByWorkerStamp::class));
        $this->assertSame('transport', $envelopes[0]->last(ReceivedStamp::class)->getTransportName());

        $this->assertSame(2, $receiver->getAcknowledgeCount());
    }

    
// Messenger serializer backed by a serializer component instance that is configured with an
// object normalizer, an array denormalizer and a JSON encoder.
$serializer = new Serializer(
    new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
);

// The connection is built from the DSN environment variable.
$connection = Connection::fromDsn(getenv('DSN'));
$receiver = new AmqpReceiver($connection, $serializer);

// Subscribers attached to the dispatcher that is handed to the worker below.
$eventDispatcher = new EventDispatcher();
$eventDispatcher->addSubscriber(new StopWorkerOnSignalsListener());
$eventDispatcher->addSubscriber(new DispatchPcntlSignalListener());

// The bus is an inline stub: it prints the message class and stamp names, sleeps for
// 30 seconds (presumably to emulate slow handling - confirm intent), prints "Done."
// and returns the envelope unchanged.
$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
    public function dispatch($envelope, array $stamps = []): Envelope
    {
        echo 'Get envelope with message: '.$envelope->getMessage()::class."\n";
        echo sprintf("with stamps: %s\n", json_encode(array_keys($envelope->all()), \JSON_PRETTY_PRINT));

        sleep(30);
        echo "Done.\n";

        return $envelope;
    }
}, $eventDispatcher);

$dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($sendersLocatorFailureTransport));
        $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

        $runWorker = function Dstring $transportName) use ($transports$bus$dispatcher): ?\Throwable {
            $throwable = null;
            $failedListener = function DWorkerMessageFailedEvent $event) use (&$throwable) {
                $throwable = $event->getThrowable();
            };
            $dispatcher->addListener(WorkerMessageFailedEvent::class$failedListener);

            $worker = new Worker([$transportName => $transports[$transportName]]$bus$dispatcher);

            $worker->run();

            $dispatcher->removeListener(WorkerMessageFailedEvent::class$failedListener);

            return $throwable;
        };

        // send the message         $envelope = new Envelope(new DummyMessage('API'));
        $bus->dispatch($envelope);

        
$io->comment("The worker will automatically exit once it has {$stopsWhen}.");
        }

        $io->comment('Quit the worker with CONTROL-C.');

        if (OutputInterface::VERBOSITY_VERBOSE > $output->getVerbosity()) {
            $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
        }

        $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;

        $worker = new Worker($receivers$bus$this->eventDispatcher, $this->logger, $rateLimiters);
        $options = [
            'sleep' => $input->getOption('sleep') * 1000000,
        ];
        if ($queues = $input->getOption('queues')) {
            $options['queues'] = $queues;
        }
        $worker->run($options);

        return 0;
    }

    
$shouldHandle = $shouldForce || 'retry' === $io->choice('Please select an action', ['retry', 'delete'], 'retry');

            if ($shouldHandle) {
                return;
            }

            $messageReceivedEvent->shouldHandle(false);
            $receiver->reject($envelope);
        };
        $this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class$listener);

        $worker = new Worker(
            [$failureTransportName => $receiver],
            $this->messageBus,
            $this->eventDispatcher,
            $this->logger
        );

        try {
            $worker->run();
        } finally {
            $this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class$listener);
        }

        
Home | Imprint | This part of the site doesn't use cookies.