getMessageCount example


        try {
            $this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
        } catch (HttpException $e) {
            throw new TransportException($e->getMessage(), 0, $e);
        }
    }

    public function getMessageCount(): int
    {
        try {
            return $this->connection->getMessageCount();
        } catch (HttpException $e) {
            throw new TransportException($e->getMessage(), 0, $e);
        }
    }

    private function findSqsReceivedStamp(Envelope $envelope): AmazonSqsReceivedStamp
    {
        /** @var AmazonSqsReceivedStamp|null $sqsReceivedStamp */
        $sqsReceivedStamp = $envelope->last(AmazonSqsReceivedStamp::class);

        if (null === $sqsReceivedStamp) {
            
$this->execute(getenv('MESSENGER_SQS_DSN'));
    }

    private function execute(string $dsn): void
    {
        $connection = Connection::fromDsn($dsn[]);
        $connection->setup();
        $this->clearSqs($dsn);

        $connection->send('{"message": "Hi"}', ['type' => DummyMessage::class, DummyMessage::class => 'special']);
        $this->assertSame(1, $connection->getMessageCount());

        $wait = 0;
        while ((null === $encoded = $connection->get()) && $wait++ < 200) {
            usleep(5000);
        }

        $this->assertEquals('{"message": "Hi"}', $encoded['body']);
        $this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special']$encoded['headers']);
    }

    private function clearSqs(string $dsn): void
    {
/** @var Envelope[] $envelope */
        $envelope = $redisReceiver->get();
        $this->assertCount(1, $envelope);

        $message = $envelope[0]->getMessage();
        $this->assertInstanceOf(DummyMessage::class$message);
        $this->assertEquals('Hi2', $message->getMessage());
    }

    /**
     * Checks that the connection's getMessageCount() follows messages being added and acknowledged.
     */
    public function testItCountMessages()
    {
        // The count is expected to be zero before anything is added.
        $this->assertSame(0, $this->connection->getMessageCount());

        // Add three identical messages.
        $this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
        $this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
        $this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);

        $this->assertSame(3, $this->connection->getMessageCount());

        // Fetch one message and acknowledge it by id; the count should drop to two.
        $message = $this->connection->get();
        $this->connection->ack($message['id']);

        $this->assertSame(2, $this->connection->getMessageCount());

        

        // NOTE(review): `$envelope` is never defined in this method, and the rest of the test
        // talks to `$this->connection` directly — this statement looks spliced in from another
        // method; confirm against the upstream file.
        $this->getReceiver()->ack($envelope);
    }

    public function reject(Envelope $envelope): void
    {
        $this->getReceiver()->reject($envelope);
    }

    public function getMessageCount(): int
    {
        return $this->getReceiver()->getMessageCount();
    }

    public function all(int $limit = null): iterable
    {
        return $this->getReceiver()->all($limit);
    }

    public function find(mixed $id): ?Envelope
    {
        return $this->getReceiver()->find($id);
    }

    
if (!$this->transportLocator->has($transportName)) {
                $io->warning(sprintf('The "%s" transport does not exist.', $transportName));

                continue;
            }
            $transport = $this->transportLocator->get($transportName);
            if (!$transport instanceof MessageCountAwareInterface) {
                $uncountableTransports[] = $transportName;

                continue;
            }
            $outputTable[] = [$transportName$transport->getMessageCount()];
        }

        $io->table(['Transport', 'Count']$outputTable);

        if ($uncountableTransports) {
            $io->note(sprintf('Unable to get message count for the following transports: "%s".', implode('", "', $uncountableTransports)));
        }

        return 0;
    }
}
'available_at' => $this->formatDateTime(new \DateTimeImmutable('2019-03-15 12:30:00', new \DateTimeZone('UTC'))),
        ]);
        // another available         $this->driverConnection->insert('messenger_messages', [
            'body' => '{"message": "Hi available"}',
            'headers' => json_encode(['type' => DummyMessage::class]),
            'queue_name' => 'default',
            'created_at' => $this->formatDateTime(new \DateTimeImmutable('2019-03-15 12:00:00', new \DateTimeZone('UTC'))),
            'available_at' => $this->formatDateTime(new \DateTimeImmutable('2019-03-15 12:30:00', new \DateTimeZone('UTC'))),
        ]);

        $this->assertSame(2, $this->connection->getMessageCount());
    }

    public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
    {
        $this->connection->setup();
        $twoHoursAgo = new \DateTimeImmutable('now -2 hours');
        $this->driverConnection->insert('messenger_messages', [
            'body' => '{"message": "Hi requeued"}',
            'headers' => json_encode(['type' => DummyMessage::class]),
            'queue_name' => 'default',
            'created_at' => $this->formatDateTime(new \DateTimeImmutable('2019-03-15 12:00:00', new \DateTimeZone('UTC'))),
            

        $this->getReceiver()->ack($envelope);
    }

    public function reject(Envelope $envelope): void
    {
        $this->getReceiver()->reject($envelope);
    }

    public function getMessageCount(): int
    {
        return $this->getReceiver()->getMessageCount();
    }

    public function send(Envelope $envelope): Envelope
    {
        return $this->getSender()->send($envelope);
    }

    public function setup(): void
    {
        try {
            $this->connection->setup();
        }

        $this->connection->ack($this->findBeanstalkdReceivedStamp($envelope)->getId());
    }

    public function reject(Envelope $envelope): void
    {
        $this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
    }

    public function getMessageCount(): int
    {
        return $this->connection->getMessageCount();
    }

    private function findBeanstalkdReceivedStamp(Envelope $envelope): BeanstalkdReceivedStamp
    {
        /** @var BeanstalkdReceivedStamp|null $beanstalkdReceivedStamp */
        $beanstalkdReceivedStamp = $envelope->last(BeanstalkdReceivedStamp::class);

        if (null === $beanstalkdReceivedStamp) {
            throw new LogicException('No BeanstalkdReceivedStamp found on the Envelope.');
        }

        

        $this->getReceiver()->ack($envelope);
    }

    public function reject(Envelope $envelope): void
    {
        $this->getReceiver()->reject($envelope);
    }

    public function getMessageCount(): int
    {
        return $this->getReceiver()->getMessageCount();
    }

    public function send(Envelope $envelope): Envelope
    {
        return $this->getSender()->send($envelope);
    }

    private function getReceiver(): BeanstalkdReceiver
    {
        return $this->receiver ??= new BeanstalkdReceiver($this->connection, $this->serializer);
    }

    

        $this->connection->ack($this->findRedisReceivedStamp($envelope)->getId());
    }

    public function reject(Envelope $envelope): void
    {
        $this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
    }

    public function getMessageCount(): int
    {
        return $this->connection->getMessageCount();
    }

    private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp
    {
        /** @var RedisReceivedStamp|null $redisReceivedStamp */
        $redisReceivedStamp = $envelope->last(RedisReceivedStamp::class);

        if (null === $redisReceivedStamp) {
            throw new LogicException('No RedisReceivedStamp found on the Envelope.');
        }

        
public function testItCanRejectAMessageViaTheReceiver()
    {
        $message = new \stdClass();
        $envelope = new Envelope($message);

        // The receiver double must see reject() exactly once, with this envelope.
        $this->receiver
            ->expects($this->once())
            ->method('reject')
            ->with($envelope);

        $this->transport->reject($envelope);
    }

    public function testItCanGetMessageCountViaTheReceiver()
    {
        $messageCount = 15;
        $this->receiver->expects($this->once())->method('getMessageCount')->willReturn($messageCount);
        $this->assertSame($messageCount$this->transport->getMessageCount());
    }

    public function testItCanSendAMessageViaTheSender()
    {
        $envelope = new Envelope(new \stdClass());
        $this->sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
        $this->assertSame($envelope$this->transport->send($envelope));
    }

    public function testItCanSetUpTheConnection()
    {
        
$tube = 'baz';

        $count = 51;

        $response = new ArrayResponse('OK', ['current-jobs-ready' => $count]);

        $client = $this->createMock(PheanstalkInterface::class);
        $client->expects($this->once())->method('statsTube')->with($tube)->willReturn($response);

        $connection = new Connection(['tube_name' => $tube]$client);

        $this->assertSame($count$connection->getMessageCount());
    }

    public function testMessageCountWhenABeanstalkdExceptionOccurs()
    {
        $tube = 'baz1234';

        $exception = new ClientException('foobar error');

        $client = $this->createMock(PheanstalkInterface::class);
        $client->expects($this->once())->method('statsTube')->with($tube)->willThrowException($exception);

        
 else {
            if (null !== $lastMessageDecodingFailedStamp) {
                $io->error('The message could not be decoded.');
            }
            $io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
        }
    }

    protected function printPendingMessagesMessage(ReceiverInterface $receiver, SymfonyStyle $io): void
    {
        if ($receiver instanceof MessageCountAwareInterface) {
            if (1 === $receiver->getMessageCount()) {
                $io->writeln('There is <comment>1</comment> message pending in the failure transport.');
            } else {
                $io->writeln(sprintf('There are <comment>%d</comment> messages pending in the failure transport.', $receiver->getMessageCount()));
            }
        }
    }

    protected function getReceiver(string $name = null): ReceiverInterface
    {
        if (null === $name ??= $this->globalFailureReceiverName) {
            throw new InvalidArgumentException(sprintf('No default failure transport is defined. Available transports are: "%s".', implode('", "', array_keys($this->failureTransports->getProvidedServices()))));
        }

        return $this->getSender()->send($envelope);
    }

    public function setup(): void
    {
        $this->connection->setup();
    }

    public function getMessageCount(): int
    {
        return $this->getReceiver()->getMessageCount();
    }

    private function getReceiver(): AmqpReceiver
    {
        return $this->receiver ??= new AmqpReceiver($this->connection, $this->serializer);
    }

    private function getSender(): AmqpSender
    {
        return $this->sender ??= new AmqpSender($this->connection, $this->serializer);
    }
}

        try {
            $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
        } catch (DBALException $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }
    }

    public function getMessageCount(): int
    {
        try {
            return $this->connection->getMessageCount();
        } catch (DBALException $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }
    }

    public function all(int $limit = null): iterable
    {
        try {
            $doctrineEnvelopes = $this->connection->findAll($limit);
        } catch (DBALException $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }
Home | Imprint | This part of the site doesn't use cookies.