$this->
getRoutingKeyForMessage($amqpStamp),
$headers,
$amqpStamp );
} /**
* Returns an approximate count of the messages in defined queues.
*/
public function countMessagesInQueues(): int
{ return array_sum(array_map(fn ($queueName) =>
$this->
queue($queueName)->
declareQueue(),
$this->
getQueueNames()));
} /**
* @throws \AMQPException
*/
private function publishWithDelay(string
$body, array
$headers, int
$delay, AmqpStamp
$amqpStamp = null
): void
{ $routingKey =
$this->
getRoutingKeyForMessage($amqpStamp);
$isRetryAttempt =
$amqpStamp ?
$amqpStamp->
isRetryAttempt() : false;
$this->
setupDelay($delay,
$routingKey,
$isRetryAttempt);