Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,17 @@ public PollResult pollOnClose(long currentTimeMs) {
* <p>Similarly, we may have to unblock the application thread to send a {@link AsyncPollEvent} to make sure
* our poll timer will not expire while we are polling.
*
* <p>In the event that heartbeats are currently being skipped, this still returns the next heartbeat
* delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive.
* <p>In the event that heartbeats are currently being skipped (e.g., the member is in
* {@link MemberState#UNSUBSCRIBED} when using manual assignment), this returns {@code Long.MAX_VALUE}
* to indicate there is no next heartbeat to wait for, allowing the application thread to block for
* the full user-specified poll timeout rather than spinning in a busy loop.
*/
@Override
public long maximumTimeToWait(long currentTimeMs) {
pollTimer.update(currentTimeMs);
if (membershipManager().shouldSkipHeartbeat()) {
return Long.MAX_VALUE;
}
if (pollTimer.isExpired() || (membershipManager().shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight())) {
return 0L;
}
Comment on lines 245 to 261
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if we should handle the pollTimer.isExpired() case here, as shouldSkipHeartbeat() being true would cause the logic to return Long.MAX_VALUE even if the timer has expired. However, I noticed that the callers use Math.min() with timer.remainingMs(), so it won't impact production. Ive also run the unit tests locally and everything looks solid.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is shouldSkipHeartbeat() being used to determine if the user is subscribed or not? I don't think they're synonymous 🤔

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Maybe we could narrow the condition to state == MemberState.UNSUBSCRIBED, focusing strictly on the manual assignment issue we are facing.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,32 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) {
}
}

/**
* When the consumer uses manual partition assignment (assign()) instead of subscribe(), the
* member stays in UNSUBSCRIBED state indefinitely. Because heartbeats are skipped in that
* state and heartbeatIntervalMs initialises to 0, maximumTimeToWait used to return 0, causing
* a busy-loop in pollForFetches. Verify that maximumTimeToWait returns Long.MAX_VALUE whenever
* shouldSkipHeartbeat() is true so the application thread can block for the full poll timeout.
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMaximumTimeToWaitWhenHeartbeatShouldBeSkipped(final boolean shouldSkipHeartbeat) {
// Start with zero heartbeat interval (simulates the initial state before any HB response)
createHeartbeatRequestStateWithZeroHeartbeatInterval();
when(membershipManager.shouldSkipHeartbeat()).thenReturn(shouldSkipHeartbeat);

long result = heartbeatRequestManager.maximumTimeToWait(time.milliseconds());

if (shouldSkipHeartbeat) {
assertEquals(Long.MAX_VALUE, result,
"maximumTimeToWait should return Long.MAX_VALUE when heartbeats are skipped " +
"(e.g., UNSUBSCRIBED state with manual assignment) to prevent a busy loop");
} else {
assertEquals(0, result,
"maximumTimeToWait should return 0 when heartbeat interval timer has already expired");
}
}

@Test
public void testTimerNotDue() {
time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent
Expand Down
Loading