throw new TransportException('Could not add a message to the redis stream.'
);
} break;
} $decodedQueuedMessage =
json_decode($queuedMessage, true
);
$this->
add(\
array_key_exists('body',
$decodedQueuedMessage) ?
$decodedQueuedMessage['body'
] :
$queuedMessage,
$decodedQueuedMessage['headers'
] ??
[], 0
);
} if (!
$this->couldHavePendingMessages &&
$this->nextClaim <=
microtime(true
)) { $this->
claimOldPendingMessages();
} $messageId = '>'; // will receive new messages
if ($this->couldHavePendingMessages
) { $messageId = '0'; // will receive consumers pending messages
} $redis =
$this->
getRedis();
try { $messages =
$redis->
xreadgroup(