Drain responses on completion for TransportNodesAction #130303
This PR ensures the node responses are copied and drained exclusively in onCompletion so that they do not get concurrently modified by cancellation. Resolves: elastic#128852
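A minimal sketch of the "copy and drain in exactly one place" idea described above. This is not the TransportNodesAction code: `ResponseCollector`, `drainOnCompletion` and `discardOnCancel` are hypothetical names, and a plain boolean guarded by the monitor stands in for whatever flag the real change uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only; not the Elasticsearch implementation.
final class ResponseCollector<T> {
    private final List<T> responses = new ArrayList<>();
    // True once either completion or cancellation has taken ownership of the responses.
    private boolean handled; // guarded by this

    /** Records a response; returns false if the responses were already handed off. */
    synchronized boolean add(T response) {
        if (handled) {
            return false;
        }
        responses.add(response);
        return true;
    }

    /** Completion path: returns a private copy, or empty if cancellation got there first. */
    synchronized Optional<List<T>> drainOnCompletion() {
        if (handled) {
            return Optional.empty();
        }
        handled = true;
        List<T> copy = new ArrayList<>(responses);
        responses.clear();
        return Optional.of(copy);
    }

    /** Cancellation path: discards the responses unless completion already took them. */
    synchronized boolean discardOnCancel() {
        if (handled) {
            return false;
        }
        handled = true;
        responses.clear();
        return true;
    }
}
```

Because the completion path works on its own copy, a later cancellation can no longer mutate the list that is being turned into the final response.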
Hi @ywangd, I've created a changelog YAML for you.
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)
server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java
) {
    final var waited = new AtomicBoolean();
    for (var response : testNodeResponses) {
        if (waited.compareAndSet(false, true)) {
This is kind of a convoluted way to wait on a nonempty list. There's no concurrency here so the compareAndSet is a bit of a sledgehammer. Can we just check testNodeResponses.isEmpty()?
This is to wait for only the first response. You are right there is no need for AtomicBoolean. I changed it to a primitive boolean variable.
boolean waited = false;
for (var response : testNodeResponses) {
    if (waited == false) {
        waited = true;
        safeAwait(barrier);
        safeAwait(barrier);
    }
}
Can we not just do this?
- boolean waited = false;
- for (var response : testNodeResponses) {
-     if (waited == false) {
-         waited = true;
-         safeAwait(barrier);
-         safeAwait(barrier);
-     }
- }
+ if (testNodeResponses.isEmpty() == false) {
+     safeAwait(barrier);
+     safeAwait(barrier);
+ }
Indeed can we not assert that testNodeResponses is nonempty in this test?
The for-loop is to reproduce the ConcurrentModificationException reported in #128852. The test always passes without it.
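To illustrate why iterating matters for the reproduction, here is a single-threaded stand-in, assuming the responses live in a plain ArrayList. The class `IterationHazard` and its methods are hypothetical; the `clear()` call inside the loop plays the role of the cancellation path draining the list while another thread iterates it.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical illustration; the real failure involves two threads.
final class IterationHazard {
    /** Iterates the live list while it is cleared mid-loop; returns true if a CME was thrown. */
    static boolean iterateLiveList(List<String> responses) {
        try {
            for (String response : responses) {
                // Stands in for cancellation draining the list during iteration.
                responses.clear();
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Iterates a private copy instead; returns how many elements were visited. */
    static int iterateCopy(List<String> responses) {
        List<String> copy = new ArrayList<>(responses);
        int seen = 0;
        for (String response : copy) {
            // Clearing the original no longer affects the iteration.
            responses.clear();
            seen++;
        }
        return seen;
    }
}
```

Without any iteration the list's fail-fast iterator is never consulted, which matches the observation that the test always passes when the loop is removed.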
I see, could you add a comment to that effect or else this'll get "tidied up"
Comment added in fdf0b22
assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task;
final var cancellableTask = (CancellableTask) task;
assert cancellableTask.isCancelled();
throw new TaskCancelledException("task cancelled [" + cancellableTask.getReasonCancelled() + "]");
getReasonCancelled is racy according to its Javadocs: "May also be null if the task was just cancelled since we don't set the reason and the cancellation flag atomically." You need to use notifyIfCancelled to get the right behaviour here.
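The race described in that Javadoc can be sketched with a hypothetical stand-in task. `SketchTask` below is not the Elasticsearch CancellableTask, and its `notifyIfCancelled` only assumes the general shape of such a method: it builds the failure while holding the same lock that `cancel` uses, so it cannot observe the flag without the reason.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a cancellable task; not the Elasticsearch class.
final class SketchTask {
    private volatile boolean cancelled;
    private volatile String reason;

    void cancel(String why) {
        synchronized (this) {
            if (cancelled) {
                return;
            }
            cancelled = true; // a reader without the lock can observe this write...
            reason = why;     // ...before this one has happened
        }
    }

    boolean isCancelled() {
        return cancelled;
    }

    /** Racy: may still return null immediately after isCancelled() became true. */
    String getReasonUnsafe() {
        return reason;
    }

    /** Reads the reason under the lock, so a cancelled task always reports its reason. */
    boolean notifyIfCancelled(Consumer<Exception> onFailure) {
        if (cancelled == false) {
            return false;
        }
        final String observed;
        synchronized (this) {
            observed = reason;
        }
        onFailure.accept(new IllegalStateException("task cancelled [" + observed + "]"));
        return true;
    }
}
```

A caller that checks `isCancelled()` and then reads the reason directly can build a message containing `null`; routing the failure through the locked method avoids that window.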
Thanks. Pushed 3d07261. Please let me know if it has used the right listener.
logger.debug("task cancelled after all responses were collected");
assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task;
final var cancellableTask = (CancellableTask) task;
assert cancellableTask.isCancelled();
throw new TaskCancelledException("task cancelled [" + cancellableTask.getReasonCancelled() + "]");
This change is to address the edge case commented here. But I struggle to write a test for it. Essentially we need the cancel to come in after all node responses are collected but before the AtomicBoolean responsesHandled is checked. One option is to extract the creation of CancellableFanOut into its own protected method plus wrapping the returned value with a delegating CancellableFanOut. But this requires making the 4 protected methods in CancellableFanOut package private. I am a bit suspicious about whether this is the right path to go down. I am open to suggestions.
I'd be content with a test which concurrently completes the action and cancels it, and asserts that we always either get an exception or we get a successful response. I expect such a test would find the bug here pretty reliably.
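The shape of such a test can be sketched outside the Elasticsearch test framework. Everything below is an assumption for illustration: `CompleteVersusCancel` is a hypothetical name, and a plain CompletableFuture stands in for the action's listener, with an IllegalStateException standing in for the cancellation exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of a "complete vs. cancel" race test.
final class CompleteVersusCancel {
    /** Races one completion against one cancellation; returns "ok" or "cancelled". */
    static String raceOnce() {
        CompletableFuture<String> future = new CompletableFuture<>();
        CyclicBarrier barrier = new CyclicBarrier(2);
        Thread completer = new Thread(() -> {
            await(barrier);
            future.complete("ok"); // no-op if the cancellation won
        });
        Thread canceller = new Thread(() -> {
            await(barrier);
            // no-op if the completion won
            future.completeExceptionally(new IllegalStateException("task cancelled"));
        });
        completer.start();
        canceller.start();
        try {
            completer.join();
            canceller.join();
            return future.get(10, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IllegalStateException) {
                return "cancelled";
            }
            throw new AssertionError("unexpected failure", e);
        } catch (InterruptedException | TimeoutException e) {
            throw new AssertionError("neither outcome was observed", e);
        }
    }

    // Releases both threads at the same moment to maximise contention.
    private static void await(CyclicBarrier barrier) {
        try {
            barrier.await(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new AssertionError(e);
        }
    }
}
```

Running `raceOnce()` in a loop and asserting that every outcome is one of the two expected values gives the either/or check suggested above; any third outcome, such as a ConcurrentModificationException, would surface as an AssertionError.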
Cool I added such a test, see fb71e89
assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task;
final var cancellableTask = (CancellableTask) task;
assert cancellableTask.isCancelled();
cancellableTask.notifyIfCancelled(finalListener);
I think we should complete l here, not the finalListener.
Ah right. I was in a rush to finish up and missed the obvious 🤦 pushed db76ef8
try {
    final var testNodesResponse = future.actionGet(SAFE_AWAIT_TIMEOUT);
    assertFalse(cancellableTask.isCancelled());
I don't think this'll hold in general, we could cancel the task after the completion has already passed the point of no return and then the task's cancellation flag will be set even though it completed successfully.
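A small sequential sketch of that scenario, using hypothetical names (`LateCancel`, `cancelledFlag`) and a CompletableFuture in place of the real listener: the cancellation arrives after completion has already won, so the result is successful even though the flag is set.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration; not the Elasticsearch task or listener classes.
final class LateCancel {
    final AtomicBoolean cancelledFlag = new AtomicBoolean();
    final CompletableFuture<String> future = new CompletableFuture<>();

    void complete() {
        future.complete("ok");
    }

    void cancel() {
        // The flag is set unconditionally...
        cancelledFlag.set(true);
        // ...but this has no effect once the future is already completed.
        future.completeExceptionally(new IllegalStateException("task cancelled"));
    }
}
```

Calling `complete()` and then `cancel()` leaves `future.join()` returning `"ok"` while `cancelledFlag.get()` is true, which is why asserting that the flag is unset after a successful response is not safe.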
Yeah good point, Thanks. I removed that in b38783d which also contains a few other tweaks.
LGTM