getDelay example


        $this->connection = $connection;
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);

        /** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);
        $delay = $delayStamp ? $delayStamp->getDelay() : 0;

        /** @var AmqpStamp|null $amqpStamp */
        $amqpStamp = $envelope->last(AmqpStamp::class);
        if (isset($encodedMessage['headers']['Content-Type'])) {
            $contentType = $encodedMessage['headers']['Content-Type'];
            unset($encodedMessage['headers']['Content-Type']);

            if (!$amqpStamp || !isset($amqpStamp->getAttributes()['content_type'])) {
                $amqpStamp = AmqpStamp::createWithAttributes(['content_type' => $contentType]$amqpStamp);
            }
        }

        
/**
     * Assigns the next transport message id to the envelope and records its
     * encoded form in both the sent list and the queue.
     *
     * When the envelope carries a DelayStamp, the moment at which the message
     * becomes available is stored under the same id.
     *
     * @param Envelope $envelope the envelope to send
     *
     * @return Envelope the envelope with a TransportMessageIdStamp added
     */
    public function send(Envelope $envelope): Envelope
    {
        $id = $this->nextId++;
        $envelope = $envelope->with(new TransportMessageIdStamp($id));
        $encodedEnvelope = $this->encode($envelope);
        $this->sent[] = $encodedEnvelope;
        $this->queue[$id] = $encodedEnvelope;

        /** @var DelayStamp|null $delayStamp */
        if ($delayStamp = $envelope->last(DelayStamp::class)) {
            $now = $this->clock?->now() ?? new \DateTimeImmutable();
            // The delay is expressed in milliseconds; only whole seconds are kept.
            $delayInSeconds = (int) ($delayStamp->getDelay() / 1000);
            // "%+d" prints the sign itself, so a negative delay becomes
            // "-5 seconds" rather than the unparsable "+-5 seconds".
            $this->availableAt[$id] = $now->modify(sprintf('%+d seconds', $delayInSeconds));
        }

        return $envelope;
    }

    /** * @return void */
    public function reset()
    {
        $this->sent = $this->queue = $this->rejected = $this->acknowledged = [];
    }


                if (false === $shouldRetry) {
                    yield from $this->passthru($context$firstChunk$content$chunk);

                    return;
                }
            }

            $context->getResponse()->cancel();

            $delay = $this->getDelayFromHeader($context->getHeaders()) ?? $this->strategy->getDelay($context, !$exception && $chunk->isLast() ? $content : null, $exception);
            ++$retryCount;
            $content = '';
            $firstChunk = null;

            $this->logger?->info('Try #{count} after {delay}ms'.($exception ? ': '.$exception->getMessage() : ', status code: '.$context->getStatusCode())[
                'count' => $retryCount,
                'delay' => $delay,
            ]);

            $context->setInfo('retry_count', $retryCount);
            $context->replaceRequest($method$url, self::shiftBaseUri($options$baseUris));
            

        $this->connection = $connection;
        $this->serializer = $serializer;
    }

    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);

        /** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);
        $delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;

        $id = $this->connection->add($encodedMessage['body']$encodedMessage['headers'] ?? []$delayInMs);

        return $envelope->with(new TransportMessageIdStamp($id));
    }
}
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Stamp\DelayStamp;

/** * @author Yanick Witschi <yanick.witschi@terminal42.ch> */
class DelayStampTest extends TestCase
{
    public function testDelayFor()
    {
        $stamp = DelayStamp::delayFor(\DateInterval::createFromDateString('30 seconds'));
        $this->assertSame(30000, $stamp->getDelay());
        $stamp = DelayStamp::delayFor(\DateInterval::createFromDateString('30 minutes'));
        $this->assertSame(1800000, $stamp->getDelay());
        $stamp = DelayStamp::delayFor(\DateInterval::createFromDateString('30 hours'));
        $this->assertSame(108000000, $stamp->getDelay());

        $stamp = DelayStamp::delayFor(\DateInterval::createFromDateString('-5 seconds'));
        $this->assertSame(-5000, $stamp->getDelay());
    }

    public function testDelayUntil()
    {
        
public function testRecoverableStrategyCausesRetry()
    {
        $sender = $this->createMock(SenderInterface::class);
        $sender->expects($this->once())->method('send')->willReturnCallback(function DEnvelope $envelope) {
            /** @var DelayStamp $delayStamp */
            $delayStamp = $envelope->last(DelayStamp::class);
            /** @var RedeliveryStamp $redeliveryStamp */
            $redeliveryStamp = $envelope->last(RedeliveryStamp::class);

            $this->assertInstanceOf(DelayStamp::class$delayStamp);
            $this->assertSame(1000, $delayStamp->getDelay());

            $this->assertInstanceOf(RedeliveryStamp::class$redeliveryStamp);
            $this->assertSame(1, $redeliveryStamp->getRetryCount());

            return $envelope;
        });
        $senderLocator = $this->createMock(ContainerInterface::class);
        $senderLocator->expects($this->once())->method('has')->willReturn(true);
        $senderLocator->expects($this->once())->method('get')->willReturn($sender);
        $retryStategy = $this->createMock(RetryStrategyInterface::class);
        $retryStategy->expects($this->never())->method('isRetryable');
        
yield ['POST', 200, new TransportException()];
        yield ['POST', 500, null];
    }

    /** * @dataProvider provideDelay */
    public function testGetDelay(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay)
    {
        $strategy = new GenericRetryStrategy([]$delay$multiplier$maxDelay, 0);

        self::assertSame($expectedDelay$strategy->getDelay($this->getContext($previousRetries, 'GET', 'http://example.com/', 200), null, null));
    }

    public static function provideDelay(): iterable
    {
        // delay, multiplier, maxDelay, previousRetries, expectedDelay         yield [1000, 1, 5000, 0, 1000];
        yield [1000, 1, 5000, 1, 1000];
        yield [1000, 1, 5000, 2, 1000];

        yield [1000, 2, 10000, 0, 1000];
        yield [1000, 2, 10000, 1, 2000];
        

class QueueExceptionsTest extends UnitTestCase {

  /**
   * Tests that the `DelayedRequeueException` calls parent constructor.
   *
   * The delay, message, code and previous exception passed to the constructor
   * must all be readable back from the exception.
   */
  public function testDelayedRequeueExceptionCallsParentConstructor(): void {
    // Only delay and message given: code and previous keep their defaults.
    $without_previous = new DelayedRequeueException(50, 'Delay the processing.');
    static::assertSame(50, $without_previous->getDelay());
    static::assertSame('Delay the processing.', $without_previous->getMessage());
    static::assertSame(0, $without_previous->getCode());
    static::assertNull($without_previous->getPrevious());

    // All four arguments given, chaining the first exception as previous.
    $with_previous = new DelayedRequeueException(100, 'Increase the delay.', 200, $without_previous);
    static::assertSame(100, $with_previous->getDelay());
    static::assertSame('Increase the delay.', $with_previous->getMessage());
    static::assertSame(200, $with_previous->getCode());
    static::assertSame($without_previous, $with_previous->getPrevious());
  }

}

        $this->connection = $connection;
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);

        /** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);
        $delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;

        $this->connection->send($encodedMessage['body']$encodedMessage['headers'] ?? []$delayInMs);

        return $envelope;
    }
}

        $this->connection = $connection;
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);

        /** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);
        $delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;

        try {
            $id = $this->connection->send($encodedMessage['body']$encodedMessage['headers'] ?? []$delay);
        } catch (DBALException $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        return $envelope->with(new TransportMessageIdStamp($id));
    }
}

        $this->connection = $connection;
        $this->serializer = $serializer;
    }

    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);

        /** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);
        $delay = null !== $delayStamp ? (int) ceil($delayStamp->getDelay() / 1000) : 0;

        $messageGroupId = null;
        $messageDeduplicationId = null;

        /** @var AmazonSqsFifoStamp|null $amazonSqsFifoStamp */
        $amazonSqsFifoStamp = $envelope->last(AmazonSqsFifoStamp::class);
        if (null !== $amazonSqsFifoStamp) {
            $messageGroupId = $amazonSqsFifoStamp->getMessageGroupId();
            $messageDeduplicationId = $amazonSqsFifoStamp->getMessageDeduplicationId();
        }

        
      // first time, as zero > currentTime will never be true.       if ($process_from > $this->time->getCurrentMicroTime()) {
        $this->usleep(round($process_from - $this->time->getCurrentMicroTime(), 3) * 1000000);
      }

      try {
        $this->processQueue($queue$worker);
      }
      catch (SuspendQueueException $e) {
        // Return to this queue after processing other queues if the delay is         // within the threshold.         if ($e->isDelayable() && ($e->getDelay() < $max_wait)) {
          $item['process_from'] = $this->time->getCurrentMicroTime() + $e->getDelay();
          // Place this queue back in the stack for processing later.           array_push($queues$item);
        }
      }

      // Reorder the queue by next 'process_from' timestamp.       usort($queuesfunction Darray $queueA, array $queueB) {
        return $queueA['process_from'] <=> $queueB['process_from'];
      });
    }
  }
Home | Imprint | This part of the site doesn't use cookies.