$envelope =
$this->serializer->
decode([ 'body' => false ===
$body ? '' :
$body, // workaround https://github.com/pdezwart/php-amqp/issues/351
'headers' =>
$amqpEnvelope->
getHeaders(),
]);
} catch (MessageDecodingFailedException
$exception) { // invalid message of some type
$this->
rejectAmqpEnvelope($amqpEnvelope,
$queueName);
throw $exception;
} yield $envelope->
with(new AmqpReceivedStamp($amqpEnvelope,
$queueName));
} public function ack(Envelope
$envelope): void
{ try { $stamp =
$this->
findAmqpStamp($envelope);
$this->connection->
ack( $stamp->
getAmqpEnvelope(),
$stamp->
getQueueName() );
}