MessageBus example

return $envelope;
            })
        ;

        $middleware2 = $this->createMock(MiddlewareInterface::class);
        $middleware2
            ->expects($this->exactly(2))
            ->method('handle')
            ->willReturnCallback(fn (Envelope $envelope, StackInterface $stack): Envelope => $envelope)
        ;

        $bus = new MessageBus([$middleware1$middleware2]);

        $bus->dispatch(new \stdClass());
    }
}
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;

class MessageBusTest extends TestCase
{
    public function testItHasTheRightInterface()
    {
        $bus = new MessageBus();

        $this->assertInstanceOf(MessageBusInterface::class$bus);
    }

    public function testItCallsMiddleware()
    {
        $message = new DummyMessage('Hello');
        $envelope = new Envelope($message);

        $firstMiddleware = $this->createMock(MiddlewareInterface::class);
        $firstMiddleware->expects($this->once())
            
$receiver = new DummyReceiver([
            [new Envelope($expectedMessages[0])],
            [new Envelope($expectedMessages[1])],
        ]);

        $handler = new DummyBatchHandler();

        $middleware = new HandleMessageMiddleware(new HandlersLocator([
            DummyMessage::class => [new HandlerDescriptor($handler)],
        ]));

        $bus = new MessageBus([$middleware]);

        $dispatcher = new EventDispatcher();
        $dispatcher->addListener(WorkerRunningEvent::classfunction DWorkerRunningEvent $event) use ($receiver) {
            static $i = 0;
            if (1 < ++$i) {
                $event->getWorker()->stop();
                $this->assertSame(2, $receiver->getAcknowledgeCount());
            } else {
                $this->assertSame(0, $receiver->getAcknowledgeCount());
            }
        });

        
public function testEventsInNewTransactionAreHandledAfterMainMessage()
    {
        $message = new DummyMessage('Hello');

        $firstEvent = new DummyEvent('First event');
        $secondEvent = new DummyEvent('Second event');
        $thirdEvent = new DummyEvent('Third event');

        $middleware = new DispatchAfterCurrentBusMiddleware();
        $handlingMiddleware = $this->createMock(MiddlewareInterface::class);

        $eventBus = new MessageBus([
            $middleware,
            $handlingMiddleware,
        ]);

        $messageBus = new MessageBus([
            $middleware,
            new DispatchingMiddleware($eventBus[
                new Envelope($firstEvent[new DispatchAfterCurrentBusStamp()]),
                new Envelope($secondEvent[new DispatchAfterCurrentBusStamp()]),
                $thirdEvent, // Not in a new transaction             ]),
            
new HandlerDescriptor($allTransportHandlerThatWorks[
                    'alias' => 'handler_that_works1',
                ]),
                new HandlerDescriptor($transport2HandlerThatWorks[
                    'from_transport' => 'transport2',
                    'alias' => 'handler_that_works2',
                ]),
            ],
        ]);

        $dispatcher = new EventDispatcher();
        $bus = new MessageBus([
            new FailedMessageProcessingMiddleware(),
            new SendMessageMiddleware($senderLocator),
            new HandleMessageMiddleware($handlerLocator),
        ]);
        $dispatcher->addSubscriber(new AddErrorDetailsStampListener());
        $dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator$retryStrategyLocator));

        $dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($sendersLocatorFailureTransport));
        $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

        $runWorker = function Dstring $transportName) use ($transports$bus$dispatcher): ?\Throwable {
            
Home | Imprint | This part of the site doesn't use cookies.