TransportMessageIdStamp example

if (!$transportMessageIdStamp = $envelope->last(TransportMessageIdStamp::class)) {
            throw new LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        unset($this->queue[$id = $transportMessageIdStamp->getId()]$this->availableAt[$id]);
    }

    public function send(Envelope $envelope): Envelope
    {
        $id = $this->nextId++;
        $envelope = $envelope->with(new TransportMessageIdStamp($id));
        $encodedEnvelope = $this->encode($envelope);
        $this->sent[] = $encodedEnvelope;
        $this->queue[$id] = $encodedEnvelope;

        /** @var DelayStamp|null $delayStamp */
        if ($delayStamp = $envelope->last(DelayStamp::class)) {
            $now = $this->clock?->now() ?? new \DateTimeImmutable();
            $this->availableAt[$id] = $now->modify(sprintf('+%d seconds', $delayStamp->getDelay() / 1000));
        }

        return $envelope;
    }
'body' => $data['body'],
                'headers' => $data['headers'],
            ]);
        } catch (MessageDecodingFailedException $exception) {
            $this->connection->reject($data['id']);

            throw $exception;
        }

        return $envelope->with(
            new DoctrineReceivedStamp($data['id']),
            new TransportMessageIdStamp($data['id'])
        );
    }
}
/**
 * Builds the fixtures shared by the tests: a mocked serializer, a transport created
 * without a serializer and a second transport that is handed the mocked serializer.
 */
protected function setUp(): void
    {
        // Transport constructed without any serializer argument.
        $this->transport = new InMemoryTransport();

        // Transport that receives the mock, so tests can set expectations on the serializer.
        $serializerMock = $this->createMock(SerializerInterface::class);
        $this->serializer = $serializerMock;
        $this->serializeTransport = new InMemoryTransport($serializerMock);
    }

    public function testSend()
    {
        $envelope = new Envelope(new \stdClass());
        $this->transport->send($envelope);
        $this->assertEquals([$envelope->with(new TransportMessageIdStamp(1))]$this->transport->getSent());
    }

    public function testSendWithSerialization()
    {
        $envelope = new Envelope(new \stdClass());
        $envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
        $this->serializer
            ->method('encode')
            ->with($this->equalTo($envelope->with(new TransportMessageIdStamp(1))))
            ->willReturn(['foo' => 'ba'])
        ;
        
$suggestions = $tester->complete(['--transport']);
        $this->assertSame(['global_receiver', 'failure_receiver']$suggestions);
    }

    public function testCompleteId()
    {
        $globalFailureReceiverName = 'failure_receiver';

        $receiver = $this->createMock(ListableReceiverInterface::class);
        $receiver->expects($this->once())->method('all')->with(50)->willReturn([
            Envelope::wrap(new \stdClass()[new TransportMessageIdStamp('2ab50dfa1fbf')]),
            Envelope::wrap(new \stdClass()[new TransportMessageIdStamp('78c2da843723')]),
        ]);

        $serviceLocator = $this->createMock(ServiceLocator::class);
        $serviceLocator->expects($this->once())->method('has')->with($globalFailureReceiverName)->willReturn(true);
        $serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver);

        $command = new FailedMessagesRemoveCommand(
            $globalFailureReceiverName,
            $serviceLocator
        );
        

        /** @var SerializerInterface $serializer */
        $serializer = $this->createMock(SerializerInterface::class);
        $serializer
            ->expects($this->never())
            ->method('encode')
        ;
        $transport = $this->factory->createTransport('in-memory://?serialize=false', []$serializer);
        $message = Envelope::wrap(new DummyMessage('Hello.'));
        $transport->send($message);

        $this->assertEquals([$message->with(new TransportMessageIdStamp(1))]$transport->get());
    }

    public function testCreateTransportWithSerializer()
    {
        /** @var SerializerInterface $serializer */
        $serializer = $this->createMock(SerializerInterface::class);
        $message = Envelope::wrap(new DummyMessage('Hello.'));
        $serializer
            ->expects($this->once())
            ->method('encode')
            ->with($this->equalTo($message->with(new TransportMessageIdStamp(1))))
        ;
$suggestions = $tester->complete(['--transport']);
        $this->assertSame(['global_receiver', 'failure_receiver']$suggestions);
    }

    public function testCompleteId()
    {
        $globalFailureReceiverName = 'failure_receiver';

        $receiver = $this->createMock(ListableReceiverInterface::class);
        $receiver->expects($this->once())->method('all')->with(50)->willReturn([
            Envelope::wrap(new \stdClass()[new TransportMessageIdStamp('2ab50dfa1fbf')]),
            Envelope::wrap(new \stdClass()[new TransportMessageIdStamp('78c2da843723')]),
        ]);

        $serviceLocator = $this->createMock(ServiceLocator::class);
        $serviceLocator->expects($this->once())->method('has')->with($globalFailureReceiverName)->willReturn(true);
        $serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver);

        $command = new FailedMessagesRetryCommand(
            $globalFailureReceiverName,
            $serviceLocator,
            $this->createMock(MessageBusInterface::class),
            
/** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);
        $delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;

        try {
            $id = $this->connection->send($encodedMessage['body']$encodedMessage['headers'] ?? []$delay);
        } catch (DBALException $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        return $envelope->with(new TransportMessageIdStamp($id));
    }
}
/**
 * Resets the COLUMNS environment variable from $this->colSize once a test has finished.
 *
 * NOTE(review): $this->colSize is presumably captured in setUp(), which is not visible
 * here - confirm.
 */
protected function tearDown(): void
    {
        if ($this->colSize) {
            // A value is held: write it back as COLUMNS=<value>.
            $setting = 'COLUMNS='.$this->colSize;
        } else {
            // Nothing held: pass the bare name, which putenv() treats as "unset this variable".
            $setting = 'COLUMNS';
        }

        putenv($setting);
    }

    public function testBasicRunWithServiceLocator()
    {
        $sentToFailureStamp = new SentToFailureTransportStamp('async');
        $redeliveryStamp = new RedeliveryStamp(0);
        $errorStamp = ErrorDetailsStamp::create(new \Exception('Things are bad!', 123));
        $envelope = new Envelope(new \stdClass()[
            new TransportMessageIdStamp(15),
            $sentToFailureStamp,
            $redeliveryStamp,
            $errorStamp,
        ]);
        $receiver = $this->createMock(ListableReceiverInterface::class);
        $receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope);

        $failureTransportName = 'failure_receiver';
        $serviceLocator = $this->createMock(ServiceLocator::class);
        $serviceLocator->method('has')->with($failureTransportName)->willReturn(true);
        $serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver);

        
/**
 * Encodes the envelope with the serializer and hands it to the connection.
 *
 * The encoded body, the encoded headers (an empty array when the serializer returned none)
 * and the delay taken from the envelope's last DelayStamp (0 when there is none) are passed
 * to $this->connection->add(). The value it returns is attached to the envelope as a
 * TransportMessageIdStamp.
 *
 * @param Envelope $envelope the envelope to send
 *
 * @return Envelope the envelope with a TransportMessageIdStamp added
 */
public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);

        /** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);
        // Variable name suggests the delay is in milliseconds - confirm against DelayStamp.
        $delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;

        $id = $this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);

        return $envelope->with(new TransportMessageIdStamp($id));
    }
}
Home | Imprint | This part of the site doesn't use cookies.