Skip to content

[Messenger] Add --exclude-queues consume parameters #60979

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 7.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected function configure(): void
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
new InputOption('exclude-transports', 'et', InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Exclude specific receivers/transports from consumption (can only be used with --all)'),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following previous conversation, the option --receivers describes which receivers/transports to consume, but here --exclude-transports describes which receivers/transports to not consume.

I feel like we should either have --receivers and --exclude-receivers, or --transports and --exclude-transports, but not both mixed.

I think the best option here is to go with --exclude-receivers (like your initial suggestion 😇).

new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
])
->setHelp(<<<'EOF'
Expand Down Expand Up @@ -122,6 +123,10 @@ protected function configure(): void
Use the --all option to consume from all receivers:

<info>php %command.full_name% --all</info>

Use the --exclude-transports option to exclude specific receivers/transports from consumption (can only be used with --all):

<info>php %command.full_name% --all --exclude-transports=queue1</info>
EOF
)
;
Expand All @@ -132,6 +137,10 @@ protected function initialize(InputInterface $input, OutputInterface $output): v
if ($input->hasParameterOption('--keepalive')) {
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
}

if ($input->getOption('exclude-transports') && !$input->getOption('all')) {
throw new InvalidOptionException('The --exclude-transports option can only be used with the --all option.');
}
}

protected function interact(InputInterface $input, OutputInterface $output): void
Expand Down Expand Up @@ -169,9 +178,22 @@ protected function interact(InputInterface $input, OutputInterface $output): voi

protected function execute(InputInterface $input, OutputInterface $output): int
{
if ($input->getOption('exclude-transports') && !$input->getOption('all')) {
throw new InvalidOptionException('The --exclude-transports option can only be used with the --all option.');
}

$receivers = [];
$rateLimiters = [];
$receiverNames = $input->getOption('all') ? $this->receiverNames : $input->getArgument('receivers');

if ($input->getOption('all') && $excludedTransports = $input->getOption('exclude-transports')) {
$receiverNames = array_diff($receiverNames, $excludedTransports);

if (empty($receiverNames)) {
throw new RuntimeException('All transports/receivers have been excluded. Please specify at least one to consume from.');
}
}

foreach ($receiverNames as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = \sprintf('The receiver "%s" does not exist.', $receiverName);
Expand Down Expand Up @@ -276,6 +298,10 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
if ($input->mustSuggestOptionValuesFor('bus')) {
$suggestions->suggestValues($this->busIds);
}

if ($input->mustSuggestOptionValuesFor('exclude-transports')) {
$suggestions->suggestValues($this->receiverNames);
}
}

public function getSubscribedSignals(): array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,4 +334,156 @@ public static function provideCompletionSuggestions()
yield 'receiver (no repeat)' => [['async', ''], ['async_high', 'failed']];
yield 'option --bus' => [['--bus', ''], ['messenger.bus.default']];
}

public function testRunWithExcludeTransportsOption()
{
$envelope1 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$envelope2 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$envelope3 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);

$receiver1 = $this->createMock(ReceiverInterface::class);
$receiver1->method('get')->willReturn([$envelope1]);
$receiver2 = $this->createMock(ReceiverInterface::class);
$receiver2->method('get')->willReturn([$envelope2]);
$receiver3 = $this->createMock(ReceiverInterface::class);
$receiver3->method('get')->willReturn([$envelope3]);

$receiverLocator = new Container();
$receiverLocator->set('dummy-receiver1', $receiver1);
$receiverLocator->set('dummy-receiver2', $receiver2);
$receiverLocator->set('dummy-receiver3', $receiver3);

$bus = $this->createMock(MessageBusInterface::class);
// Only 2 messages should be dispatched (receiver1 and receiver3, receiver2 is excluded)
$bus->expects($this->exactly(2))->method('dispatch');

$busLocator = new Container();
$busLocator->set('dummy-bus', $bus);

$command = new ConsumeMessagesCommand(
new RoutableMessageBus($busLocator),
$receiverLocator, new EventDispatcher(),
receiverNames: ['dummy-receiver1', 'dummy-receiver2', 'dummy-receiver3']
);

$application = new Application();
if (method_exists($application, 'addCommand')) {
$application->addCommand($command);
} else {
$application->add($command);
}
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'--all' => true,
'--exclude-transports' => ['dummy-receiver2'],
'--limit' => 2,
]);

$tester->assertCommandIsSuccessful();
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver1, dummy-receiver3"', $tester->getDisplay());
}

public function testRunWithExcludeTransportsMultipleQueues()
{
$envelope1 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$envelope2 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$envelope3 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$envelope4 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);

$receiver1 = $this->createMock(ReceiverInterface::class);
$receiver1->method('get')->willReturn([$envelope1]);
$receiver2 = $this->createMock(ReceiverInterface::class);
$receiver2->method('get')->willReturn([$envelope2]);
$receiver3 = $this->createMock(ReceiverInterface::class);
$receiver3->method('get')->willReturn([$envelope3]);
$receiver4 = $this->createMock(ReceiverInterface::class);
$receiver4->method('get')->willReturn([$envelope4]);

$receiverLocator = new Container();
$receiverLocator->set('dummy-receiver1', $receiver1);
$receiverLocator->set('dummy-receiver2', $receiver2);
$receiverLocator->set('dummy-receiver3', $receiver3);
$receiverLocator->set('dummy-receiver4', $receiver4);

$bus = $this->createMock(MessageBusInterface::class);
// Only 2 messages should be dispatched (receiver1 and receiver4, receiver2 and receiver3 are excluded)
$bus->expects($this->exactly(2))->method('dispatch');

$busLocator = new Container();
$busLocator->set('dummy-bus', $bus);

$command = new ConsumeMessagesCommand(
new RoutableMessageBus($busLocator),
$receiverLocator, new EventDispatcher(),
receiverNames: ['dummy-receiver1', 'dummy-receiver2', 'dummy-receiver3', 'dummy-receiver4']
);

$application = new Application();
if (method_exists($application, 'addCommand')) {
$application->addCommand($command);
} else {
$application->add($command);
}
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'--all' => true,
'--exclude-transports' => ['dummy-receiver2', 'dummy-receiver3'],
'--limit' => 2,
]);

$tester->assertCommandIsSuccessful();
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver1, dummy-receiver4"', $tester->getDisplay());
}

public function testExcludeTransportssWithoutAllOptionThrowsException()
{
$receiverLocator = new Container();
$receiverLocator->set('dummy-receiver', new \stdClass());

$command = new ConsumeMessagesCommand(new RoutableMessageBus(new Container()), $receiverLocator, new EventDispatcher());

$application = new Application();
if (method_exists($application, 'addCommand')) {
$application->addCommand($command);
} else {
$application->add($command);
}
$tester = new CommandTester($application->get('messenger:consume'));

$this->expectException(InvalidOptionException::class);
$this->expectExceptionMessage('The --exclude-transports option can only be used with the --all option.');
$tester->execute([
'receivers' => ['dummy-receiver'],
'--exclude-transports' => ['dummy-receiver'],
]);
}

public function testExcludeTransportsWithAllQueuesExcludedThrowsException()
{
$receiverLocator = new Container();
$receiverLocator->set('dummy-receiver1', new \stdClass());
$receiverLocator->set('dummy-receiver2', new \stdClass());

$command = new ConsumeMessagesCommand(
new RoutableMessageBus(new Container()),
$receiverLocator,
new EventDispatcher(),
receiverNames: ['dummy-receiver1', 'dummy-receiver2']
);

$application = new Application();
if (method_exists($application, 'addCommand')) {
$application->addCommand($command);
} else {
$application->add($command);
}
$tester = new CommandTester($application->get('messenger:consume'));

$this->expectException(\Symfony\Component\Console\Exception\RuntimeException::class);
$this->expectExceptionMessage('All transports/receivers have been excluded. Please specify at least one to consume from.');
$tester->execute([
'--all' => true,
'--exclude-transports' => ['dummy-receiver1', 'dummy-receiver2'],
]);
}
}