/**
 * Publishes a message on the delay exchange.
 *
 * The routing key is resolved from the optional stamp, setupDelay() is
 * invoked for the requested delay, and the message is then published on
 * the exchange returned by getDelayExchange() under the routing key
 * produced by getRoutingKeyForDelay().
 *
 * @param string         $body      Message body handed to publishOnExchange()
 * @param array          $headers   Headers handed to publishOnExchange()
 * @param int            $delay     Requested delay (presumably milliseconds; confirm against the caller)
 * @param AmqpStamp|null $amqpStamp Optional stamp; when null the message is treated as a non-retry attempt
 *
 * @throws \AMQPException
 */
private function publishWithDelay(string $body, array $headers, int $delay, ?AmqpStamp $amqpStamp = null): void
{
    $routingKey = $this->getRoutingKeyForMessage($amqpStamp);

    // Without a stamp there is no retry information, so fall back to false.
    $isRetryAttempt = null !== $amqpStamp ? $amqpStamp->isRetryAttempt() : false;

    $this->setupDelay($delay, $routingKey, $isRetryAttempt);

    $this->publishOnExchange(
        $this->getDelayExchange(),
        $body,
        $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt),
        $headers,
        $amqpStamp
    );
}

private function publishOnExchange(\AMQPExchange
$exchange, string
$body, string
$routingKey = null, array
$headers =
[], AmqpStamp
$amqpStamp = null
): void
{ $attributes =
$amqpStamp ?
$amqpStamp->
getAttributes() :
[];
$attributes['headers'
] =
array_merge($attributes['headers'
] ??
[],
$headers);