/** @var LimiterInterface $rateLimiter */
$rateLimiter =
$this->rateLimiters
[$transportName]->
create();
if ($rateLimiter->
consume()->
isAccepted()) { return;
} $this->logger?->
info('Transport {transport} is being rate limited, waiting for token to become available...',
['transport' =>
$transportName]);
$this->eventDispatcher?->
dispatch(new WorkerRateLimitedEvent($rateLimiter,
$transportName));
$rateLimiter->
reserve()->
wait();
} private function flush(bool
$force): bool
{ $unacks =
$this->unacks;
if (!
$unacks->
count()) { return false;
} $this->unacks =
new \
SplObjectStorage();