'worker' =>
$worker,
'process_from' =>
$process_from,
] =
$item;
// Each queue will be processed immediately when it is reached for the
// first time, as zero > currentTime will never be true.
if ($process_from >
$this->time->
getCurrentMicroTime()) { $this->
usleep(round($process_from -
$this->time->
getCurrentMicroTime(), 3
) * 1000000
);
} try { $this->
processQueue($queue,
$worker);
} catch (SuspendQueueException
$e) { // Return to this queue after processing other queues if the delay is
// within the threshold.
if ($e->
isDelayable() && ($e->
getDelay() <
$max_wait)) { $item['process_from'
] =
$this->time->
getCurrentMicroTime() +
$e->
getDelay();
// Place this queue back in the stack for processing later.
array_push($queues,
$item);
} }