Skip to content

Commit ee16513

Browse files
[Messenger][Amqp] Do not use redelivery routing key when sending to failure transport
The failure transport uses a delay - the retry routing key from the previous stamp would interfere with publishing to the failure exchange/queue
1 parent b248583 commit ee16513

File tree

3 files changed

+201
-1
lines changed

3 files changed

+201
-1
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp;
1617
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender;
1718
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
1819
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
1920
use Symfony\Component\Messenger\Envelope;
2021
use Symfony\Component\Messenger\Exception\TransportException;
22+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
23+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
2124
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2225

2326
/**
@@ -55,6 +58,29 @@ public function testItSendsTheEncodedMessageUsingARoutingKey()
5558
$sender->send($envelope);
5659
}
5760

61+
public function testItDoesNotUseRetryRoutingKeyWhenSendingToFailureTransport()
62+
{
63+
$envelope = (new Envelope(new DummyMessage('Oy')))->with()
64+
->with(new AmqpReceivedStamp(
65+
$this->createStub(\AMQPEnvelope::class),
66+
'original_receiver'
67+
))
68+
->with(new RedeliveryStamp(1))
69+
->with(new SentToFailureTransportStamp('original_receiver'));
70+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
71+
72+
$serializer = $this->createStub(SerializerInterface::class);
73+
$serializer->method('encode')->with($envelope)->willReturn($encoded);
74+
75+
$connection = $this->createMock(Connection::class);
76+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $this->callback(
77+
static fn (AmqpStamp $stamp) => '' === $stamp->getRoutingKey()
78+
));
79+
80+
$sender = new AmqpSender($connection, $serializer);
81+
$sender->send($envelope);
82+
}
83+
5884
public function testItSendsTheEncodedMessageWithoutHeaders()
5985
{
6086
$envelope = new Envelope(new DummyMessage('Oy'));
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Amqp\Tests\Transport;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Psr\Container\ContainerInterface;
16+
use Symfony\Component\DependencyInjection\ServiceLocator;
17+
use Symfony\Component\EventDispatcher\EventDispatcher;
18+
use Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage;
19+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransport;
20+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
21+
use Symfony\Component\Messenger\Envelope;
22+
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
23+
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
24+
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
25+
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
26+
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
27+
use Symfony\Component\Messenger\Handler\HandlersLocator;
28+
use Symfony\Component\Messenger\MessageBus;
29+
use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware;
30+
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
31+
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
32+
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
33+
use Symfony\Component\Messenger\Stamp\DelayStamp;
34+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
35+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
36+
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
37+
use Symfony\Component\Messenger\Worker;
38+
39+
/**
40+
* @requires extension amqp
41+
*
42+
* @group integration
43+
*/
44+
class FailureTransportIntegrationTest extends TestCase
45+
{
46+
protected function setUp(): void
47+
{
48+
parent::setUp();
49+
50+
if (!getenv('MESSENGER_AMQP_DSN')) {
51+
$this->markTestSkipped('The "MESSENGER_AMQP_DSN" environment variable is required.');
52+
}
53+
}
54+
55+
public function testItDoesNotLoseMessagesFromTheFailedTransport()
56+
{
57+
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
58+
$connection->setup();
59+
$connection->purgeQueues();
60+
61+
$failureConnection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'),
62+
['exchange' => [
63+
'name' => 'failed',
64+
'type' => 'fanout',
65+
], 'queues' => ['failed' => []]]
66+
);
67+
$failureConnection->setup();
68+
$failureConnection->purgeQueues();
69+
70+
$originalTransport = new AmqpTransport($connection);
71+
$failureTransport = new AmqpTransport($failureConnection);
72+
73+
$retryStrategy = new MultiplierRetryStrategy(1, 100, 2);
74+
$retryStrategyLocator = $this->createStub(ContainerInterface::class);
75+
$retryStrategyLocator->method('has')->willReturn(true);
76+
$retryStrategyLocator->method('get')->willReturn($retryStrategy);
77+
78+
$sendersLocatorFailureTransport = new ServiceLocator([
79+
'original' => static fn () => $failureTransport,
80+
]);
81+
82+
$transports = [
83+
'original' => $originalTransport,
84+
'failed' => $failureTransport,
85+
];
86+
87+
$locator = $this->createStub(ContainerInterface::class);
88+
$locator->method('has')->willReturn(true);
89+
$locator->method('get')->willReturnCallback(static fn ($transportName) => $transports[$transportName]);
90+
$senderLocator = new SendersLocator(
91+
[DummyMessage::class => ['original']],
92+
$locator
93+
);
94+
95+
$timesHandled = 0;
96+
97+
$handler = static function () use (&$timesHandled) {
98+
++$timesHandled;
99+
throw new \Exception('Handler failed');
100+
};
101+
102+
$handlerLocator = new HandlersLocator([
103+
DummyMessage::class => [new HandlerDescriptor($handler, ['from_transport' => 'original'])],
104+
]);
105+
106+
$bus = new MessageBus([
107+
new FailedMessageProcessingMiddleware(),
108+
new SendMessageMiddleware($senderLocator),
109+
new HandleMessageMiddleware($handlerLocator),
110+
]);
111+
112+
$dispatcher = new EventDispatcher();
113+
$dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator));
114+
$dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener(
115+
$sendersLocatorFailureTransport,
116+
$retryStrategyLocator,
117+
));
118+
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
119+
$dispatcher->addSubscriber(new StopWorkerOnTimeLimitListener(2));
120+
121+
$originalTransport->send(Envelope::wrap(new DummyMessage('dummy')));
122+
123+
$runWorker = static function (string $transportName) use ($bus, $dispatcher, $transports): void {
124+
(new Worker(
125+
[$transportName => $transports[$transportName]],
126+
$bus,
127+
$dispatcher,
128+
))->run();
129+
};
130+
131+
$runWorker('original');
132+
$runWorker('original');
133+
$runWorker('failed');
134+
$runWorker('failed');
135+
136+
$this->assertSame(4, $timesHandled);
137+
$failedMessage = $this->waitForFailedMessage($failureTransport, 2);
138+
// 100 delay * 2 multiplier ^ 3 retries = 800 expected delay
139+
$this->assertSame(800, $failedMessage->last(DelayStamp::class)->getDelay());
140+
$this->assertSame(0, $failedMessage->last(RedeliveryStamp::class)->getRetryCount());
141+
$this->assertCount(4, $failedMessage->all(RedeliveryStamp::class));
142+
$this->assertCount(2, $failedMessage->all(SentToFailureTransportStamp::class));
143+
foreach ($failedMessage->all(SentToFailureTransportStamp::class) as $stamp) {
144+
$this->assertSame('original', $stamp->getOriginalReceiverName());
145+
}
146+
}
147+
148+
private function waitForFailedMessage(AmqpTransport $failureTransport, int $timeOutInS): Envelope
149+
{
150+
$start = microtime(true);
151+
while (microtime(true) - $start < $timeOutInS) {
152+
$envelopes = iterator_to_array($failureTransport->get());
153+
if (\count($envelopes) > 0) {
154+
foreach ($envelopes as $envelope) {
155+
$failureTransport->reject($envelope);
156+
}
157+
158+
return $envelopes[0];
159+
}
160+
usleep(100 * 1000);
161+
}
162+
throw new \RuntimeException('Message was not received from failure transport within expected timeframe.');
163+
}
164+
}

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\Messenger\Exception\TransportException;
1616
use Symfony\Component\Messenger\Stamp\DelayStamp;
1717
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
18+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
1819
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1920
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2021
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -59,7 +60,7 @@ public function send(Envelope $envelope): Envelope
5960
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(
6061
$amqpReceivedStamp->getAmqpEnvelope(),
6162
$amqpStamp,
62-
$envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null
63+
$this->getRetryRoutingKey($envelope, $amqpReceivedStamp)
6364
);
6465
}
6566

@@ -76,4 +77,13 @@ public function send(Envelope $envelope): Envelope
7677

7778
return $envelope;
7879
}
80+
81+
private function getRetryRoutingKey(Envelope $envelope, AmqpReceivedStamp $amqpReceivedStamp): ?string
82+
{
83+
if (1 === \count($envelope->all(SentToFailureTransportStamp::class))) {
84+
return null;
85+
}
86+
87+
return $envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null;
88+
}
7989
}

0 commit comments

Comments
 (0)