[Messenger] Batch handlers are not flushed on worker stop #49026

Closed
@AdamKatzDev

Symfony version(s) affected

5.4, 6.3

Description

The problem is that the worker cannot flush batch handlers without feeding a new message into them.
There is not much to add here without going into details; see the sections below.
This bug can cause multiple hard-to-track issues in other parts of the system that depend on how the queue is processed.

How to reproduce

The bug can be reproduced with any batch handler that flushes only when shouldFlush returns true.
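
A minimal standalone sketch of the reproduction might look like the following. It deliberately does not use the Messenger classes: `BufferingHandler`, its `$batchSize` property, and the simulated worker loop are hypothetical stand-ins that only mimic a batch handler which flushes when `shouldFlush()` returns true.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a Messenger batch handler (not Symfony code).
final class BufferingHandler
{
    /** @var list<string> messages waiting to be processed */
    private array $jobs = [];

    /** @var list<list<string>> batches that were actually processed */
    public array $processed = [];

    private int $batchSize;

    public function __construct(int $batchSize)
    {
        $this->batchSize = $batchSize;
    }

    // Called once per incoming message; returns the number of pending jobs.
    public function handle(string $message): int
    {
        $this->jobs[] = $message;
        if ($this->shouldFlush()) {
            $this->flush();
        }

        return \count($this->jobs);
    }

    // Processes everything that is currently buffered.
    public function flush(): void
    {
        if ([] !== $this->jobs) {
            $this->processed[] = $this->jobs;
            $this->jobs = [];
        }
    }

    private function shouldFlush(): bool
    {
        return $this->batchSize <= \count($this->jobs);
    }
}

// Simulated worker: feeds three messages, then "stops" without flushing.
$handler = new BufferingHandler(5);
$pending = 0;
foreach (['a', 'b', 'c'] as $message) {
    $pending = $handler->handle($message);
}
// At this point three jobs are pending and nothing has been processed.
```

In this simulation the three buffered messages are processed only if something calls `flush()` explicitly, which is the step the worker is reported to skip on stop.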

Possible Solution

TL;DR: as a workaround, write a listener that flushes batch handlers and calls the Worker::ack method on every WorkerStoppedEvent, and optionally on every WorkerRunningEvent (not required for this bug; see the explanation below).
Worker::ack is private, so use a hack like https://ocramius.github.io/blog/accessing-private-php-class-members-without-reflection/ to access it.

Currently the Worker class waits for a new message before flushing handlers. In general this makes sense: there is no reason to flush batches when no new messages arrive, because the result of the simplest shouldFlush method, which only checks the current batch size, won't change.
But the class also doesn't flush the handlers on stop, and that is the issue.

There is another problem with the current approach.
For example, suppose we have a handler with the following shouldFlush method:

private function shouldFlush(): bool
{
    return $this->batchSize <= \count($this->jobs) || random_int(0, 1) /* some external condition, e.g. timer */;
}

This handler's shouldFlush method depends on an external condition that can trigger a flush. This can be useful for jobs that must be processed close to real time. With the current worker implementation, the batch may sit in memory for a long time if messages arrive at a low rate.
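
To make the timer case concrete, here is a hedged, standalone sketch. `TimedBufferingHandler`, `$maxAge`, and the injected `$clock` callable are assumptions introduced for illustration and are not part of Messenger; `shouldFlush()` is public here only so the stuck state can be observed from outside.

```php
<?php
declare(strict_types=1);

// Hypothetical handler whose flush condition also depends on elapsed time.
final class TimedBufferingHandler
{
    /** @var list<string> */
    private array $jobs = [];

    /** @var list<list<string>> batches that were actually processed */
    public array $processed = [];

    private int $batchSize;
    private float $maxAge;

    /** @var callable(): float returns the current time in seconds */
    private $clock;

    private ?float $firstJobAt = null;

    public function __construct(int $batchSize, float $maxAge, callable $clock)
    {
        $this->batchSize = $batchSize;
        $this->maxAge = $maxAge;
        $this->clock = $clock;
    }

    // The flush condition is evaluated only here, when a message arrives.
    public function handle(string $message): void
    {
        $this->firstJobAt ??= ($this->clock)();
        $this->jobs[] = $message;
        if ($this->shouldFlush()) {
            $this->flush();
        }
    }

    public function flush(): void
    {
        if ([] !== $this->jobs) {
            $this->processed[] = $this->jobs;
            $this->jobs = [];
            $this->firstJobAt = null;
        }
    }

    public function shouldFlush(): bool
    {
        if ([] === $this->jobs) {
            return false;
        }

        return $this->batchSize <= \count($this->jobs)
            || ($this->clock)() - $this->firstJobAt >= $this->maxAge;
    }
}

$now = 0.0;
$clock = static function () use (&$now): float {
    return $now;
};

$handler = new TimedBufferingHandler(10, 5.0, $clock);
$handler->handle('a'); // buffered at t = 0
$now = 60.0;           // a minute passes with no new messages

// The timer condition is met, yet nothing has been processed.
$stuck = [] === $handler->processed && $handler->shouldFlush();
```

In this sketch the batch is released only when the next message arrives, however late that is, because nothing re-evaluates `shouldFlush()` in between.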

To fix the bug and also enable the use case described above, a listener can be written that flushes batch handlers and calls the Worker::ack method on every WorkerRunningEvent/WorkerStoppedEvent. Alas, this is a hack, not a proper solution that could be integrated into the Messenger component.
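
The shape of such a listener could be sketched roughly as below. This is a standalone illustration, not Messenger code: the `FlushableHandler` interface, the `FlushBatchHandlersListener` class, and its method names are hypothetical. In a real application the two methods would be subscribed to WorkerRunningEvent and WorkerStoppedEvent, and the Worker::ack call, which needs the private-access hack mentioned above, is omitted here.

```php
<?php
declare(strict_types=1);

// Hypothetical contract for handlers the listener is allowed to flush.
interface FlushableHandler
{
    public function shouldFlush(): bool;

    public function flush(): void;
}

final class FlushBatchHandlersListener
{
    /** @var iterable<FlushableHandler> */
    private iterable $handlers;

    public function __construct(iterable $handlers)
    {
        $this->handlers = $handlers;
    }

    // Meant for every WorkerRunningEvent: flush only handlers that ask for it.
    public function onWorkerRunning(): void
    {
        foreach ($this->handlers as $handler) {
            if ($handler->shouldFlush()) {
                $handler->flush();
            }
        }
    }

    // Meant for WorkerStoppedEvent: flush unconditionally so nothing is lost.
    public function onWorkerStopped(): void
    {
        foreach ($this->handlers as $handler) {
            $handler->flush();
        }
    }
}

// Tiny fake handler holding two buffered jobs, below its batch size of 10.
$handler = new class implements FlushableHandler {
    /** @var list<string> */
    public array $jobs = ['a', 'b'];
    public int $flushes = 0;

    public function shouldFlush(): bool
    {
        return \count($this->jobs) >= 10;
    }

    public function flush(): void
    {
        if ([] !== $this->jobs) {
            $this->jobs = [];
            ++$this->flushes;
        }
    }
};

$listener = new FlushBatchHandlersListener([$handler]);
$listener->onWorkerRunning();               // batch not full: nothing happens
$flushesWhileRunning = $handler->flushes;
$listener->onWorkerStopped();               // forced flush on stop
```

Splitting the two callbacks keeps the running-time flush conditional while the stop-time flush is forced, which matches the two problems described above.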

Additional Context

No response
