Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.http;

import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import org.testng.annotations.Test;

import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Verifies that {@link Http2ParentChannelExceptionHandler} uses connection
* state — active stream count and channel activity — to determine whether
* exceptions are logged at DEBUG (suppressed) or WARN (preserved).
* Exception type is NOT a filtering dimension.
*
* The EmbeddedChannel is configured to mirror the production HTTP/2 parent
* channel pipeline:
* <pre>
* Http2FrameCodec → Http2MultiplexHandler → Http2ParentChannelExceptionHandler → TailContext
* </pre>
* (SslHandler is omitted because it requires an SSLContext and is not relevant
* to exception propagation behavior.)
*
* {@code checkException()} re-throws any exception that reached the pipeline tail.
*/
public class Http2ParentChannelExceptionHandlerTest {

/**
* BEFORE fix — without the handler, exceptions reach the pipeline tail.
* EmbeddedChannel's checkException() re-throws the unhandled exception,
* proving it reached Netty's TailContext (which in production logs as WARN).
*/
@Test(groups = "unit")
public void withoutHandler_exceptionReachesTail() {
EmbeddedChannel channel = createH2ParentChannel(false);

channel.pipeline().fireExceptionCaught(
new IOException("Connection reset by peer"));

assertThatThrownBy(channel::checkException)
.isInstanceOf(IOException.class)
.hasMessageContaining("Connection reset by peer");

channel.finishAndReleaseAll();
}

/**
* With handler — exception on idle connection (0 active streams) is
* consumed at DEBUG. The suppression is based on connection state
* (no active streams), not exception type.
*
* In production, channel.isActive() transitions to false during the
* RST handling cycle, satisfying the OR condition. In EmbeddedChannel
* we can only verify the activeStreams == 0 branch.
*/
@Test(groups = "unit")
public void withHandler_zeroActiveStreams_consumedAtDebug() {
EmbeddedChannel channel = createH2ParentChannel(true);

Http2FrameCodec codec = channel.pipeline().get(Http2FrameCodec.class);
assertThat(codec).isNotNull();
assertThat(codec.connection().numActiveStreams()).isEqualTo(0);

channel.pipeline().fireExceptionCaught(
new IOException("recvAddress(..) failed with error(-104): Connection reset by peer"));

// Exception consumed — does NOT reach tail
channel.checkException();

channel.finishAndReleaseAll();
}

/**
* Handler does not close the channel — connection lifecycle is managed
* by reactor-netty's pool eviction, not by this handler.
*/
@Test(groups = "unit")
public void withHandler_exceptionDoesNotCloseChannel() {
EmbeddedChannel channel = createH2ParentChannel(true);

assertThat(channel.isActive()).isTrue();

channel.pipeline().fireExceptionCaught(
new IOException("Connection reset by peer"));

channel.checkException();
assertThat(channel.isOpen()).isTrue();

channel.finishAndReleaseAll();
}

/**
* RuntimeException on idle connection is also consumed — suppression
* is based on connection state, not exception type.
*/
@Test(groups = "unit")
public void withHandler_runtimeException_zeroActiveStreams_consumed() {
EmbeddedChannel channel = createH2ParentChannel(true);

channel.pipeline().fireExceptionCaught(
new RuntimeException("Unexpected state error"));

channel.checkException();

channel.finishAndReleaseAll();
}

/**
* NullPointerException on idle connection is also consumed — same
* connection-state-based suppression regardless of exception type.
*/
@Test(groups = "unit")
public void withHandler_npe_zeroActiveStreams_consumed() {
EmbeddedChannel channel = createH2ParentChannel(true);

channel.pipeline().fireExceptionCaught(
new NullPointerException("handler bug"));

channel.checkException();

channel.finishAndReleaseAll();
}

/**
* With handler — exception on a connection with active streams is
* consumed (does not reach TailContext). The handler logs at WARN
* instead of DEBUG because in-flight requests may be affected.
*/
@Test(groups = "unit")
public void withHandler_activeStreams_consumedAtWarn() throws Exception {
EmbeddedChannel channel = createH2ParentChannel(true);

Http2FrameCodec codec = channel.pipeline().get(Http2FrameCodec.class);
assertThat(codec).isNotNull();

// Create an active stream (client-initiated, odd stream ID)
codec.connection().local().createStream(1, false);
assertThat(codec.connection().numActiveStreams()).isEqualTo(1);
assertThat(channel.isActive()).isTrue();

channel.pipeline().fireExceptionCaught(
new IOException("Connection reset by peer"));

// Exception consumed — does NOT reach tail, even with active streams
channel.checkException();

channel.finishAndReleaseAll();
}

/**
* Handler does not close the channel even when active streams exist —
* connection lifecycle is managed by reactor-netty's pool eviction.
*/
@Test(groups = "unit")
public void withHandler_activeStreams_channelNotClosed() throws Exception {
EmbeddedChannel channel = createH2ParentChannel(true);

Http2FrameCodec codec = channel.pipeline().get(Http2FrameCodec.class);
assertThat(codec).isNotNull();

codec.connection().local().createStream(1, false);
assertThat(codec.connection().numActiveStreams()).isEqualTo(1);
assertThat(channel.isActive()).isTrue();

channel.pipeline().fireExceptionCaught(
new IOException("Connection reset by peer"));

channel.checkException();
assertThat(channel.isOpen()).isTrue();

channel.finishAndReleaseAll();
}

/**
* With handler — when Http2FrameCodec is absent from the pipeline,
* getActiveStreamCount() returns null. Since the active stream count
* is unknown and the channel is active, the handler takes the safe
* WARN path. This covers the fallback behavior when the codec is
* unavailable (e.g., torn down during shutdown).
*/
@Test(groups = "unit")
public void withHandler_codecAbsent_fallsBackToWarnPath() {
EmbeddedChannel channel = new EmbeddedChannel(
Http2ParentChannelExceptionHandler.INSTANCE);

assertThat(channel.pipeline().get(Http2FrameCodec.class)).isNull();
assertThat(channel.isActive()).isTrue();

channel.pipeline().fireExceptionCaught(
new IOException("Connection reset by peer"));

// Exception consumed — does NOT reach tail
channel.checkException();
assertThat(channel.isOpen()).isTrue();

channel.finishAndReleaseAll();
}

/**
* Error types (e.g., OutOfMemoryError) are NOT consumed by the handler —
* they propagate to TailContext. This ensures JVM-level errors are never
* silently swallowed.
*/
@Test(groups = "unit")
public void withHandler_errorNotConsumed_propagatesToTail() {
EmbeddedChannel channel = createH2ParentChannel(true);

channel.pipeline().fireExceptionCaught(
new OutOfMemoryError("test OOM"));

assertThatThrownBy(channel::checkException)
.isInstanceOf(OutOfMemoryError.class)
.hasMessageContaining("test OOM");

channel.finishAndReleaseAll();
}

/**
* Creates an EmbeddedChannel matching the production HTTP/2 parent channel
* pipeline (minus SslHandler): Http2FrameCodec → Http2MultiplexHandler →
* Http2ParentChannelExceptionHandler.
*/
private static EmbeddedChannel createH2ParentChannel(boolean withExceptionHandler) {
Http2FrameCodec codec = Http2FrameCodecBuilder.forClient()
.autoAckSettingsFrame(true)
.build();

Http2MultiplexHandler multiplexHandler = new Http2MultiplexHandler(
new ChannelInboundHandlerAdapter());

if (withExceptionHandler) {
return new EmbeddedChannel(codec, multiplexHandler,
Http2ParentChannelExceptionHandler.INSTANCE);
} else {
return new EmbeddedChannel(codec, multiplexHandler);
}
}
}
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Fixed an issue where `CustomItemSerializer` was incorrectly applied to internal SDK query pipeline structures (e.g., `OrderByRowResult`, `Document`), causing deserialization failures in ORDER BY, GROUP BY, aggregate, DISTINCT, and hybrid search queries. - See [PR 48811](https://github.com/Azure/azure-sdk-for-java/pull/48811)
* Fixed an issue where `SqlParameter` ignored the configured `CustomItemSerializer`, always using the internal default serializer instead. - See [PR 48811](https://github.com/Azure/azure-sdk-for-java/pull/48811)
* Fixed a `ClientTelemetry` static initialization failure when IMDS access is disabled, preventing `NoClassDefFoundError` during Cosmos client creation in non-Azure environments. - See [PR 48888](https://github.com/Azure/azure-sdk-for-java/pull/48888)
* Fixed an issue where Netty could log "An exceptionCaught() event was fired, and it reached at the tail of the pipeline" on HTTP/2 connections when the server resets idle TCP connections by adding an exception handler on the HTTP/2 parent channel to handle these connection-level exceptions more appropriately. - See [PR 48890](https://github.com/Azure/azure-sdk-for-java/pull/48890)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,21 @@ public CosmosClientTelemetryConfig getClientTelemetryConfig() {
return clientTelemetryConfig;
}

// Non-blocking cache for machine ID. Populated by getMachineId() on first
// successful IMDS resolution (called during client init on non-event-loop threads).
// Read by components that cannot block (e.g., Netty channel handlers).
private static volatile String cachedMachineId;

/**
* Returns the cached machine ID without blocking.
* Returns empty string if the machine ID has not yet been resolved
* (i.e., getMachineId() has not been called yet from a non-event-loop thread).
*/
public static String getCachedMachineId() {
String id = cachedMachineId;
return id != null ? id : "n/a";
}

/**
* Blocking version of machine ID lookup. Used by Spark connector (CosmosClientCache.scala).
* Delegates to getMachineId which waits up to 5s for IMDS metadata.
Expand All @@ -136,6 +151,7 @@ public static String getMachineId(DiagnosticsClientContext.DiagnosticsClientConf
AzureVMMetadata metadata = CACHED_METADATA.block(Duration.ofSeconds(5));
if (metadata != null && metadata != METADATA_NOT_AVAILABLE && metadata.getVmId() != null) {
String machineId = VM_ID_PREFIX + metadata.getVmId();
cachedMachineId = machineId;
if (diagnosticsClientConfig != null) {
diagnosticsClientConfig.withMachineId(machineId);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.http;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http2.Http2FrameCodec;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Exception handler for the HTTP/2 parent (TCP) channel pipeline.
* <p>
* In HTTP/2, reactor-netty multiplexes streams on a shared parent TCP connection.
* Child stream channels have {@code ChannelOperationsHandler} which catches exceptions
* and fails the active subscriber (matching HTTP/1.1 behavior). However, the parent
* channel has no such handler — exceptions propagate to Netty's {@code TailContext}
* which logs them as WARN ("An exceptionCaught() event was fired, and it reached at
* the tail of the pipeline").
* <p>
* This handler consumes {@link Exception}s on the parent channel and uses connection
* state to decide the log level:
* <ul>
* <li><b>DEBUG</b> — when {@code activeStreams == 0} OR {@code !channelActive}.
* No in-flight requests are affected (e.g., TCP RST from LB idle timeout,
* post-close cleanup).</li>
* <li><b>WARN</b> — when active streams exist on a live channel, or when the
* active stream count cannot be determined. The exception may affect
* in-flight requests and is worth alerting on.</li>
* </ul>
* <p>
* {@link Error} types (e.g., {@code OutOfMemoryError}) are never consumed — they
* propagate to {@code TailContext} for standard Netty handling.
* <p>
* The handler does NOT close the channel or alter connection lifecycle — reactor-netty
* and the connection pool's eviction predicate ({@code !channel.isActive()}) handle that
* independently.
*
* @see ReactorNettyClient#configureChannelPipelineHandlers()
*/
@ChannelHandler.Sharable
final class Http2ParentChannelExceptionHandler extends ChannelInboundHandlerAdapter {
Comment thread
jeet1995 marked this conversation as resolved.

static final Http2ParentChannelExceptionHandler INSTANCE = new Http2ParentChannelExceptionHandler();

static final String HANDLER_NAME = "cosmosH2ParentExceptionHandler";

private static final Logger logger = LoggerFactory.getLogger(Http2ParentChannelExceptionHandler.class);

private Http2ParentChannelExceptionHandler() {
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Do not consume JVM-level errors (OOM, StackOverflow, etc.) — let them
// propagate to TailContext for standard Netty handling.
if (cause instanceof Error) {
ctx.fireExceptionCaught(cause);
return;
}

Integer activeStreams = getActiveStreamCount(ctx);
boolean channelActive = ctx.channel().isActive();

if ((activeStreams != null && activeStreams == 0) || !channelActive) {
// No active streams OR channel already inactive — exception is noise
// (e.g., TCP RST from LB idle timeout, post-close cleanup).
if (logger.isDebugEnabled()) {
logger.debug(
"Exception on HTTP/2 parent connection"
+ " [channel=" + ctx.channel()
+ ", activeStreams=" + activeStreams
+ ", channelActive=" + channelActive
+ ", clientVmId=" + ClientTelemetry.getCachedMachineId() + "]",
cause);
}
} else {
// Active streams on a live channel, or stream count unknown (null) —
// exception may affect in-flight requests.
logger.warn(
Comment thread
jeet1995 marked this conversation as resolved.
"Exception on HTTP/2 parent connection"
+ " [channel=" + ctx.channel()
+ ", activeStreams=" + activeStreams
+ ", channelActive=" + channelActive
+ ", clientVmId=" + ClientTelemetry.getCachedMachineId() + "]",
cause);
}
}

private static Integer getActiveStreamCount(ChannelHandlerContext ctx) {
try {
Http2FrameCodec codec = ctx.pipeline().get(Http2FrameCodec.class);
if (codec != null) {
return codec.connection().numActiveStreams();
}
} catch (Exception e) {
logger.debug("Failed to retrieve active stream count from Http2FrameCodec", e);
}
return null;
}
}
Loading
Loading