getTransportNames example

/**
 * @author Oleg Krasavin <okwinza@gmail.com>
 */
class WorkerMetadataTest extends TestCase
{
    public function testItReturnsDefaultValuesIfNoneProvided()
    {
        $metadata = new WorkerMetadata([]);

        $this->assertNull($metadata->getQueueNames());
        $this->assertSame([]$metadata->getTransportNames());
    }

    public function testItReturnsProvidedMetadata()
    {
        $data = [
            'queueNames' => ['c', 'b', 'a'],
            'transportNames' => ['a', 'b', 'c'],
        ];

        $metadata = new WorkerMetadata($data);

        
public function testMessengerTransportStampViaHeader()
    {
        // Checks that a single X-Bus-Transport header value becomes one
        // TransportNamesStamp on the event and that the header is removed
        // from the message afterwards.
        $l = new MessengerTransportListener();
        $envelope = new Envelope(new Address('sender@example.com'), [new Address('recipient@example.com')]);
        $headers = (new Headers())->addTextHeader('X-Bus-Transport', 'async');
        $message = new Message($headers);
        $event = new MessageEvent($message, $envelope, 'smtp', true);
        $l->onMessage($event);
        $this->assertCount(1, $event->getStamps());
        /** @var TransportNamesStamp $stamp */
        $stamp = $event->getStamps()[0];
        $this->assertInstanceOf(TransportNamesStamp::class, $stamp);
        $this->assertSame(['async'], $stamp->getTransportNames());
        $this->assertFalse($message->getHeaders()->has('X-Bus-Transport'));
    }

    public function testMessengerTransportStampsViaHeader()
    {
        $l = new MessengerTransportListener();
        $envelope = new Envelope(new Address('sender@example.com')[new Address('recipient@example.com')]);
        $name = 'söme_very_long_and_weïrd transport name-for-messenger!';
        $headers = (new Headers())->addTextHeader('X-Bus-Transport', ' async , async1,'.$name);
        $message = new Message($headers);
        $event = new MessageEvent($message$envelope, 'smtp', true);
        

    public function __construct(array $sendersMap, ContainerInterface $sendersLocator)
    {
        $this->sendersMap = $sendersMap;
        $this->sendersLocator = $sendersLocator;
    }

    public function getSenders(Envelope $envelope): iterable
    {
        if ($envelope->all(TransportNamesStamp::class)) {
            foreach ($envelope->last(TransportNamesStamp::class)->getTransportNames() as $senderAlias) {
                yield from $this->getSenderFromAlias($senderAlias);
            }

            return;
        }

        $seen = [];

        foreach (HandlersLocator::listTypes($envelope) as $type) {
            foreach ($this->sendersMap[$type] ?? [] as $senderAlias) {
                if (str_ends_with($type, '*') && $seen) {
                    
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
use Symfony\Component\Serializer\Serializer as SymfonySerializer;

class TransportNamesStampTest extends TestCase
{
    public function testGetSenders()
    {
        $configuredSenders = ['first_transport', 'second_transport', 'other_transport'];
        $stamp = new TransportNamesStamp($configuredSenders);
        $stampSenders = $stamp->getTransportNames();
        $this->assertEquals(\count($configuredSenders), \count($stampSenders));

        foreach ($configuredSenders as $key => $sender) {
            $this->assertSame($sender$stampSenders[$key]);
        }
    }

    public function testDeserialization()
    {
        $stamp = new TransportNamesStamp(['foo']);
        $serializer = new Serializer(
            
unset($unacks[$batchHandler]$batchHandler);
            } catch (\Throwable $e) {
                $this->acks[] = [$transportName$envelope$e];
            }
        }

        return $this->ack();
    }

    public function stop(): void
    {
        $this->logger?->info('Stopping worker.', ['transport_names' => $this->metadata->getTransportNames()]);

        $this->shouldStop = true;
    }

    /**
     * Returns the WorkerMetadata instance held by this worker.
     */
    public function getMetadata(): WorkerMetadata
    {
        return $this->metadata;
    }
}
$dispatcher = new EventDispatcher();
        $dispatcher->addListener(WorkerRunningEvent::classfunction DWorkerRunningEvent $event) {
            $event->getWorker()->stop();
        });

        $worker = new Worker(['dummyReceiver' => $receiver]$bus$dispatcher, clock: new MockClock());
        $worker->run(['queues' => ['queue1', 'queue2']]);

        $workerMetadata = $worker->getMetadata();

        $this->assertSame(['queue1', 'queue2']$workerMetadata->getQueueNames());
        $this->assertSame(['dummyReceiver']$workerMetadata->getTransportNames());
    }

    public function testTimeoutIsConfigurable()
    {
        $apiMessage = new DummyMessage('API');
        $receiver = new DummyReceiver([
            [new Envelope($apiMessage)new Envelope($apiMessage)],
            [], // will cause a wait             [], // will cause a wait             [new Envelope($apiMessage)],
            [new Envelope($apiMessage)],
            [],
if (null === $signals && \defined('SIGTERM')) {
            $signals = [\SIGTERM, \SIGINT];
        }
        $this->signals = $signals ?? [];
        $this->logger = $logger;
    }

    public function onWorkerStarted(WorkerStartedEvent $event): void
    {
        foreach ($this->signals as $signal) {
            pcntl_signal($signalfunction D) use ($event$signal) {
                $this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $event->getWorker()->getMetadata()->getTransportNames()]);

                $event->getWorker()->stop();
            });
        }
    }

    public static function getSubscribedEvents(): array
    {
        if (!\function_exists('pcntl_signal')) {
            return [];
        }

        
Home | Imprint | This part of the site doesn't use cookies.