Symfony version(s) affected
7.2
Description
When using a custom Redis adapter or client that automatically applies a key prefix (e.g. prefix:), streams are created under prefixed keys such as prefix:my_stream. However, in Connection.php, the code that checks for and iterates over pending messages always indexes the result with the unprefixed $this->stream, so it never finds any messages.
Problematic code (around line 470 of Connection.php):
// Check for pending messages
if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
    $this->couldHavePendingMessages = false;
    // No pending, fetch new ones
    return $this->get();
}
// Iterate over messages in the stream
foreach ($messages[$this->stream] ?? [] as $key => $message) {
    …
}
Here, $this->stream is the bare stream name, but Redis actually stores messages under prefix:$this->stream and the returned $messages array is keyed by that prefixed name. As a result, $messages[$this->stream] is always empty and pending messages are never processed.
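The mismatch can be illustrated without a Redis server. The following is a minimal sketch, assuming the reply array is keyed by the prefixed stream name as described above; the variable names and the sample entry are hypothetical and are not taken from Connection.php.

```php
<?php
// Hypothetical stand-ins for the configured prefix and stream name.
$prefix = 'prefix:';
$stream = 'my_stream';

// Assumed shape of the reply: entries keyed by the *prefixed* stream name.
$messages = [
    $prefix.$stream => [
        '1700000000000-0' => ['message' => '{"body":"hello"}'],
    ],
];

// Lookup as done today, with the bare stream name: finds nothing.
$unprefixedLookup = $messages[$stream] ?? [];

// Lookup with the prefixed key: finds the entry.
$prefixedLookup = $messages[$prefix.$stream] ?? [];

var_dump(empty($unprefixedLookup)); // bool(true)
var_dump(count($prefixedLookup));   // int(1)
```

With the bare name the entry is invisible to the loop, which matches the observed behavior.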
How to reproduce
- Configure Symfony Messenger with a Redis Stream transport using stream: 'my_stream'.
- Provide a custom \Redis instance (or wrapper) configured with a global key prefix, e.g. prefix:.
- Send messages to the transport.
- Run the worker (php bin/console messenger:consume).
Observed behavior:
The stream key in Redis is prefix:my_stream.
Connection::get() never finds any pending messages and always fetches new ones, skipping any existing entries.
Possible Solution
Use $this->redis->_prefix($this->stream) as the key when retrieving entries from $messages.
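A rough sketch of that idea follows. The helper resolveStreamEntries() and the $prefixKey callable are hypothetical names introduced only for illustration; they are not part of Symfony. With phpredis, the callable could delegate to $redis->_prefix(), which returns the key with the configured prefix applied; a plain closure stands in for it here so the example runs without a Redis connection.

```php
<?php
/**
 * Hypothetical helper (not part of Symfony): returns the entries for a
 * stream from a reply array, using the prefixed key for the lookup.
 *
 * @param array    $messages  Reply array keyed by stream key
 * @param string   $stream    Bare stream name, e.g. "my_stream"
 * @param callable $prefixKey Maps a bare key to the key as stored in Redis
 */
function resolveStreamEntries(array $messages, string $stream, callable $prefixKey): array
{
    $key = $prefixKey($stream);

    return $messages[$key] ?? [];
}

// Stand-in for: static fn (string $key): string => $redis->_prefix($key)
$prefixKey = static fn (string $key): string => 'prefix:'.$key;

$messages = [
    'prefix:my_stream' => [
        '1700000000000-0' => ['message' => '{"body":"hello"}'],
    ],
];

$entries = resolveStreamEntries($messages, 'my_stream', $prefixKey);

foreach ($entries as $id => $fields) {
    echo $id, ' => ', $fields['message'], PHP_EOL;
}
```

When no prefix is configured, the callable can return the key unchanged, so the lookup behaves as it does today.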
Additional Context
No response