Skip to content

Drain responses on completion for TransportNodesAction #130303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from

Conversation

ywangd
Copy link
Member

@ywangd ywangd commented Jun 30, 2025

This PR ensures the node responses are copied and drained exclusively in onCompletion so that they do not get concurrently modified by cancellation.

Resolves: #128852

This PR ensures the node responses are copied and drained exclusively
in onCompletion so that they do not get concurrently modified by
cancellation.

Resolves: elastic#128852
@ywangd ywangd added >bug v9.0.0 v8.19.0 v9.1.0 :Distributed Coordination/Distributed A catch all label for anything in the Distributed Coordination area. Please avoid if you can. v9.2.0 labels Jun 30, 2025
@elasticsearchmachine elasticsearchmachine added the Team:Distributed Coordination Meta label for Distributed Coordination team label Jun 30, 2025
@elasticsearchmachine
Copy link
Collaborator

Hi @ywangd, I've created a changelog YAML for you.

@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

) {
final var waited = new AtomicBoolean();
for (var response : testNodeResponses) {
if (waited.compareAndSet(false, true)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of a convoluted way to wait on a nonempty list. There's no concurrency here so the compareAndSet is a bit of a sledgehammer. Can we just check testNodeResponses.isEmpty()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to wait for only the first response. You are right there is no need for AtomicBoolean. I changed it to a primitive boolean variable.

Comment on lines 338 to 345
boolean waited = false;
for (var response : testNodeResponses) {
if (waited == false) {
waited = true;
safeAwait(barrier);
safeAwait(barrier);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not just do this?

Suggested change
boolean waited = false;
for (var response : testNodeResponses) {
if (waited == false) {
waited = true;
safeAwait(barrier);
safeAwait(barrier);
}
}
if (testNodeResponses.isEmpty() == false) {
safeAwait(barrier);
safeAwait(barrier);
}

Indeed can we not assert that testNodeResponses is nonempty in this test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The for-loop is to reproduce the ConcurrentModificationException reported in #128852. The test always passes without it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, could you add a comment to that effect or else this'll get "tidied up"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment added in fdf0b22

assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task;
final var cancellableTask = (CancellableTask) task;
assert cancellableTask.isCancelled();
throw new TaskCancelledException("task cancelled [" + cancellableTask.getReasonCancelled() + "]");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getReasonCancelled is racy according to its Javadocs: "May also be null if the task was just cancelled since we don't set the reason and the cancellation flag atomically." You need to use notifyIfCancelled to get the right behaviour here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Pushed 3d07261. Please let me know if it has used the right listener.

Comment on lines 176 to 180
logger.debug("task cancelled after all responses were collected");
assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task;
final var cancellableTask = (CancellableTask) task;
assert cancellableTask.isCancelled();
throw new TaskCancelledException("task cancelled [" + cancellableTask.getReasonCancelled() + "]");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is to address the edge case commented here. But I struggle to write a test for it. Essentially we need the cancel to comes in after all node responses are collected but before the AtomicBoolean responsesHandled is checked. One option is to extract the creation of CancellableFanOut into its own protected method plus wrapping the returned value with a delgating CancellableFanOut. But this requires making the 4 protected methods in CancellableFanOut package private. I am a bit suspicous on whether this is the right path to go down. I am open to suggestions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be content with a test which concurrently completes the action and cancels it, and asserts that we always either get an exception or we get a successful response. I expect such a test would find the bug here pretty reliably.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool I added such a test, see fb71e89

assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task;
final var cancellableTask = (CancellableTask) task;
assert cancellableTask.isCancelled();
cancellableTask.notifyIfCancelled(finalListener);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should complete l here, not the finalListener.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right. I was in a rush to finish up and missed the obvious 🤦 pushed db76ef8


try {
final var testNodesResponse = future.actionGet(SAFE_AWAIT_TIMEOUT);
assertFalse(cancellableTask.isCancelled());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this'll hold in general, we could cancel the task after the completion has already passed the point of no return and then the task's cancellation flag will be set even though it completed successfully.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good point, Thanks. I removed that in b38783d which also contains a few other tweaks.

Copy link
Contributor

@DaveCTurner DaveCTurner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
>bug :Distributed Coordination/Distributed A catch all label for anything in the Distributed Coordination area. Please avoid if you can. Team:Distributed Coordination Meta label for Distributed Coordination team v8.19.0 v9.0.4 v9.1.0 v9.2.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ConcurrentModificationException in TransportClusterStatsAction
3 participants