diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java index e65dd0b7ed98f..34d43384e5711 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java @@ -41,6 +41,7 @@ import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment; import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; import static org.apache.kafka.clients.ClientsTestUtils.consumeRecords; +import static org.apache.kafka.clients.ClientsTestUtils.pollUntilTrue; import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.FETCH_MAX_BYTES_CONFIG; @@ -55,7 +56,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; @ClusterTestDefaults( types = {Type.KRAFT}, @@ -174,14 +174,15 @@ private void testFetchOutOfRangeOffsetResetConfigLatest(GroupProtocol groupProto sendRecords(producer, tp, totalRecords, startingTimestamp); consumer.assign(List.of(tp)); consumer.seek(tp, 0); - + // consume some, but not all the records consumeAndVerifyRecords(consumer, tp, totalRecords / 2, 0); // seek to out of range position var outOfRangePos = totalRecords + 17; // arbitrary, much higher offset consumer.seek(tp, outOfRangePos); // assert that poll resets to the ending position - assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty()); + pollUntilTrue(consumer, () -> consumer.position(tp) == totalRecords, + "Consumer position should advance to the latest end offset " + totalRecords); sendRecords(producer, tp, totalRecords, totalRecords); var nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next(); // ensure the seek went to the last known record at the time of the previous poll diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index 4f47c8591145b..8392e5032f664 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -245,12 +245,17 @@ public PollResult pollOnClose(long currentTimeMs) { *
Similarly, we may have to unblock the application thread to send a {@link AsyncPollEvent} to make sure * our poll timer will not expire while we are polling. * - *
In the event that heartbeats are currently being skipped, this still returns the next heartbeat - * delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive. + *
When the member is {@link MemberState#UNSUBSCRIBED} (for example, with manual assignment), + * this returns {@code Long.MAX_VALUE} to indicate there is no next heartbeat to wait for, + * allowing the application thread to block for the full user-specified poll timeout rather than + * spinning in a busy loop. */ @Override public long maximumTimeToWait(long currentTimeMs) { pollTimer.update(currentTimeMs); + if (membershipManager().state() == MemberState.UNSUBSCRIBED) { + return Long.MAX_VALUE; + } if (pollTimer.isExpired() || (membershipManager().shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight())) { return 0L; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java index 3ef1d712c9667..723fb9b778ce4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java @@ -314,6 +314,32 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { } } + /** + * When the consumer uses manual partition assignment (assign()) instead of subscribe(), the + * member stays in UNSUBSCRIBED state indefinitely. Because heartbeats are skipped in that + * state and heartbeatIntervalMs initialises to 0, maximumTimeToWait used to return 0, causing + * a busy-loop in pollForFetches. Verify that maximumTimeToWait returns Long.MAX_VALUE whenever + * shouldSkipHeartbeat() is true so the application thread can block for the full poll timeout. + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testMaximumTimeToWaitWhenHeartbeatShouldBeSkipped(final boolean shouldSkipHeartbeat) { + // Start with zero heartbeat interval (simulates the initial state before any HB response) + createHeartbeatRequestStateWithZeroHeartbeatInterval(); + when(membershipManager.shouldSkipHeartbeat()).thenReturn(shouldSkipHeartbeat); + + long result = heartbeatRequestManager.maximumTimeToWait(time.milliseconds()); + + if (shouldSkipHeartbeat) { + assertEquals(Long.MAX_VALUE, result, + "maximumTimeToWait should return Long.MAX_VALUE when heartbeats are skipped " + + "(e.g., UNSUBSCRIBED state with manual assignment) to prevent a busy loop"); + } else { + assertEquals(0, result, + "maximumTimeToWait should return 0 when heartbeat interval timer has already expired"); + } + } + @Test public void testTimerNotDue() { time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index d5e9014905222..7288a9465ae10 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1669,7 +1669,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { addAndVerifyAcls(acls, resource) waitUntilTrue(() => { - consumer.poll(Duration.ofMillis(50L)) + try { + consumer.poll(Duration.ofMillis(50L)) + } catch { + case _: TopicAuthorizationException => + } brokers.forall { broker => OptionConverters.toScala(broker.metadataCache.getLeaderAndIsr(newTopic, 0)) match { case Some(partitionState) => FetchRequest.isValidBrokerId(partitionState.leader)