AmqpReceiver example



    public function testItSendsAndReceivesMessages()
    {
        $serializer = $this->createSerializer();

        $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
        $connection->setup();
        $connection->purgeQueues();

        $sender = new AmqpSender($connection$serializer);
        $receiver = new AmqpReceiver($connection$serializer);

        $sender->send($first = new Envelope(new DummyMessage('First')));
        $sender->send($second = new Envelope(new DummyMessage('Second')));

        $envelopes = iterator_to_array($receiver->get());
        $this->assertCount(1, $envelopes);
        /** @var Envelope $envelope */
        $envelope = $envelopes[0];
        $this->assertEquals($first->getMessage()$envelope->getMessage());
        $this->assertInstanceOf(AmqpReceivedStamp::class$envelope->last(AmqpReceivedStamp::class));

        
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

// Build the messenger serializer on top of a Serializer component instance that is
// given two normalizers (object + array) and a JSON encoder keyed as 'json'.
// Fix: the element/argument separators were missing, which made this a parse error.
$serializer = new Serializer(
    new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
);

// The connection is created from whatever the DSN environment variable holds.
$connection = Connection::fromDsn(getenv('DSN'));
// Fix: restored the comma between the two constructor arguments.
$receiver = new AmqpReceiver($connection, $serializer);

// Register the two listeners on a fresh dispatcher.
// NOTE(review): going by their class names they handle signal-driven worker
// shutdown; confirm against the listener implementations.
$eventDispatcher = new EventDispatcher();
$eventDispatcher->addSubscriber(new StopWorkerOnSignalsListener());
$eventDispatcher->addSubscriber(new DispatchPcntlSignalListener());

$worker = new Worker(['the_receiver' => $receiver]new class() implements MessageBusInterface {
    public function dispatch($envelope, array $stamps = []): Envelope
    {
        echo 'Get envelope with message: '.$envelope->getMessage()::class."\n";
        echo sprintf("with stamps: %s\n", json_encode(array_keys($envelope->all()), \JSON_PRETTY_PRINT));

        sleep(30);
        

        $this->connection->setup();
    }

    public function getMessageCount(): int
    {
        return $this->getReceiver()->getMessageCount();
    }

    private function getReceiver(): AmqpReceiver
    {
        return $this->receiver ??= new AmqpReceiver($this->connection, $this->serializer);
    }

    private function getSender(): AmqpSender
    {
        return $this->sender ??= new AmqpSender($this->connection, $this->serializer);
    }
}
/**
 * Checks that AmqpReceiver::get() yields exactly one envelope whose message
 * equals DummyMessage('Hi') when the mocked connection returns one AMQP envelope.
 *
 * Fix: restored the missing commas between array elements / call arguments,
 * without which the method does not parse.
 */
public function testItReturnsTheDecodedMessageToTheHandler()
    {
        // A real serializer is used here (object normalizer + JSON encoder), not a mock.
        $serializer = new Serializer(
            new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
        );

        // createAMQPEnvelope() is defined outside this block; presumably its body
        // decodes to DummyMessage('Hi') -- confirm against the helper.
        $amqpEnvelope = $this->createAMQPEnvelope();

        // The mocked connection exposes a single queue and hands back the envelope for it.
        $connection = $this->createMock(Connection::class);
        $connection->method('getQueueNames')->willReturn(['queueName']);
        $connection->method('get')->with('queueName')->willReturn($amqpEnvelope);

        $receiver = new AmqpReceiver($connection, $serializer);
        $actualEnvelopes = iterator_to_array($receiver->get());
        $this->assertCount(1, $actualEnvelopes);
        $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
    }

    public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
    {
        $this->expectException(TransportException::class);
        $serializer = $this->createMock(SerializerInterface::class);
        $amqpEnvelope = $this->createAMQPEnvelope();
        $connection = $this->createMock(Connection::class);
        
Home | Imprint | This part of the site doesn't use cookies.