Acknowledger example

    /**
     * @param Acknowledger|null $ack The function to call to ack/nack the $message.
     *                               The message should be handled synchronously when null.
     *
     * @return mixed The number of pending messages in the batch if $ack is not null,
     *               the result from handling the message otherwise
     */
    private function handle(object $message, ?Acknowledger $ack): mixed
    {
        if (null === $ack) {
            $ack = new Acknowledger(get_debug_type($this));
            $this->jobs[] = [$message$ack];
            $this->flush(true);

            return $ack->getResult();
        }

        $this->jobs[] = [$message$ack];
        if (!$this->shouldFlush()) {
            return \count($this->jobs);
        }

        
if ($this->messageHasAlreadyBeenHandled($envelope$handlerDescriptor)) {
                $alreadyHandled = true;
                continue;
            }

            try {
                $handler = $handlerDescriptor->getHandler();
                $batchHandler = $handlerDescriptor->getBatchHandler();

                /** @var AckStamp $ackStamp */
                if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
                    $ack = new Acknowledger(get_debug_type($batchHandler)static function D\Throwable $e = null, $result = null) use ($envelope$ackStamp$handlerDescriptor) {
                        if (null !== $e) {
                            $e = new HandlerFailedException($envelope[$e]);
                        } else {
                            $envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor$result));
                        }

                        $ackStamp->ack($envelope$e);
                    });

                    $result = $this->callHandler($handler$message$ack$envelope->last(HandlerArgumentsStamp::class));

                    
Home | Imprint | This part of the site doesn't use cookies.