AmqpReceivedStamp example

use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp;

/** * @requires extension amqp */
class AmqpReceivedStampTest extends TestCase
{
    public function testStamp()
    {
        $amqpEnvelope = $this->createMock(\AMQPEnvelope::class);

        $stamp = new AmqpReceivedStamp($amqpEnvelope, 'queueName');

        $this->assertSame($amqpEnvelope$stamp->getAmqpEnvelope());
        $this->assertSame('queueName', $stamp->getQueueName());
    }
}
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
    {
        $this->expectException(TransportException::class);
        $serializer = $this->createMock(SerializerInterface::class);
        $amqpEnvelope = $this->createAMQPEnvelope();
        $connection = $this->createMock(Connection::class);
        $connection->method('getQueueNames')->willReturn(['queueName']);
        $connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
        $connection->method('ack')->with($amqpEnvelope, 'queueName')->willThrowException(new \AMQPException());

        $receiver = new AmqpReceiver($connection$serializer);
        $receiver->ack(new Envelope(new \stdClass()[new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
    }

    public function testItThrowsATransportExceptionIfItCannotRejectMessage()
    {
        $this->expectException(TransportException::class);
        $serializer = $this->createMock(SerializerInterface::class);
        $amqpEnvelope = $this->createAMQPEnvelope();
        $connection = $this->createMock(Connection::class);
        $connection->method('getQueueNames')->willReturn(['queueName']);
        $connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
        $connection->method('nack')->with($amqpEnvelope, 'queueName', \AMQP_NOPARAM)->willThrowException(new \AMQPException());

        
$envelope = $this->serializer->decode([
                'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351                 'headers' => $amqpEnvelope->getHeaders(),
            ]);
        } catch (MessageDecodingFailedException $exception) {
            // invalid message of some type             $this->rejectAmqpEnvelope($amqpEnvelope$queueName);

            throw $exception;
        }

        yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope$queueName));
    }

    public function ack(Envelope $envelope): void
    {
        try {
            $stamp = $this->findAmqpStamp($envelope);

            $this->connection->ack(
                $stamp->getAmqpEnvelope(),
                $stamp->getQueueName()
            );
        }
Home | Imprint | This part of the site doesn't use cookies.