From d20c9f8b65ac56e5bc52fc081c46a8d78c035a69 Mon Sep 17 00:00:00 2001 From: Srikanta Nagaraja Date: Fri, 17 Apr 2026 14:15:41 -0700 Subject: [PATCH 1/4] Enhance HttpResponse to include new method to return InputStream --- .../azure-core/checkstyle-suppressions.xml | 1 + .../com/azure/core/http/HttpResponse.java | 14 +- .../implementation/http/FluxInputStream.java | 272 ++++++++++++++++++ .../http/FluxInputStreamTests.java | 148 ++++++++++ 4 files changed, 433 insertions(+), 2 deletions(-) create mode 100644 sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/FluxInputStream.java create mode 100644 sdk/core/azure-core/src/test/java/com/azure/core/implementation/http/FluxInputStreamTests.java diff --git a/sdk/core/azure-core/checkstyle-suppressions.xml b/sdk/core/azure-core/checkstyle-suppressions.xml index dad9b8f9c6e2..94ebf16ddbce 100644 --- a/sdk/core/azure-core/checkstyle-suppressions.xml +++ b/sdk/core/azure-core/checkstyle-suppressions.xml @@ -19,6 +19,7 @@ + diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpResponse.java b/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpResponse.java index 3e38845f7a45..3a8ca68c0b01 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpResponse.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpResponse.java @@ -4,6 +4,7 @@ package com.azure.core.http; import com.azure.core.implementation.http.BufferedHttpResponse; +import com.azure.core.implementation.http.FluxInputStream; import com.azure.core.implementation.util.BinaryDataHelper; import com.azure.core.implementation.util.FluxByteBufferContent; import com.azure.core.util.BinaryData; @@ -146,14 +147,23 @@ public BinaryData getBodyAsBinaryData() { public abstract Mono getBodyAsString(Charset charset); /** - * Gets the response content as an {@link InputStream}. + * Gets the response content as an {@link InputStream} wrapped in a {@link Mono}. * - * @return The response content as an {@link InputStream}. + * @return The response content as an {@link InputStream} wrapped in a {@link Mono}. */ public Mono getBodyAsInputStream() { return getBodyAsByteArray().map(ByteArrayInputStream::new); } + /** + * Returns the response content as an {@link InputStream}. + * + * @return The response content as an {@link InputStream}. + */ + public InputStream getBodyAsInputStreamSync() { + return new FluxInputStream(getBody()); + } + /** * Gets the {@link HttpRequest request} which resulted in this response. * diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/FluxInputStream.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/FluxInputStream.java new file mode 100644 index 000000000000..dde40ed42b4d --- /dev/null +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/FluxInputStream.java @@ -0,0 +1,272 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.implementation.http; + +import com.azure.core.util.FluxUtil; +import com.azure.core.util.logging.ClientLogger; +import org.reactivestreams.Subscription; +import reactor.core.publisher.Flux; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * An InputStream that subscribes to a Flux. + */ +public class FluxInputStream extends InputStream { + + private static final ClientLogger LOGGER = new ClientLogger(FluxInputStream.class); + + // The data to subscribe to. + private final Flux data; + + // Subscription to request more data from as needed + private Subscription subscription; + + private ByteArrayInputStream buffer; + + private volatile boolean subscribed; + private volatile boolean fluxComplete; + private volatile boolean fluxErrored; + private volatile boolean waitingForData; + private volatile boolean closed; + + /* The following lock and condition variable is to synchronize access between the reader and the + reactor thread asynchronously reading data from the Flux. If no data is available, the reader + acquires the lock and waits on the dataAvailable condition variable. Once data is available + (or an error or completion event occurs) the reactor thread acquires the lock and signals that + data is available. */ + private final Lock lock; + private final Condition dataAvailable; + + private IOException lastError; + + /** + * Creates a new FluxInputStream + * + * @param data The data to subscribe to and read from. + */ + public FluxInputStream(Flux data) { + this.data = Objects.requireNonNull(data, "'data' cannot be null."); + this.subscribed = false; + this.fluxComplete = false; + this.waitingForData = false; + this.closed = false; + this.lock = new ReentrantLock(); + this.dataAvailable = lock.newCondition(); + } + + @Override + public int read() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + byte[] ret = new byte[1]; + int count = read(ret, 0, 1); + return count == -1 ? -1 : (ret[0] & 0xFF); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + + validateParameters(b, off, len); + + /* If len is 0, then no bytes are read and 0 is returned. */ + if (len == 0) { + return 0; + } + /* Attempt to read at least one byte. If no byte is available because the stream is at end of file, + the value -1 is returned; otherwise, at least one byte is read and stored into b. */ + + /* Not subscribed? subscribe and block for data */ + if (!subscribed) { + blockForData(); + } + /* Now, we have subscribed. */ + /* At this point, buffer should not be null. If it is, that indicates either an error or completion event + was emitted by the Flux. */ + if (this.buffer == null) { // Only executed on first subscription. + if (this.lastError != null) { + throw LOGGER.logThrowableAsError(this.lastError); + } + if (this.fluxComplete) { + return -1; + } + throw LOGGER.logExceptionAsError(new IllegalStateException("An unexpected error occurred. No data was " + + "read from the stream but the stream did not indicate completion.")); + } + + /* Now we are guaranteed that buffer is SOMETHING. */ + /* No data is available in the buffer. */ + if (this.buffer.available() == 0) { + /* If an error was signalled by the flux, throw it now that the buffer is drained. */ + if (this.fluxErrored && this.lastError != null) { + throw LOGGER.logThrowableAsError(this.lastError); + } + /* If the flux completed normally, there is no more data available to be read from the stream. */ + if (this.fluxComplete) { + return -1; + } + /* Block current thread until data is available. */ + blockForData(); + } + + /* Data available in buffer, read the buffer. */ + if (this.buffer.available() > 0) { + return this.buffer.read(b, off, len); + } + + /* If an error was signalled by the flux, throw it. */ + if (this.fluxErrored && this.lastError != null) { + throw LOGGER.logThrowableAsError(this.lastError); + } + + /* If the flux completed normally, there is no more data available to be read from the stream. Return -1. */ + if (this.fluxComplete) { + return -1; + } else { + throw LOGGER.logExceptionAsError(new IllegalStateException("An unexpected error occurred. No data was " + + "read from the stream but the stream did not indicate completion.")); + } + } + + @Override + public void close() throws IOException { + closed = true; + if (subscription != null) { + subscription.cancel(); + } + + // Unblock any thread waiting in blockForData(). + lock.lock(); + try { + waitingForData = false; + dataAvailable.signal(); + } finally { + lock.unlock(); + } + + if (this.buffer != null) { + this.buffer.close(); + } + super.close(); + } + + /** + * Request more data and wait on data to become available. + */ + private void blockForData() throws IOException { + lock.lock(); + try { + waitingForData = true; + if (!subscribed) { + subscribeToData(); + } else { + subscription.request(1); + } + // Block current thread until data is available. + while (waitingForData) { + if (fluxComplete || fluxErrored || closed) { + break; + } else { + try { + dataAvailable.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + } + } finally { + lock.unlock(); + } + } + + /** + * Subscribes to the data with a special subscriber. + */ + @SuppressWarnings("deprecation") + private void subscribeToData() { + this.data.filter(Buffer::hasRemaining) /* Filter to make sure only non empty byte buffers are emitted. */ + .subscribe( + // ByteBuffer consumer + byteBuffer -> { + this.buffer = new ByteArrayInputStream(FluxUtil.byteBufferToArray(byteBuffer)); + lock.lock(); + try { + this.waitingForData = false; + // Signal the consumer when data is available. + dataAvailable.signal(); + } finally { + lock.unlock(); + } + }, + // Error consumer + throwable -> { + if (throwable instanceof IOException) { + this.lastError = (IOException) throwable; + } else { + this.lastError = new IOException(throwable); + } + this.fluxErrored = true; + signalOnCompleteOrError(); + }, + // Complete consumer + // Signal the consumer in case we completed without data. + this::signalOnCompleteOrError, + // Subscription consumer + subscription -> { + this.subscription = subscription; + this.subscribed = true; + this.subscription.request(1); + }); + } + + /** + * Signals to the subscriber when the flux completes without data (onCompletion or onError) + */ + private void signalOnCompleteOrError() { + this.fluxComplete = true; + lock.lock(); + try { + this.waitingForData = false; + dataAvailable.signal(); + } finally { + lock.unlock(); + } + } + + /** + * Validates parameters according to {@link InputStream#read(byte[], int, int)} spec. + * + * @param bytes the buffer into which the data is read. + * @param offset the start offset in array bytes at which the data is written. + * @param length the maximum number of bytes to read. + */ + private void validateParameters(byte[] bytes, int offset, int length) { + if (bytes == null) { + throw LOGGER.logExceptionAsError(new NullPointerException("'bytes' cannot be null")); + } + if (offset < 0) { + throw LOGGER.logExceptionAsError(new IndexOutOfBoundsException("'offset' cannot be less than 0")); + } + if (length < 0) { + throw LOGGER.logExceptionAsError(new IndexOutOfBoundsException("'length' cannot be less than 0")); + } + if (length > (bytes.length - offset)) { + throw LOGGER.logExceptionAsError( + new IndexOutOfBoundsException("'length' cannot be greater than 'bytes'.length - 'offset'")); + } + } +} diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/http/FluxInputStreamTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/http/FluxInputStreamTests.java new file mode 100644 index 000000000000..641080db09bf --- /dev/null +++ b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/http/FluxInputStreamTests.java @@ -0,0 +1,148 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.core.implementation.http; + +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.HttpResponse; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class FluxInputStreamTests { + private static final int KB = 1024; + private static final int MB = KB * KB; + + /* Network tests to be performed by implementors of the FluxInputStream. */ + private Flux generateData(int num) { + List buffers = new ArrayList<>(); + for (int i = 0; i < num; i++) { + buffers.add(ByteBuffer.wrap(new byte[] { (byte) i })); + } + return Flux.fromIterable(buffers); + } + + @ParameterizedTest + @ValueSource(ints = { 1, 10, 100, KB, MB }) + public void fluxInputStreamMin(int byteCount) throws IOException { + final int expected = byteCount; + + try (InputStream is = new FluxInputStream(generateData(byteCount))) { + byte[] bytes = new byte[expected]; + int totalRead = 0; + int bytesRead = 0; + int remaining = expected; + + while (bytesRead != -1 && totalRead < expected) { + bytesRead = is.read(bytes, totalRead, remaining); + if (bytesRead != -1) { + totalRead += bytesRead; + remaining -= bytesRead; + } + } + + assertEquals(expected, totalRead); + for (int i = 0; i < expected; i++) { + assertEquals((byte) i, bytes[i]); + } + } + } + + @Test + public void fluxInputStreamWithEmptyByteBuffers() throws IOException { + final int expected = KB; + List buffers = new ArrayList<>(expected * 2); + for (int i = 0; i < expected; i++) { + buffers.add(ByteBuffer.wrap(new byte[] { (byte) i })); + buffers.add(ByteBuffer.wrap(new byte[0])); + } + + try (InputStream is = new FluxInputStream(Flux.fromIterable(buffers))) { + byte[] bytes = new byte[expected]; + int totalRead = 0; + int bytesRead = 0; + int remaining = expected; + + while (bytesRead != -1 && totalRead < expected) { + bytesRead = is.read(bytes, totalRead, remaining); + if (bytesRead != -1) { + totalRead += bytesRead; + remaining -= bytesRead; + } + } + + assertEquals(expected, totalRead); + for (int i = 0; i < expected; i++) { + assertEquals((byte) i, bytes[i]); + } + } + } + + @ParameterizedTest + @MethodSource("fluxInputStreamErrorSupplier") + public void fluxInputStreamError(RuntimeException exception) { + assertThrows(IOException.class, () -> { + try (InputStream is = new FluxInputStream(Flux.error(exception))) { + is.read(); + } + }); + } + + @SuppressWarnings("deprecation") + private static Stream fluxInputStreamErrorSupplier() { + HttpResponse httpResponse = new HttpResponse(null) { + @Override + public int getStatusCode() { + return 404; + } + + @Override + public String getHeaderValue(String name) { + return ""; + } + + @Override + public HttpHeaders getHeaders() { + return null; + } + + @Override + public Flux getBody() { + return null; + } + + @Override + public Mono getBodyAsByteArray() { + return null; + } + + @Override + public Mono getBodyAsString() { + return null; + } + + @Override + public Mono getBodyAsString(Charset charset) { + return null; + } + }; + return Stream.of(new IllegalArgumentException("Mock illegal argument exception."), + new HttpResponseException("Mock exception", httpResponse, null), + new UncheckedIOException(new IOException("Mock IO Exception."))); + } +} From eb358df841c7d67f0c3df892169cdde0d5d90c2c Mon Sep 17 00:00:00 2001 From: Srikanta Nagaraja Date: Fri, 17 Apr 2026 14:19:39 -0700 Subject: [PATCH 2/4] changelog --- sdk/core/azure-core/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/core/azure-core/CHANGELOG.md b/sdk/core/azure-core/CHANGELOG.md index 8108630355d4..113d368315b5 100644 --- a/sdk/core/azure-core/CHANGELOG.md +++ b/sdk/core/azure-core/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features Added +- Added `getBodyAsInputStreamSync()` method to `HttpResponse` that returns the response content as an `InputStream` synchronously. ([#48858](https://github.com/Azure/azure-sdk-for-java/pull/48858)) + ### Breaking Changes ### Bugs Fixed From 06ddbb3ae8e918511ef02f6730a52c56d970344b Mon Sep 17 00:00:00 2001 From: Srikanta Nagaraja Date: Tue, 21 Apr 2026 13:04:13 -0700 Subject: [PATCH 3/4] update unit tests and change package for flux inputstream --- sdk/core/azure-core/checkstyle-suppressions.xml | 2 +- .../main/java/com/azure/core/http/HttpResponse.java | 2 +- .../implementation/{http => }/FluxInputStream.java | 10 +++++++++- .../{http => }/FluxInputStreamTests.java | 4 ++-- .../azure/core/validation/http/HttpClientTests.java | 7 +++++++ 5 files changed, 20 insertions(+), 5 deletions(-) rename sdk/core/azure-core/src/main/java/com/azure/core/implementation/{http => }/FluxInputStream.java (96%) rename sdk/core/azure-core/src/test/java/com/azure/core/implementation/{http => }/FluxInputStreamTests.java (97%) diff --git a/sdk/core/azure-core/checkstyle-suppressions.xml b/sdk/core/azure-core/checkstyle-suppressions.xml index 94ebf16ddbce..dfe7905def71 100644 --- a/sdk/core/azure-core/checkstyle-suppressions.xml +++ b/sdk/core/azure-core/checkstyle-suppressions.xml @@ -19,7 +19,7 @@ - + diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpResponse.java b/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpResponse.java index 3a8ca68c0b01..87f0a57e57bd 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpResponse.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpResponse.java @@ -4,7 +4,7 @@ package com.azure.core.http; import com.azure.core.implementation.http.BufferedHttpResponse; -import com.azure.core.implementation.http.FluxInputStream; +import com.azure.core.implementation.FluxInputStream; import com.azure.core.implementation.util.BinaryDataHelper; import com.azure.core.implementation.util.FluxByteBufferContent; import com.azure.core.util.BinaryData; diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/FluxInputStream.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/FluxInputStream.java similarity index 96% rename from sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/FluxInputStream.java rename to sdk/core/azure-core/src/main/java/com/azure/core/implementation/FluxInputStream.java index dde40ed42b4d..b26d8192eea9 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/FluxInputStream.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/FluxInputStream.java @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -package com.azure.core.implementation.http; +package com.azure.core.implementation; import com.azure.core.util.FluxUtil; import com.azure.core.util.logging.ClientLogger; @@ -227,6 +227,10 @@ private void subscribeToData() { this::signalOnCompleteOrError, // Subscription consumer subscription -> { + if (this.closed) { + subscription.cancel(); + return; + } this.subscription = subscription; this.subscribed = true; this.subscription.request(1); @@ -264,6 +268,10 @@ private void validateParameters(byte[] bytes, int offset, int length) { if (length < 0) { throw LOGGER.logExceptionAsError(new IndexOutOfBoundsException("'length' cannot be less than 0")); } + if (offset > bytes.length) { + throw LOGGER.logExceptionAsError( + new IndexOutOfBoundsException("'offset' cannot be greater than 'bytes'.length")); + } if (length > (bytes.length - offset)) { throw LOGGER.logExceptionAsError( new IndexOutOfBoundsException("'length' cannot be greater than 'bytes'.length - 'offset'")); diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/http/FluxInputStreamTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/FluxInputStreamTests.java similarity index 97% rename from sdk/core/azure-core/src/test/java/com/azure/core/implementation/http/FluxInputStreamTests.java rename to sdk/core/azure-core/src/test/java/com/azure/core/implementation/FluxInputStreamTests.java index 641080db09bf..9408b4b83217 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/http/FluxInputStreamTests.java +++ b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/FluxInputStreamTests.java @@ -1,6 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -package com.azure.core.implementation.http; +package com.azure.core.implementation; import com.azure.core.exception.HttpResponseException; import com.azure.core.http.HttpHeaders; @@ -28,7 +28,7 @@ public class FluxInputStreamTests { private static final int KB = 1024; private static final int MB = KB * KB; - /* Network tests to be performed by implementors of the FluxInputStream. */ + /* Generates deterministic test data for FluxInputStream unit tests. */ private Flux generateData(int num) { List buffers = new ArrayList<>(); for (int i = 0; i < num; i++) { diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/validation/http/HttpClientTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/validation/http/HttpClientTests.java index afd669fe45b5..63c8e1941d6d 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/validation/http/HttpClientTests.java +++ b/sdk/core/azure-core/src/test/java/com/azure/core/validation/http/HttpClientTests.java @@ -361,6 +361,8 @@ public void canAccessResponseBody() throws IOException { assertArraysEqual(requestBody.toBytes(), responseSupplier.get().getBodyAsBinaryData().toBytes()); assertArraysEqual(requestBody.toBytes(), responseSupplier.get().getBodyAsInputStream().map(s -> BinaryData.fromStream(s).toBytes()).block()); + assertArraysEqual(requestBody.toBytes(), + BinaryData.fromStream(responseSupplier.get().getBodyAsInputStreamSync()).toBytes()); assertArraysEqual(requestBody.toBytes(), BinaryData.fromFlux(responseSupplier.get().getBody()).map(BinaryData::toBytes).block()); assertArraysEqual(requestBody.toBytes(), getResponseBytesViaWritableChannel(responseSupplier.get())); @@ -416,6 +418,11 @@ public void bufferedResponseCanBeReadMultipleTimes() throws IOException { assertArraysEqual(requestBody.toBytes(), response.getBodyAsInputStream().map(s -> BinaryData.fromStream(s).toBytes()).block()); + assertArraysEqual(requestBody.toBytes(), + BinaryData.fromStream(response.getBodyAsInputStreamSync()).toBytes()); + assertArraysEqual(requestBody.toBytes(), + BinaryData.fromStream(response.getBodyAsInputStreamSync()).toBytes()); + assertArraysEqual(requestBody.toBytes(), BinaryData.fromFlux(response.getBody()).map(BinaryData::toBytes).block()); assertArraysEqual(requestBody.toBytes(), From 4d056d83c3b464e4f30370e65508db859464cf49 Mon Sep 17 00:00:00 2001 From: Srikanta Nagaraja Date: Wed, 22 Apr 2026 12:24:21 -0700 Subject: [PATCH 4/4] spotless formatting --- .../java/com/azure/core/implementation/FluxInputStream.java | 4 ++-- .../com/azure/core/validation/http/HttpClientTests.java | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/FluxInputStream.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/FluxInputStream.java index b26d8192eea9..aa962955a1b2 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/FluxInputStream.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/FluxInputStream.java @@ -269,8 +269,8 @@ private void validateParameters(byte[] bytes, int offset, int length) { throw LOGGER.logExceptionAsError(new IndexOutOfBoundsException("'length' cannot be less than 0")); } if (offset > bytes.length) { - throw LOGGER.logExceptionAsError( - new IndexOutOfBoundsException("'offset' cannot be greater than 'bytes'.length")); + throw LOGGER + .logExceptionAsError(new IndexOutOfBoundsException("'offset' cannot be greater than 'bytes'.length")); } if (length > (bytes.length - offset)) { throw LOGGER.logExceptionAsError( diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/validation/http/HttpClientTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/validation/http/HttpClientTests.java index 63c8e1941d6d..541a9c69e82a 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/validation/http/HttpClientTests.java +++ b/sdk/core/azure-core/src/test/java/com/azure/core/validation/http/HttpClientTests.java @@ -418,10 +418,8 @@ public void bufferedResponseCanBeReadMultipleTimes() throws IOException { assertArraysEqual(requestBody.toBytes(), response.getBodyAsInputStream().map(s -> BinaryData.fromStream(s).toBytes()).block()); - assertArraysEqual(requestBody.toBytes(), - BinaryData.fromStream(response.getBodyAsInputStreamSync()).toBytes()); - assertArraysEqual(requestBody.toBytes(), - BinaryData.fromStream(response.getBodyAsInputStreamSync()).toBytes()); + assertArraysEqual(requestBody.toBytes(), BinaryData.fromStream(response.getBodyAsInputStreamSync()).toBytes()); + assertArraysEqual(requestBody.toBytes(), BinaryData.fromStream(response.getBodyAsInputStreamSync()).toBytes()); assertArraysEqual(requestBody.toBytes(), BinaryData.fromFlux(response.getBody()).map(BinaryData::toBytes).block());