Enhance HttpResponse to include new method to return InputStream#48858
Enhance HttpResponse to include new method to return InputStream#48858srnagar wants to merge 2 commits intoAzure:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces a new blocking bridge from reactive HTTP response bodies (Flux<ByteBuffer>) to InputStream, and exposes it through a new synchronous accessor on HttpResponse.
Changes:
- Added
FluxInputStream(InputStreambacked byFlux<ByteBuffer>) inazure-coreimplementation code. - Added
HttpResponse#getBodyAsInputStreamSync()to expose a synchronousInputStreamAPI. - Added unit tests for
FluxInputStreamand updatedazure-corechangelog.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/FluxInputStream.java | New InputStream implementation that subscribes to and blocks on a Flux<ByteBuffer>. |
| sdk/core/azure-core/src/test/java/com/azure/core/implementation/http/FluxInputStreamTests.java | New tests covering basic read behavior, empty buffers, and error propagation. |
| sdk/core/azure-core/src/main/java/com/azure/core/http/HttpResponse.java | Adds getBodyAsInputStreamSync() which returns a blocking InputStream over the reactive body. |
| sdk/core/azure-core/checkstyle-suppressions.xml | Adds a suppression for the new implementation class. |
| sdk/core/azure-core/CHANGELOG.md | Documents the newly added synchronous InputStream API. |
| subscription -> { | ||
| this.subscription = subscription; | ||
| this.subscribed = true; | ||
| this.subscription.request(1); | ||
| }); |
There was a problem hiding this comment.
There is a race between close() and the onSubscribe callback: if close() happens before subscription is assigned, the stream won't cancel the subscription, and onSubscribe will still call request(1) even though the stream is closed. Consider checking closed in the onSubscribe callback and canceling immediately (and/or avoiding further requests) to prevent leaking work/resources after close.
| private static final int KB = 1024; | ||
| private static final int MB = KB * KB; | ||
|
|
||
| /* Network tests to be performed by implementors of the FluxInputStream. */ | ||
| private Flux<ByteBuffer> generateData(int num) { | ||
| List<ByteBuffer> buffers = new ArrayList<>(); | ||
| for (int i = 0; i < num; i++) { | ||
| buffers.add(ByteBuffer.wrap(new byte[] { (byte) i })); | ||
| } | ||
| return Flux.fromIterable(buffers); | ||
| } | ||
|
|
||
| @ParameterizedTest | ||
| @ValueSource(ints = { 1, 10, 100, KB, MB }) | ||
| public void fluxInputStreamMin(int byteCount) throws IOException { | ||
| final int expected = byteCount; | ||
|
|
||
| try (InputStream is = new FluxInputStream(generateData(byteCount))) { | ||
| byte[] bytes = new byte[expected]; |
There was a problem hiding this comment.
This parameterized test case includes MB (1,048,576) and generateData(MB) allocates a list of 1M single-byte ByteBuffer instances. This can make the unit test suite slow and memory-intensive (and can be flaky under constrained CI). Consider testing large payloads using fewer, larger buffers (or reduce the upper bound) while still validating the same behavior.
| private static final int KB = 1024; | ||
| private static final int MB = KB * KB; | ||
|
|
||
| /* Network tests to be performed by implementors of the FluxInputStream. */ |
There was a problem hiding this comment.
The comment "Network tests to be performed by implementors of the FluxInputStream" is confusing in a unit test class (these tests don't appear to be network tests, and there are no "implementors" here). Consider rewording or removing it to better describe what the helper does (it generates deterministic test data for the stream).
| /* Network tests to be performed by implementors of the FluxInputStream. */ | |
| /* Generates deterministic test data for FluxInputStream unit tests. */ |
| public InputStream getBodyAsInputStreamSync() { | ||
| return new FluxInputStream(getBody()); | ||
| } |
There was a problem hiding this comment.
This adds a new public API surface on HttpResponse, but there are no tests validating its behavior (for example: reading fully, handling empty bodies, and ensuring the underlying response is disposed/closed when the stream completes or is closed early). There are existing tests covering the other body accessors, so it would be good to add analogous coverage for getBodyAsInputStreamSync() to prevent regressions.
| * @return The response content as an {@link InputStream}. | ||
| */ | ||
| public InputStream getBodyAsInputStreamSync() { | ||
| return new FluxInputStream(getBody()); |
There was a problem hiding this comment.
getBodyAsInputStreamSync() passes getBody() directly into FluxInputStream, which will throw if getBody() is null. Other HttpResponse helpers (for example writeBodyTo*) explicitly handle a null body, so this new method should do the same (for example, return an empty InputStream when the body publisher is null).
| return new FluxInputStream(getBody()); | |
| Flux<ByteBuffer> body = getBody(); | |
| return (body == null) ? new ByteArrayInputStream(new byte[0]) : new FluxInputStream(body); |
| /** | ||
| * Returns the response content as an {@link InputStream}. | ||
| * | ||
| * @return The response content as an {@link InputStream}. | ||
| */ | ||
| public InputStream getBodyAsInputStreamSync() { | ||
| return new FluxInputStream(getBody()); | ||
| } |
There was a problem hiding this comment.
The new synchronous InputStream API is a blocking read over the reactive body. The JavaDoc should explicitly call out the blocking behavior and the required resource lifecycle (that callers should close the returned InputStream, and whether/when the underlying HttpResponse must also be closed to release the connection).
| <suppress files="com.azure.core.http.policy.HttpLoggingPolicy.java" checks="io.clientcore.linting.extensions.checkstyle.checks.ThrowFromClientLoggerCheck" /> | ||
| <suppress files="com.azure.core.implementation.AccessTokenCache.java" checks="io.clientcore.linting.extensions.checkstyle.checks.ThrowFromClientLoggerCheck" /> | ||
| <suppress files="com.azure.core.implementation.AccessibleByteArrayOutputStream.java" checks="io.clientcore.linting.extensions.checkstyle.checks.ThrowFromClientLoggerCheck" /> | ||
| <suppress files="com.azure.core.implementation.http.FluxInputStream.java" checks="io.clientcore.linting.extensions.checkstyle.checks.ThrowFromClientLoggerCheck" /> |
There was a problem hiding this comment.
Adding a Checkstyle suppression for ThrowFromClientLoggerCheck makes the new file an exception to the repo's linting rules. Prefer updating FluxInputStream to avoid throwing exceptions returned from ClientLogger methods (for example, throw the exception directly and, if needed, log separately) so this suppression can be removed.
| <suppress files="com.azure.core.implementation.http.FluxInputStream.java" checks="io.clientcore.linting.extensions.checkstyle.checks.ThrowFromClientLoggerCheck" /> |
| private void validateParameters(byte[] bytes, int offset, int length) { | ||
| if (bytes == null) { | ||
| throw LOGGER.logExceptionAsError(new NullPointerException("'bytes' cannot be null")); | ||
| } | ||
| if (offset < 0) { | ||
| throw LOGGER.logExceptionAsError(new IndexOutOfBoundsException("'offset' cannot be less than 0")); | ||
| } | ||
| if (length < 0) { | ||
| throw LOGGER.logExceptionAsError(new IndexOutOfBoundsException("'length' cannot be less than 0")); | ||
| } | ||
| if (length > (bytes.length - offset)) { | ||
| throw LOGGER.logExceptionAsError( | ||
| new IndexOutOfBoundsException("'length' cannot be greater than 'bytes'.length - 'offset'")); | ||
| } |
There was a problem hiding this comment.
validateParameters doesn't check offset > bytes.length (or offset + length overflow). In those cases it will currently throw with a misleading message about length, and it may diverge from InputStream#read(byte[], int, int) bounds-checking behavior. Add an explicit offset > bytes.length check (and consider guarding against integer overflow when computing offset + length).
This pull request introduces a new utility class,
FluxInputStream, which allows consuming aFlux<ByteBuffer>as a blockingInputStream. This enables synchronous reading of HTTP response bodies that are provided as reactive streams. The change also adds comprehensive tests for this new class and updatesHttpResponseto expose a synchronousInputStreamAPI.