// first time, as zero > currentTime will never be true.
if ($process_from >
$this->time->
getCurrentMicroTime()) { $this->
usleep(round($process_from -
$this->time->
getCurrentMicroTime(), 3
) * 1000000
);
} try { $this->
processQueue($queue,
$worker);
} catch (SuspendQueueException
$e) { // Return to this queue after processing other queues if the delay is
// within the threshold.
if ($e->
isDelayable() && ($e->
getDelay() <
$max_wait)) { $item['process_from'
] =
$this->time->
getCurrentMicroTime() +
$e->
getDelay();
// Place this queue back in the stack for processing later.
array_push($queues,
$item);
} } // Reorder the queue by next 'process_from' timestamp.
usort($queues,
function Darray
$queueA, array
$queueB) { return $queueA['process_from'
] <=>
$queueB['process_from'
];
});
} }