ack example

$batchHandler = $handlerDescriptor->getBatchHandler();

                /** @var AckStamp $ackStamp */
                if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
                    $ack = new Acknowledger(get_debug_type($batchHandler)static function D\Throwable $e = null, $result = null) use ($envelope$ackStamp$handlerDescriptor) {
                        if (null !== $e) {
                            $e = new HandlerFailedException($envelope[$e]);
                        } else {
                            $envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor$result));
                        }

                        $ackStamp->ack($envelope$e);
                    });

                    $result = $this->callHandler($handler$message$ack$envelope->last(HandlerArgumentsStamp::class));

                    if (!\is_int($result) || 0 > $result) {
                        throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result)get_debug_type($batchHandler)));
                    }

                    if (!$ack->isAcknowledged()) {
                        $envelope = $envelope->with(new NoAutoAckStamp($handlerDescriptor));
                    } elseif ($ack->getError()) {
                        
private function getBatchSize(): int
            {
                // Fixed batch size used by this handler: always 2.
                $batchSize = 2;

                return $batchSize;
            }

            private function process(array $jobs): void
            {
                $this->processedMessages = array_column($jobs, 0);

                foreach ($jobs as [$job$ack]) {
                    $ack->ack($job);
                }
            }
        };

        $middleware = new HandleMessageMiddleware(new HandlersLocator([
            DummyMessage::class => [new HandlerDescriptor($handler)],
        ]));

        $ackedMessages = [];
        $ack = static function DEnvelope $envelope, \Throwable $e = null) use (&$ackedMessages) {
            if (null !== $e) {
                

        $redis = $this->createMock(\Redis::class);

        $redis->expects($this->exactly(1))->method('xack')
            ->with('queue', 'symfony', ['1'])
            ->willReturn(1);
        $redis->expects($this->exactly(1))->method('xdel')
            ->with('queue', ['1'])
            ->willReturn(1);

        $connection = Connection::fromDsn('redis://localhost/queue', []$redis);
        $connection->ack('1');
    }

    public function testDeleteAfterReject()
    {
        $redis = $this->createMock(\Redis::class);

        $redis->expects($this->exactly(1))->method('xack')
            ->with('queue', 'symfony', ['1'])
            ->willReturn(1);
        $redis->expects($this->exactly(1))->method('xdel')
            ->with('queue', ['1'])
            
 catch (MessageDecodingFailedException $exception) {
            $this->connection->reject($message['id']);

            throw $exception;
        }

        return [$envelope->with(new RedisReceivedStamp($message['id']))];
    }

    public function ack(Envelope $envelope): void
    {
        $this->connection->ack($this->findRedisReceivedStamp($envelope)->getId());
    }

    public function reject(Envelope $envelope): void
    {
        $this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
    }

    public function getMessageCount(): int
    {
        return $this->connection->getMessageCount();
    }

    
$this->connection = $connection;
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    public function get(): iterable
    {
        return $this->getReceiver()->get();
    }

    public function ack(Envelope $envelope): void
    {
        $this->getReceiver()->ack($envelope);
    }

    public function reject(Envelope $envelope): void
    {
        $this->getReceiver()->reject($envelope);
    }

    public function send(Envelope $envelope): Envelope
    {
        return $this->getSender()->send($envelope);
    }

    
/**
 * The transport's get() must return exactly what the receiver's get() returns.
 */
public function testItCanGetMessagesViaTheReceiver()
    {
        // Two distinct envelopes; the array elements need a separating comma.
        $envelopes = [new Envelope(new \stdClass()), new Envelope(new \stdClass())];
        $this->receiver->expects($this->once())->method('get')->willReturn($envelopes);
        // assertSame takes (expected, actual) as two separate arguments.
        $this->assertSame($envelopes, $this->transport->get());
    }

    public function testItCanAcknowledgeAMessageViaTheReceiver()
    {
        $envelope = new Envelope(new \stdClass());
        $this->receiver->expects($this->once())->method('ack')->with($envelope);
        $this->transport->ack($envelope);
    }

    public function testItCanRejectAMessageViaTheReceiver()
    {
        $envelope = new Envelope(new \stdClass());
        $this->receiver->expects($this->once())->method('reject')->with($envelope);
        $this->transport->reject($envelope);
    }

    public function testItCanGetMessageCountViaTheReceiver()
    {
        
 catch (MessageDecodingFailedException $exception) {
            $this->connection->reject($beanstalkdEnvelope['id']);

            throw $exception;
        }

        return [$envelope->with(new BeanstalkdReceivedStamp($beanstalkdEnvelope['id']$this->connection->getTube()))];
    }

    public function ack(Envelope $envelope): void
    {
        $this->connection->ack($this->findBeanstalkdReceivedStamp($envelope)->getId());
    }

    public function reject(Envelope $envelope): void
    {
        $this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
    }

    public function getMessageCount(): int
    {
        return $this->connection->getMessageCount();
    }

    
throw $exception;
        }

        yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope$queueName));
    }

    public function ack(Envelope $envelope): void
    {
        try {
            $stamp = $this->findAmqpStamp($envelope);

            $this->connection->ack(
                $stamp->getAmqpEnvelope(),
                $stamp->getQueueName()
            );
        } catch (\AMQPException $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }
    }

    public function reject(Envelope $envelope): void
    {
        $stamp = $this->findAmqpStamp($envelope);

        
1
        );

        // Queue will return the pending message first because redeliver_timeout = 0
        $message = $connection->get();
        $this->assertEquals([
            'message' => json_encode([
                'body' => $body1,
                'headers' => $headers,
            ]),
        ]$message['data']);
        $connection->ack($message['id']);

        // Queue will return the second message
        $message = $connection->get();
        $this->assertEquals([
            'message' => json_encode([
                'body' => $body2,
                'headers' => $headers,
            ]),
        ]$message['data']);
        $connection->ack($message['id']);
    }

    
$this->assertSame($stamp$envelopes[0]->last(ScheduledStamp::class));
        $this->assertSame('default', $stamp->messageContext->name);
        $this->assertSame('id', $stamp->messageContext->id);
    }

    public function testAckIgnored()
    {
        $transport = new SchedulerTransport($this->createMock(MessageGeneratorInterface::class));

        $this->expectNotToPerformAssertions();
        $transport->ack(new Envelope(new \stdClass()));
    }

    public function testRejectException()
    {
        $transport = new SchedulerTransport($this->createMock(MessageGeneratorInterface::class));

        $this->expectNotToPerformAssertions();
        $transport->reject(new Envelope(new \stdClass()));
    }

    public function testSendException()
    {
$this->connection = $connection;
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    public function get(): iterable
    {
        return $this->getReceiver()->get();
    }

    public function ack(Envelope $envelope): void
    {
        $this->getReceiver()->ack($envelope);
    }

    public function reject(Envelope $envelope): void
    {
        $this->getReceiver()->reject($envelope);
    }

    public function getMessageCount(): int
    {
        return $this->getReceiver()->getMessageCount();
    }

    
$this->serializeTransport->send($envelope);
        $this->assertSame([$envelopeDecoded]$this->serializeTransport->getSent());
    }

    public function testQueue()
    {
        $envelope1 = new Envelope(new \stdClass());
        $envelope1 = $this->transport->send($envelope1);
        $envelope2 = new Envelope(new \stdClass());
        $envelope2 = $this->transport->send($envelope2);
        $this->assertSame([$envelope1$envelope2]$this->transport->get());
        $this->transport->ack($envelope1);
        $this->assertSame([$envelope2]$this->transport->get());
        $this->transport->reject($envelope2);
        $this->assertSame([]$this->transport->get());
    }

    public function testQueueWithDelay()
    {
        $envelope1 = new Envelope(new \stdClass());
        $envelope1 = $this->transport->send($envelope1);
        $envelope2 = (new Envelope(new \stdClass()))->with(new DelayStamp(10_000));
        $envelope2 = $this->transport->send($envelope2);
        

        return $this->getReceiver()->get();
    }

    public function getFromQueues(array $queueNames): iterable
    {
        return $this->getReceiver()->getFromQueues($queueNames);
    }

    public function ack(Envelope $envelope): void
    {
        $this->getReceiver()->ack($envelope);
    }

    public function reject(Envelope $envelope): void
    {
        $this->getReceiver()->reject($envelope);
    }

    public function send(Envelope $envelope): Envelope
    {
        return $this->getSender()->send($envelope);
    }

    

        $id = 123456;

        $tube = 'xyz';

        $client = $this->createMock(PheanstalkInterface::class);
        $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
        $client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));

        $connection = new Connection(['tube_name' => $tube]$client);

        $connection->ack((string) $id);
    }

    public function testAckWhenABeanstalkdExceptionOccurs()
    {
        $id = 123456;

        $tube = 'xyzw';

        $exception = new ServerException('baz error');

        $client = $this->createMock(PheanstalkInterface::class);
        
$this->receiver = $receiver;
        $this->sender = $sender;
    }

    public function get(): iterable
    {
        return $this->getReceiver()->get();
    }

    public function ack(Envelope $envelope): void
    {
        $this->getReceiver()->ack($envelope);
    }

    public function reject(Envelope $envelope): void
    {
        $this->getReceiver()->reject($envelope);
    }

    public function getMessageCount(): int
    {
        return $this->getReceiver()->getMessageCount();
    }

    
Home | Imprint | This part of the site doesn't use cookies.