Skip to content

[Messenger] Add --exclude-queues consume parameters #60979

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 7.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected function configure(): void
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
new InputOption('exclude-receivers', 'er', InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Exclude specific receivers/transports from consumption (can only be used with --all)'),
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
])
->setHelp(<<<'EOF'
Expand Down Expand Up @@ -122,6 +123,10 @@ protected function configure(): void
Use the --all option to consume from all receivers:

<info>php %command.full_name% --all</info>

Use the --exclude-receivers option to exclude specific receivers/transports from consumption (can only be used with --all):

<info>php %command.full_name% --all --exclude-receivers=queue1</info>
EOF
)
;
Expand All @@ -132,6 +137,10 @@ protected function initialize(InputInterface $input, OutputInterface $output): v
if ($input->hasParameterOption('--keepalive')) {
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
}

if ($input->getOption('exclude-receivers') && !$input->getOption('all')) {
throw new InvalidOptionException('The --exclude-receivers option can only be used with the --all option.');
}
}

protected function interact(InputInterface $input, OutputInterface $output): void
Expand Down Expand Up @@ -169,9 +178,22 @@ protected function interact(InputInterface $input, OutputInterface $output): voi

protected function execute(InputInterface $input, OutputInterface $output): int
{
if ($input->getOption('exclude-receivers') && !$input->getOption('all')) {
throw new InvalidOptionException('The --exclude-receivers option can only be used with the --all option.');
}

$receivers = [];
$rateLimiters = [];
$receiverNames = $input->getOption('all') ? $this->receiverNames : $input->getArgument('receivers');

if ($input->getOption('all') && $excludedTransports = $input->getOption('exclude-receivers')) {
$receiverNames = array_diff($receiverNames, $excludedTransports);

if (empty($receiverNames)) {
throw new RuntimeException('All transports/receivers have been excluded. Please specify at least one to consume from.');
}
}

foreach ($receiverNames as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = \sprintf('The receiver "%s" does not exist.', $receiverName);
Expand Down Expand Up @@ -276,6 +298,10 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
if ($input->mustSuggestOptionValuesFor('bus')) {
$suggestions->suggestValues($this->busIds);
}

if ($input->mustSuggestOptionValuesFor('exclude-receivers')) {
$suggestions->suggestValues($this->receiverNames);
}
}

public function getSubscribedSignals(): array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,4 +334,156 @@ public static function provideCompletionSuggestions()
yield 'receiver (no repeat)' => [['async', ''], ['async_high', 'failed']];
yield 'option --bus' => [['--bus', ''], ['messenger.bus.default']];
}

public function testRunWithExcludeReceiversOption()
{
$envelope1 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$envelope2 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$envelope3 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);

$receiver1 = $this->createMock(ReceiverInterface::class);
$receiver1->method('get')->willReturn([$envelope1]);
$receiver2 = $this->createMock(ReceiverInterface::class);
$receiver2->method('get')->willReturn([$envelope2]);
$receiver3 = $this->createMock(ReceiverInterface::class);
$receiver3->method('get')->willReturn([$envelope3]);

$receiverLocator = new Container();
$receiverLocator->set('dummy-receiver1', $receiver1);
$receiverLocator->set('dummy-receiver2', $receiver2);
$receiverLocator->set('dummy-receiver3', $receiver3);

$bus = $this->createMock(MessageBusInterface::class);
// Only 2 messages should be dispatched (receiver1 and receiver3, receiver2 is excluded)
$bus->expects($this->exactly(2))->method('dispatch');

$busLocator = new Container();
$busLocator->set('dummy-bus', $bus);

$command = new ConsumeMessagesCommand(
new RoutableMessageBus($busLocator),
$receiverLocator, new EventDispatcher(),
receiverNames: ['dummy-receiver1', 'dummy-receiver2', 'dummy-receiver3']
);

$application = new Application();
if (method_exists($application, 'addCommand')) {
$application->addCommand($command);
} else {
$application->add($command);
}
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'--all' => true,
'--exclude-receivers' => ['dummy-receiver2'],
'--limit' => 2,
]);

$tester->assertCommandIsSuccessful();
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver1, dummy-receiver3"', $tester->getDisplay());
}

public function testRunWithExcludeReceiversMultipleQueues()
{
$envelope1 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$envelope2 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$envelope3 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
$envelope4 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);

$receiver1 = $this->createMock(ReceiverInterface::class);
$receiver1->method('get')->willReturn([$envelope1]);
$receiver2 = $this->createMock(ReceiverInterface::class);
$receiver2->method('get')->willReturn([$envelope2]);
$receiver3 = $this->createMock(ReceiverInterface::class);
$receiver3->method('get')->willReturn([$envelope3]);
$receiver4 = $this->createMock(ReceiverInterface::class);
$receiver4->method('get')->willReturn([$envelope4]);

$receiverLocator = new Container();
$receiverLocator->set('dummy-receiver1', $receiver1);
$receiverLocator->set('dummy-receiver2', $receiver2);
$receiverLocator->set('dummy-receiver3', $receiver3);
$receiverLocator->set('dummy-receiver4', $receiver4);

$bus = $this->createMock(MessageBusInterface::class);
// Only 2 messages should be dispatched (receiver1 and receiver4, receiver2 and receiver3 are excluded)
$bus->expects($this->exactly(2))->method('dispatch');

$busLocator = new Container();
$busLocator->set('dummy-bus', $bus);

$command = new ConsumeMessagesCommand(
new RoutableMessageBus($busLocator),
$receiverLocator, new EventDispatcher(),
receiverNames: ['dummy-receiver1', 'dummy-receiver2', 'dummy-receiver3', 'dummy-receiver4']
);

$application = new Application();
if (method_exists($application, 'addCommand')) {
$application->addCommand($command);
} else {
$application->add($command);
}
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'--all' => true,
'--exclude-receivers' => ['dummy-receiver2', 'dummy-receiver3'],
'--limit' => 2,
]);

$tester->assertCommandIsSuccessful();
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver1, dummy-receiver4"', $tester->getDisplay());
}

public function testExcludeReceiverssWithoutAllOptionThrowsException()
{
$receiverLocator = new Container();
$receiverLocator->set('dummy-receiver', new \stdClass());

$command = new ConsumeMessagesCommand(new RoutableMessageBus(new Container()), $receiverLocator, new EventDispatcher());

$application = new Application();
if (method_exists($application, 'addCommand')) {
$application->addCommand($command);
} else {
$application->add($command);
}
$tester = new CommandTester($application->get('messenger:consume'));

$this->expectException(InvalidOptionException::class);
$this->expectExceptionMessage('The --exclude-receivers option can only be used with the --all option.');
$tester->execute([
'receivers' => ['dummy-receiver'],
'--exclude-receivers' => ['dummy-receiver'],
]);
}

public function testExcludeReceiversWithAllQueuesExcludedThrowsException()
{
$receiverLocator = new Container();
$receiverLocator->set('dummy-receiver1', new \stdClass());
$receiverLocator->set('dummy-receiver2', new \stdClass());

$command = new ConsumeMessagesCommand(
new RoutableMessageBus(new Container()),
$receiverLocator,
new EventDispatcher(),
receiverNames: ['dummy-receiver1', 'dummy-receiver2']
);

$application = new Application();
if (method_exists($application, 'addCommand')) {
$application->addCommand($command);
} else {
$application->add($command);
}
$tester = new CommandTester($application->get('messenger:consume'));

$this->expectException(\Symfony\Component\Console\Exception\RuntimeException::class);
$this->expectExceptionMessage('All transports/receivers have been excluded. Please specify at least one to consume from.');
$tester->execute([
'--all' => true,
'--exclude-receivers' => ['dummy-receiver1', 'dummy-receiver2'],
]);
}
}