/**
 * Adds up the values returned by declareQueue() for every name in getQueueNames().
 *
 * NOTE(review): presumably queue() yields an \AMQPQueue, whose declareQueue()
 * reports the number of messages currently waiting in that queue - confirm.
 */
public function countMessagesInQueues(): int
{
    $perQueueCounts = [];

    foreach ($this->getQueueNames() as $name) {
        $perQueueCounts[] = $this->queue($name)->declareQueue();
    }

    return array_sum($perQueueCounts);
}

/**
 * Runs setupDelay() for the message's routing key, then publishes the body
 * on the delay exchange.
 *
 * @throws \AMQPException
 */
private function publishWithDelay(
    string $body,
    array $headers,
    int $delay,
    ?AmqpStamp $amqpStamp = null
): void {
    // The stamp type is spelled "?AmqpStamp" explicitly: relying on the
    // "= null" default to make the parameter nullable is deprecated in PHP 8.4.
    $routingKey = $this->getRoutingKeyForMessage($amqpStamp);

    // Without a stamp the message is never treated as a retry attempt.
    $isRetryAttempt = null !== $amqpStamp ? $amqpStamp->isRetryAttempt() : false;

    // setupDelay() is called before publishing, with the same delay, routing
    // key and retry flag that are used to build the delay routing key below.
    $this->setupDelay($delay, $routingKey, $isRetryAttempt);

    $this->publishOnExchange(
        $this->getDelayExchange(),
        $body,
        $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt),
        $headers,
        $amqpStamp
    );
}