diff --git a/sdk/cosmos/azure-cosmos-test/CHANGELOG.md b/sdk/cosmos/azure-cosmos-test/CHANGELOG.md index 0fd61e740521..5f3553330aa7 100644 --- a/sdk/cosmos/azure-cosmos-test/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-test/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed Netty ByteBuf leak in `GatewayServerErrorInjector` fault injection delay paths under HTTP/2. - See [PR 48880](https://github.com/Azure/azure-sdk-for-java/pull/48880) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java index 90b9c96bda78..c506897e5e4c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java @@ -21,6 +21,7 @@ import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; import io.netty.channel.ConnectTimeoutException; import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.util.ReferenceCountUtil; import reactor.core.publisher.Mono; import java.net.URI; @@ -174,11 +175,18 @@ public Mono injectGatewayErrors( if (this.injectGatewayServerResponseDelayAfterProcessing(faultInjectionRequestArgs, delayToBeInjected)) { if (delayToBeInjected.v.toMillis() >= effectiveResponseTimeout.toMillis()) { return originalResponseMono + .doOnNext(GatewayServerErrorInjector::releaseResponseBody) .delayElement(delayToBeInjected.v) .then(Mono.error(new ReadTimeoutException())); } else { + // In the happy path the response flows downstream after the delay + // and the caller drains the body normally. However, if the downstream + // cancels during delayElement (e.g., an outer timeout fires), + // delayElement silently drops the buffered HttpResponse. doOnDiscard + // catches that discard and releases the body ByteBuf to prevent leaks. return originalResponseMono - .delayElement(delayToBeInjected.v); + .delayElement(delayToBeInjected.v) + .doOnDiscard(HttpResponse.class, GatewayServerErrorInjector::releaseResponseBody); } } @@ -243,4 +251,17 @@ private GatewayFaultInjectionRequestArgs createFaultInjectionRequestArgs( serviceRequest, partitionKeyRangeIds); } + + /** + * Drains and releases the response body to prevent Netty ByteBuf leaks. + * Called when a fault-injected path discards the real HTTP response (e.g., + * simulating a timeout after the response has already arrived). + */ + private static void releaseResponseBody(HttpResponse httpResponse) { + httpResponse.body() + .subscribe( + buf -> ReferenceCountUtil.safeRelease(buf), + ex -> { }, + () -> { }); + } }