[FLINK-39103][runtime] Tolerate errors when recycling buffer from input channel notification if it had errors already#27624
Conversation
| @@ -216,6 +221,10 @@ public void recycle(MemorySegment segment) { | |||
| if (inputChannel.isReleased()) { | |||
| globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment)); | |||
| return; | |||
| } else if (inputChannel.hasError()) { | |||
| LOG.warn("Input channel has errors - recycling the buffer"); | |||
There was a problem hiding this comment.
Can this warn result in e2e test failure as "unexpected error" or do we only filter for ERROR level there?
There was a problem hiding this comment.
IIRC the script greps errors, not warnings, so it should be fine.
Izeren
left a comment
There was a problem hiding this comment.
Thank you @rkhachatryan, I have left a couple of comments for better understanding of the problem
| @@ -346,6 +346,11 @@ public long unsynchronizedGetSizeOfQueuedBuffers() { | |||
| */ | |||
| public void notifyRequiredSegmentId(int subpartitionId, int segmentId) throws IOException {} | |||
|
|
|||
| /** Whether this input channel has encountered error. */ | |||
| public boolean hasError() { | |||
| return cause.get() != null; | |||
There was a problem hiding this comment.
If it is not too complicated would be good to write a dedicated unit test for new public method
There was a problem hiding this comment.
I agree, I just wanted to agree on the approach first.
Other approaches are passing ignoreErrors to notifyBufferAvailable or ignoring errors coming from it.
| @@ -216,6 +221,10 @@ public void recycle(MemorySegment segment) { | |||
| if (inputChannel.isReleased()) { | |||
| globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment)); | |||
| return; | |||
| } else if (inputChannel.hasError()) { | |||
| LOG.warn("Input channel has errors - recycling the buffer"); | |||
| globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment)); | |||
There was a problem hiding this comment.
For my understanding, does every segment have only 1 input channel?
Can it happen that we would call recycleUnpooledMemorySegments on the same segment twice? If yes, is this call idempotent?
There was a problem hiding this comment.
Yes, at any given time each memory segment has only one owner, and recycling it twice would cause an error.
0042631 to
7d35eec
Compare
| final boolean hadError = | ||
| checkpointedInputGate.getChannel(channelInfo.getInputChannelIdx()).hasError(); |
There was a problem hiding this comment.
In theory, recycling the buffer can also fail for other reason:
if (inputChannel.isReleased()) {
globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment));
But I think global pool failure shouldn't be ignored.
There was a problem hiding this comment.
I share the same concern you mentioned.
The current catch (Exception e) + if (hadError) would swallow all exceptions from releaseDeserializer, not just the channel error. If e.g. globalPool.recycleUnpooledMemorySegments fails while hadError=true, that failure is silently lost.
And it is dangerous if introducing new exception in the future.
|
CI failures seem unrelated |
Izeren
left a comment
There was a problem hiding this comment.
LGTM, I have added a few more comments, PTAL
...time/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
Outdated
Show resolved
Hide resolved
...time/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
Outdated
Show resolved
Hide resolved
...time/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
Outdated
Show resolved
Hide resolved
...time/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
Outdated
Show resolved
Hide resolved
1996fanrui
left a comment
There was a problem hiding this comment.
According the inline comment, I wonder if we should fix this at a different layer. Tracing the call chain:
BufferManager.recycle(segment)
→ buffer added back to queue ← already recycled successfully
→ inputChannel.notifyBufferAvailable(1) ← credit notification
→ notifyCreditAvailable()
→ checkPartitionRequestQueueInitialized()
→ checkError() → throws
The buffer is already recycled at that point. The failure comes from notifying credit to a producer whose connection is already dead — which is unnecessary.
What about skipping credit notification for errored channels instead?
// RemoteInputChannel.java
public void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
if (hasError()) {
return;
}
if (numAvailableBuffers > 0 && unannouncedCredit.getAndAdd(numAvailableBuffers) == 0) {
notifyCreditAvailable();
}
}This way: (1) buffer recycling is not affected, (2) global pool errors are not affected (they happen before notifyBufferAvailable in BufferManager.recycle), and (3) all code paths that recycle buffers are covered, not just close().
| final boolean hadError = | ||
| checkpointedInputGate.getChannel(channelInfo.getInputChannelIdx()).hasError(); |
There was a problem hiding this comment.
I share the same concern you mentioned.
The current catch (Exception e) + if (hadError) would swallow all exceptions from releaseDeserializer, not just the channel error. If e.g. globalPool.recycleUnpooledMemorySegments fails while hadError=true, that failure is silently lost.
And it is dangerous if introducing new exception in the future.
Yeah, I'm not sure which level is better to fix this issue. In the original version, I fixed it at a lower level as you suggest. But there, it affects other notification paths besides of Task cleanup - which probably should not tolerate this error. In the current version, we're certain that it's Task cleanup, so the error can be ignored (if the channel errored already). However, it can be some error not related to channel. Maybe the middle ground is to catch it StreamTaskNetworkInput but check explicitly that it's a WDYT? |
Thanks for the valuable input! I did not test it, so not sure if the notification paths is suitable, or they can be changed as well.
The new proposal makes sense to me, it only suppresses the |
…in StreamTaskNetworkInput
…orresponding channel already had errors On cleanup, deserializer recycles its buffers, potentially notifying the input channel. However, if the input channel has encountered an error (such as RemoteTransportException); then notification will fail which might cause the whol cleanup to fail and lead to TM shutdown.
|
Thanks for the reviews @1996fanrui @Izeren! The previous build failed due to FLINK-38613. I'm going to proceed with merging |
The first 3 commits are from #27619 - please ignore them while reviewing.
Example failure stack trace: