AmqpStamp example

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;

/**
 * @requires extension amqp
 */
class AmqpStampTest extends TestCase
{
    public function testRoutingKeyOnly()
    {
        $stamp = new AmqpStamp('routing_key');
        $this->assertSame('routing_key', $stamp->getRoutingKey());
        $this->assertSame(\AMQP_NOPARAM, $stamp->getFlags());
        $this->assertSame([]$stamp->getAttributes());
    }

    public function testFlagsAndAttributes()
    {
        $stamp = new AmqpStamp(null, \AMQP_DURABLE, ['delivery_mode' => 'unknown']);
        $this->assertNull($stamp->getRoutingKey());
        $this->assertSame(\AMQP_DURABLE, $stamp->getFlags());
        $this->assertSame(['delivery_mode' => 'unknown']$stamp->getAttributes());
    }
$envelope = $envelopes[0];
        // re-send the received envelope with a DelayStamp(2000) and a RedeliveryStamp, then ack the original
        $newEnvelope = $envelope
            ->with(new DelayStamp(2000))
            ->with(new RedeliveryStamp(1));
        $sender->send($newEnvelope);
        $receiver->ack($envelope);

        // send a 2nd message with a shorter delay and custom routing key
        $customRoutingKeyMessage = new DummyMessage('custom routing key');
        $envelopeCustomRoutingKey = new Envelope($customRoutingKeyMessage, [
            new DelayStamp(1000),
            new AmqpStamp('my_custom_routing_key'),
        ]);
        $sender->send($envelopeCustomRoutingKey);

        // wait for next message (but max at 3 seconds)
        $startTime = microtime(true);
        $envelopes = $this->receiveEnvelopes($receiver, 3);

        // duration should be about 1 second
        $this->assertApproximateDuration($startTime, 1);

        // this should be the custom routing key message first

        $factory = new TestAmqpFactory(
            $this->createMock(\AMQPConnection::class),
            $this->createMock(\AMQPChannel::class),
            $this->createMock(\AMQPQueue::class),
            $amqpExchange = $this->createMock(\AMQPExchange::class)
        );

        $amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2, 'timestamp' => time()]);

        $connection = Connection::fromDsn('amqp://localhost', []$factory);
        $connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, \AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
    }

    public function testAmqpStampDeliveryModeIsUsed()
    {
        $factory = new TestAmqpFactory(
            $this->createMock(\AMQPConnection::class),
            $this->createMock(\AMQPChannel::class),
            $this->createMock(\AMQPQueue::class),
            $amqpExchange = $this->createMock(\AMQPExchange::class)
        );

        
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);

        $connection = $this->createMock(Connection::class);
        $connection->expects($this->once())->method('publish')->with($encoded['body']$encoded['headers']);

        $sender = new AmqpSender($connection$serializer);
        $sender->send($envelope);
    }

    public function testItSendsTheEncodedMessageUsingARoutingKey()
    {
        $envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp = new AmqpStamp('rk'));
        $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];

        $serializer = $this->createMock(SerializerInterface::class);
        $serializer->method('encode')->with($envelope)->willReturn($encoded);

        $connection = $this->createMock(Connection::class);
        $connection->expects($this->once())->method('publish')->with($encoded['body']$encoded['headers'], 0, $stamp);

        $sender = new AmqpSender($connection$serializer);
        $sender->send($envelope);
    }

    
Home | Imprint | This part of the site doesn't use cookies.