From 13c7f2a749ab4766f9b35b1865ec3d5114a3bfda Mon Sep 17 00:00:00 2001 From: Isabelle Date: Sun, 29 Mar 2026 15:26:59 -0700 Subject: [PATCH 1/3] all upload stress tests --- .../scenarios-matrix.yaml | 179 ++++++++++++++++++ .../com/azure/storage/blob/stress/App.java | 10 + ...ntentValidationAppendBlobOutputStream.java | 80 ++++++++ .../stress/ContentValidationAppendBlock.java | 76 ++++++++ ...ontentValidationBlockBlobOutputStream.java | 75 ++++++++ .../ContentValidationBlockBlobUpload.java | 70 +++++++ ...ContentValidationPageBlobOutputStream.java | 99 ++++++++++ ...entValidationSeekableByteChannelWrite.java | 81 ++++++++ .../stress/ContentValidationStageBlock.java | 89 +++++++++ .../ContentValidationStressOptions.java | 26 +++ .../blob/stress/ContentValidationUpload.java | 70 +++++++ .../ContentValidationUploadFromFile.java | 134 +++++++++++++ .../stress/ContentValidationUploadPages.java | 82 ++++++++ .../azure/storage/stress/TelemetryHelper.java | 2 +- 14 files changed, 1072 insertions(+), 1 deletion(-) create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlobOutputStream.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlock.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobOutputStream.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobUpload.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStageBlock.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStressOptions.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java create mode 100644 sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadPages.java diff --git a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml index 3a8920a67b35..128ae81472a6 100644 --- a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml +++ b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml @@ -323,3 +323,182 @@ matrix: uploadFaults: true durationMin: 60 imageBuildDir: "../../.." + + # --- Content validation (ContentValidation*; default CRC64). Scenario keys are short: K8s label + # testInstance = "{Scenario}-{BaseName}-{revision}" must be <= 63 chars (see stress-test-job.yaml). + + cv-apb-sm: + testScenario: contentvalidationappendblock + sync: true + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-apb-async: + testScenario: contentvalidationappendblock + sync: false + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-apb-lg: + testScenario: contentvalidationappendblock + sync: true + sizeBytes: "26214400" + uploadFaults: true + durationMin: 30 + imageBuildDir: "../../.." + + cv-apbos-sm: + testScenario: contentvalidationappendbloboutputstream + sync: true + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-apbos-lg: + testScenario: contentvalidationappendbloboutputstream + sync: true + sizeBytes: "10240" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-bbup-sm: + testScenario: contentvalidationblockblobupload + sync: true + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-bbup-lg: + testScenario: contentvalidationblockblobupload + sync: true + sizeBytes: "26214400" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-bbos-sm: + testScenario: contentvalidationblockbloboutputstream + sync: true + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-bbos-lg: + testScenario: contentvalidationblockbloboutputstream + sync: true + sizeBytes: "26214400" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-sbbcw-sm: + testScenario: contentvalidationseekablebytechannelwrite + sync: true + sizeBytes: 1024 + uploadFaults: true + durationMin: 10 + imageBuildDir: "../../.." + + cv-sbbcw-lg: + testScenario: contentvalidationseekablebytechannelwrite + sync: true + sizeBytes: "52428800" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-stg-sm: + testScenario: contentvalidationstageblock + sync: true + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-stg-lg: + testScenario: contentvalidationstageblock + sync: true + sizeBytes: "26214400" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-pbos-sm: + testScenario: contentvalidationpagebloboutputstream + sync: true + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-pbos-lg: + testScenario: contentvalidationpagebloboutputstream + sync: true + sizeBytes: "10240" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-pgup-sm: + testScenario: contentvalidationuploadpages + sync: true + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-pgup-lg: + testScenario: contentvalidationuploadpages + sync: true + sizeBytes: "4194304" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-par-sm: + testScenario: contentvalidationupload + sync: true + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-par-async: + testScenario: contentvalidationupload + sync: false + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-par-lg: + testScenario: contentvalidationupload + sync: true + sizeBytes: "52428800" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-uff-sm: + testScenario: contentvalidationuploadfromfile + sync: true + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-uff-lg: + testScenario: contentvalidationuploadfromfile + sync: true + sizeBytes: "52428800" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java index e38bd16791ca..3f3219930596 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java @@ -15,6 +15,16 @@ public static void main(String[] args) { BlockBlobOutputStream.class, BlockBlobUpload.class, CommitBlockList.class, + ContentValidationAppendBlobOutputStream.class, + ContentValidationAppendBlock.class, + ContentValidationBlockBlobOutputStream.class, + ContentValidationBlockBlobUpload.class, + ContentValidationPageBlobOutputStream.class, + ContentValidationStageBlock.class, + ContentValidationSeekableByteChannelWrite.class, + ContentValidationUpload.class, + ContentValidationUploadFromFile.class, + ContentValidationUploadPages.class, DownloadToFile.class, DownloadStream.class, DownloadContent.class, diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlobOutputStream.java new file mode 100644 index 000000000000..8cdb0c6349e3 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlobOutputStream.java @@ -0,0 +1,80 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.AppendBlobOutputStreamOptions; +import com.azure.storage.blob.specialized.AppendBlobClient; +import com.azure.storage.blob.specialized.BlobOutputStream; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Mono; + +import java.io.IOException; + +import static com.azure.core.util.FluxUtil.monoError; + +/** + * Append blob output stream with {@link AppendBlobOutputStreamOptions#setRequestChecksumAlgorithm} (sync only). + */ +public class ContentValidationAppendBlobOutputStream extends BlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationAppendBlobOutputStream.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncNoFaultClient; + /** Separate blob used to upload reference content for {@link OriginalContent} checksum (block blob). */ + private final BlobAsyncClient tempSetupBlobClient; + + public ContentValidationAppendBlobOutputStream(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + String tempBlobName = generateBlobName(); + + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName); + } + + @Override + protected void runInternal(Context span) throws IOException { + AppendBlobClient appendBlobClient = syncClient.getAppendBlobClient(); + AppendBlobOutputStreamOptions streamOptions = new AppendBlobOutputStreamOptions() + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm()); + + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()); + BlobOutputStream outputStream = appendBlobClient.getBlobOutputStream(streamOptions)) { + byte[] buffer = new byte[4096]; + int bytesRead; + + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + + outputStream.close(); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return monoError(LOGGER, new RuntimeException("getBlobOutputStream() does not exist on the async client")); + } + + @Override + public Mono setupAsync() { + return super.setupAsync() + .then(asyncNoFaultClient.getAppendBlobAsyncClient().create()) + .then(originalContent.setupBlob(tempSetupBlobClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(tempSetupBlobClient.deleteIfExists()) + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlock.java new file mode 100644 index 000000000000..050d53ddb999 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationAppendBlock.java @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.AppendBlobAppendBlockOptions; +import com.azure.storage.blob.specialized.AppendBlobAsyncClient; +import com.azure.storage.blob.specialized.AppendBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; + +/** + * Append block with {@link AppendBlobAppendBlockOptions#setRequestChecksumAlgorithm}. + */ +public class ContentValidationAppendBlock extends BlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobAsyncClient asyncNoFaultClient; + private final BlobAsyncClient tempSetupBlobClient; + + public ContentValidationAppendBlock(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + String tempBlobName = generateBlobName(); + + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + this.tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName); + } + + @Override + protected void runInternal(Context span) { + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + AppendBlobClient appendBlobClient = syncClient.getAppendBlobClient(); + appendBlobClient.appendBlockWithResponse( + new AppendBlobAppendBlockOptions(inputStream, options.getSize()) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm()), + null, span); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + AppendBlobAsyncClient appendBlobAsyncClient = asyncClient.getAppendBlobAsyncClient(); + Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()) + .convertStreamToByteBuffer(); + return appendBlobAsyncClient.appendBlockWithResponse( + new AppendBlobAppendBlockOptions(byteBufferFlux, options.getSize()) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm())) + .then(originalContent.checkMatch(byteBufferFlux, span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(asyncNoFaultClient.getAppendBlobAsyncClient().create()) + .then(originalContent.setupBlob(tempSetupBlobClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.getAppendBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupBlobClient.deleteIfExists()) + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobOutputStream.java new file mode 100644 index 000000000000..b0c98c492e6c --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobOutputStream.java @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.models.ParallelTransferOptions; +import com.azure.storage.blob.options.BlockBlobOutputStreamOptions; +import com.azure.storage.blob.specialized.BlobOutputStream; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Mono; + +import java.io.IOException; + +import static com.azure.core.util.FluxUtil.monoError; + +/** + * Block blob output stream with {@link BlockBlobOutputStreamOptions#setRequestChecksumAlgorithm} (sync only). + */ +public class ContentValidationBlockBlobOutputStream extends BlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationBlockBlobOutputStream.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncNoFaultClient; + private final ParallelTransferOptions parallelTransferOptions; + + public ContentValidationBlockBlobOutputStream(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.parallelTransferOptions = new ParallelTransferOptions().setMaxConcurrency(options.getMaxConcurrency()); + } + + @Override + protected void runInternal(Context span) throws IOException { + BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient(); + BlockBlobOutputStreamOptions streamOptions = new BlockBlobOutputStreamOptions() + .setParallelTransferOptions(parallelTransferOptions) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm()); + + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()); + BlobOutputStream outputStream = blockBlobClient.getBlobOutputStream(streamOptions, span)) { + byte[] buffer = new byte[4096]; + int bytesRead; + + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + + outputStream.close(); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return monoError(LOGGER, new RuntimeException("getBlobOutputStream() does not exist on the async client")); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists().then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobUpload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobUpload.java new file mode 100644 index 000000000000..63ddd254dc41 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationBlockBlobUpload.java @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions; +import com.azure.storage.blob.specialized.BlockBlobAsyncClient; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; + +/** + * Single-shot block blob upload with request content validation + * ({@link BlockBlobSimpleUploadOptions#setRequestChecksumAlgorithm}). + */ +public class ContentValidationBlockBlobUpload extends BlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobAsyncClient asyncNoFaultClient; + + public ContentValidationBlockBlobUpload(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + } + + @Override + protected void runInternal(Context span) { + BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient(); + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + blockBlobClient.uploadWithResponse( + new BlockBlobSimpleUploadOptions(inputStream, options.getSize()) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm()), + null, span); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()) + .convertStreamToByteBuffer(); + BlockBlobAsyncClient blockBlobAsyncClient = asyncClient.getBlockBlobAsyncClient(); + return blockBlobAsyncClient.uploadWithResponse( + new BlockBlobSimpleUploadOptions(byteBufferFlux, options.getSize()) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm())) + .then(originalContent.checkMatch(byteBufferFlux, span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java new file mode 100644 index 000000000000..49bfc5caa740 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.models.PageRange; +import com.azure.storage.blob.options.PageBlobOutputStreamOptions; +import com.azure.storage.blob.specialized.BlobOutputStream; +import com.azure.storage.blob.specialized.PageBlobAsyncClient; +import com.azure.storage.blob.specialized.PageBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Mono; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static com.azure.core.util.FluxUtil.monoError; + +/** + * Page blob output stream with {@link PageBlobOutputStreamOptions#setRequestChecksumAlgorithm} (sync only). + */ +public class ContentValidationPageBlobOutputStream extends PageBlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationPageBlobOutputStream.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncNoFaultClient; + /** Page blob used only to seed {@link OriginalContent} (same pattern as {@link PageBlobOutputStream}). */ + private final PageBlobAsyncClient tempSetupPageBlobClient; + + public ContentValidationPageBlobOutputStream(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + String tempBlobName = generateBlobName(); + + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + BlobAsyncClient tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName); + this.tempSetupPageBlobClient = tempSetupBlobClient.getPageBlobAsyncClient(); + } + + @Override + protected void runInternal(Context span) throws IOException { + PageBlobClient pageBlobClient = syncClient.getPageBlobClient(); + PageBlobOutputStreamOptions streamOptions = new PageBlobOutputStreamOptions( + new PageRange().setStart(0).setEnd(options.getSize() - 1)) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm()); + + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()); + BlobOutputStream outputStream = pageBlobClient.getBlobOutputStream(streamOptions)) { + ByteArrayOutputStream bufferStream = new ByteArrayOutputStream(); + byte[] buffer = new byte[512]; + int bytesRead; + + while ((bytesRead = inputStream.read(buffer)) != -1) { + if (bytesRead < buffer.length) { + bufferStream.write(buffer, 0, bytesRead); + if (bufferStream.size() >= buffer.length) { + byte[] toWrite = bufferStream.toByteArray(); + int length = toWrite.length - (toWrite.length % buffer.length); + outputStream.write(toWrite, 0, length); + bufferStream.reset(); + bufferStream.write(toWrite, length, (toWrite.length % buffer.length)); + } + } else { + outputStream.write(buffer, 0, bytesRead); + } + } + + outputStream.close(); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return monoError(LOGGER, new RuntimeException("getBlobOutputStream() does not exist on the async client")); + } + + @Override + public Mono setupAsync() { + return super.setupAsync() + .then(asyncNoFaultClient.getPageBlobAsyncClient().create(options.getSize())) + .then(tempSetupPageBlobClient.create(options.getSize())) + .then(originalContent.setupPageBlob(tempSetupPageBlobClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupPageBlobClient.deleteIfExists()) + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java new file mode 100644 index 000000000000..82ed215a4514 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java @@ -0,0 +1,81 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.BlockBlobSeekableByteChannelWriteOptions; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.common.implementation.StorageSeekableByteChannel; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static com.azure.core.util.FluxUtil.monoError; +import static com.azure.storage.blob.options.BlockBlobSeekableByteChannelWriteOptions.WriteMode.OVERWRITE; + +/** + * Block-blob seekable byte channel write with {@link BlockBlobSeekableByteChannelWriteOptions#setRequestChecksumAlgorithm}. + * Matches {@link com.azure.storage.blob.BlobContentValidationUploadTests} seekable-channel scenarios (sync only). + */ +public class ContentValidationSeekableByteChannelWrite extends BlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationSeekableByteChannelWrite.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncNoFaultClient; + + public ContentValidationSeekableByteChannelWrite(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + } + + @Override + protected void runInternal(Context span) throws IOException { + BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient(); + BlockBlobSeekableByteChannelWriteOptions writeOptions = new BlockBlobSeekableByteChannelWriteOptions(OVERWRITE) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm()); + + try (CrcInputStream crcInput = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + Flux byteBufferFlux = crcInput.convertStreamToByteBuffer(); + try (StorageSeekableByteChannel channel = (StorageSeekableByteChannel) blockBlobClient.openSeekableByteChannelWrite( + writeOptions)) { + Mono writeOperation = byteBufferFlux + .doOnNext(buffer -> { + try { + channel.write(buffer); + } catch (IOException e) { + throw LOGGER.logExceptionAsError(new RuntimeException(e)); + } + }).then(); + writeOperation.block(); + channel.getWriteBehavior().commit(options.getSize()); + } + originalContent.checkMatch(byteBufferFlux, span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return monoError(LOGGER, new RuntimeException( + "openSeekableByteChannelWrite() does not exist on the async client")); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists().then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStageBlock.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStageBlock.java new file mode 100644 index 000000000000..5e3a26224068 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStageBlock.java @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.CoreUtils; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions; +import com.azure.storage.blob.options.BlockBlobStageBlockOptions; +import com.azure.storage.blob.specialized.BlockBlobAsyncClient; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collections; + +/** + * Stage block with request content validation on the faulted client, then commit via the non-faulted client. + */ +public class ContentValidationStageBlock extends BlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobClient syncNoFaultClient; + private final BlobAsyncClient asyncNoFaultClient; + + public ContentValidationStageBlock(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.syncNoFaultClient = getSyncContainerClientNoFault().getBlobClient(blobName); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + } + + @Override + protected void runInternal(Context span) { + BlockBlobClient blockBlobClient = syncClient.getBlockBlobClient(); + BlockBlobClient blockBlobClientNoFault = syncNoFaultClient.getBlockBlobClient(); + String blockId = Base64.getEncoder().encodeToString(CoreUtils.randomUuid().toString() + .getBytes(StandardCharsets.UTF_8)); + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + BinaryData data = BinaryData.fromStream(inputStream, options.getSize()); + blockBlobClient.stageBlockWithResponse( + new BlockBlobStageBlockOptions(blockId, data) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm()), + null, span); + blockBlobClientNoFault.commitBlockListWithResponse( + new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId)), null, span); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + BlockBlobAsyncClient blockBlobAsyncClient = asyncClient.getBlockBlobAsyncClient(); + BlockBlobAsyncClient blockBlobAsyncClientNoFault = asyncNoFaultClient.getBlockBlobAsyncClient(); + String blockId = Base64.getEncoder().encodeToString(CoreUtils.randomUuid().toString() + .getBytes(StandardCharsets.UTF_8)); + Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()) + .convertStreamToByteBuffer(); + return BinaryData.fromFlux(byteBufferFlux, options.getSize(), false) + .flatMap(binaryData -> blockBlobAsyncClient.stageBlockWithResponse( + new BlockBlobStageBlockOptions(blockId, binaryData) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm()))) + .then(blockBlobAsyncClientNoFault.commitBlockListWithResponse( + new BlockBlobCommitBlockListOptions(Collections.singletonList(blockId)))) + .then(originalContent.checkMatch(byteBufferFlux, span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStressOptions.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStressOptions.java new file mode 100644 index 000000000000..948c8139a506 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationStressOptions.java @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.storage.common.StorageChecksumAlgorithm; +import com.azure.storage.stress.StorageStressOptions; +import com.beust.jcommander.Parameter; + +/** + * Options for stress scenarios that enable transactional request content validation on uploads + * (CRC64 / structured message). See {@link com.azure.storage.blob.BlobContentValidationUploadTests}. + */ +public class ContentValidationStressOptions extends StorageStressOptions { + /** + * Request checksum behavior for upload APIs. Use CRC64 or AUTO to exercise content validation. + * MD5 is not supported for uploadFromFile. NONE disables request validation. + */ + @Parameter(names = { "--requestChecksumAlgorithm" }, + description = "CRC64 (default), AUTO, NONE, or MD5 (not valid for upload-from-file scenarios)") + private StorageChecksumAlgorithm requestChecksumAlgorithm = StorageChecksumAlgorithm.CRC64; + + public StorageChecksumAlgorithm getRequestChecksumAlgorithm() { + return requestChecksumAlgorithm; + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java new file mode 100644 index 000000000000..889c55d41735 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.models.ParallelTransferOptions; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; + +/** + * Parallel blob upload with {@link com.azure.storage.blob.options.BlobParallelUploadOptions#setRequestChecksumAlgorithm} + * enabled. Verifies stored data via CRC after upload (see {@code BlobContentValidationUploadTests}). + */ +public class ContentValidationUpload extends BlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobAsyncClient asyncNoFaultClient; + private final ParallelTransferOptions parallelTransferOptions; + + public ContentValidationUpload(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + parallelTransferOptions = new ParallelTransferOptions() + .setMaxConcurrency(options.getMaxConcurrency()) + .setMaxSingleUploadSizeLong(4 * 1024 * 1024L); + } + + @Override + protected void runInternal(Context span) { + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + syncClient.uploadWithResponse(new BlobParallelUploadOptions(inputStream) + .setParallelTransferOptions(parallelTransferOptions) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm()), null, span); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()) + .convertStreamToByteBuffer(); + return asyncClient.uploadWithResponse(new BlobParallelUploadOptions(byteBufferFlux) + .setParallelTransferOptions(parallelTransferOptions) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm())) + .then(originalContent.checkMatch(byteBufferFlux, span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java new file mode 100644 index 000000000000..31db0f985bb6 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java @@ -0,0 +1,134 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.CoreUtils; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.models.ParallelTransferOptions; +import com.azure.storage.blob.options.BlobDownloadToFileOptions; +import com.azure.storage.blob.options.BlobUploadFromFileOptions; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.common.StorageChecksumAlgorithm; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Mono; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.UUID; + +/** + * Upload from file with {@link BlobUploadFromFileOptions#setRequestChecksumAlgorithm}. MD5 is not supported for this API. + */ +public class ContentValidationUploadFromFile extends BlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationUploadFromFile.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobClient syncNoFaultClient; + private final BlobAsyncClient asyncNoFaultClient; + private final ParallelTransferOptions parallelTransferOptions; + + public ContentValidationUploadFromFile(ContentValidationStressOptions options) { + super(options); + if (options.getRequestChecksumAlgorithm() == StorageChecksumAlgorithm.MD5) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException( + "StorageChecksumAlgorithm.MD5 is not supported for uploadFromFile. Use CRC64 or AUTO.")); + } + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncNoFaultClient = getSyncContainerClientNoFault().getBlobClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + this.parallelTransferOptions = new ParallelTransferOptions() + .setMaxConcurrency(options.getMaxConcurrency()) + .setMaxSingleUploadSizeLong(4 * 1024 * 1024L); + } + + @Override + protected void runInternal(Context span) { + Path downloadPath = getTempPath("test"); + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + Path uploadFilePath = generateFile(inputStream); + downloadPath = downloadPath.resolve(CoreUtils.randomUuid() + ".txt"); + syncClient.uploadFromFileWithResponse(new BlobUploadFromFileOptions(uploadFilePath.toString()) + .setParallelTransferOptions(parallelTransferOptions) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm()), + null, span); + syncNoFaultClient.downloadToFileWithResponse( + new BlobDownloadToFileOptions(downloadPath.toString()), null, span); + originalContent.checkMatch(BinaryData.fromFile(downloadPath), span).block(); + } finally { + deleteFile(downloadPath); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + Path downloadPath = getTempPath("test"); + return Mono.using( + () -> new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()), + inputStream -> { + Path uploadFilePath = generateFile(inputStream); + return Mono.using( + () -> downloadPath.resolve(UUID.randomUUID() + ".txt"), + path -> asyncClient.uploadFromFileWithResponse(new BlobUploadFromFileOptions(uploadFilePath.toString()) + .setParallelTransferOptions(parallelTransferOptions) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm())) + .flatMap(ignored -> asyncNoFaultClient.downloadToFileWithResponse( + new BlobDownloadToFileOptions(path.toString()))) + .flatMap(ignored -> originalContent.checkMatch(BinaryData.fromFile(path), span)), + ContentValidationUploadFromFile::deleteFile); + }, + CrcInputStream::close); + } + + @Override + public Mono setupAsync() { + return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } + + private Path getTempPath(String prefix) { + try { + return Files.createTempDirectory(prefix); + } catch (IOException e) { + throw LOGGER.logExceptionAsError(new UncheckedIOException(e)); + } + } + + private static void deleteFile(Path path) { + try { + Files.deleteIfExists(path); + } catch (Throwable e) { + LOGGER.atError() + .addKeyValue("path", path) + .log("failed to delete file", e); + } + } + + private static Path generateFile(InputStream inputStream) { + try { + File file = Files.createTempFile(CoreUtils.randomUuid().toString(), ".txt").toFile(); + file.deleteOnExit(); + Files.copy(inputStream, file.toPath(), StandardCopyOption.REPLACE_EXISTING); + return file.toPath(); + } catch (IOException e) { + throw LOGGER.logExceptionAsError(new UncheckedIOException(e)); + } + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadPages.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadPages.java new file mode 100644 index 000000000000..bb8779de826d --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadPages.java @@ -0,0 +1,82 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.models.PageRange; +import com.azure.storage.blob.options.PageBlobUploadPagesOptions; +import com.azure.storage.blob.specialized.PageBlobAsyncClient; +import com.azure.storage.blob.specialized.PageBlobClient; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; + +/** + * Page blob upload pages with {@link PageBlobUploadPagesOptions#setRequestChecksumAlgorithm}. + */ +public class ContentValidationUploadPages extends PageBlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobAsyncClient asyncNoFaultClient; + private final PageBlobAsyncClient tempSetupPageBlobClient; + + public ContentValidationUploadPages(ContentValidationStressOptions options) { + super(options); + String blobName = generateBlobName(); + String tempBlobName = generateBlobName(); + + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + BlobAsyncClient tempSetupBlobClient = getAsyncContainerClientNoFault().getBlobAsyncClient(tempBlobName); + this.tempSetupPageBlobClient = tempSetupBlobClient.getPageBlobAsyncClient(); + } + + @Override + protected void runInternal(Context span) { + try (CrcInputStream inputStream = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize())) { + PageBlobClient pageBlobClient = syncClient.getPageBlobClient(); + PageRange range = new PageRange().setStart(0).setEnd(options.getSize() - 1); + pageBlobClient.uploadPagesWithResponse( + new PageBlobUploadPagesOptions(range, inputStream) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm()), + null, span); + originalContent.checkMatch(inputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + PageBlobAsyncClient pageBlobAsyncClient = asyncClient.getPageBlobAsyncClient(); + Flux byteBufferFlux = new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()) + .convertStreamToByteBuffer(); + PageRange range = new PageRange().setStart(0).setEnd(options.getSize() - 1); + return pageBlobAsyncClient.uploadPagesWithResponse( + new PageBlobUploadPagesOptions(range, byteBufferFlux) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm())) + .then(originalContent.checkMatch(byteBufferFlux, span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync() + .then(asyncNoFaultClient.getPageBlobAsyncClient().create(options.getSize())) + .then(tempSetupPageBlobClient.create(options.getSize())) + .then(originalContent.setupPageBlob(tempSetupPageBlobClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.getPageBlobAsyncClient().deleteIfExists() + .onErrorResume(e -> Mono.empty()) + .then(tempSetupPageBlobClient.deleteIfExists()) + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java index 4bf6e523eae1..b633479100b1 100644 --- a/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java +++ b/sdk/storage/azure-storage-stress/src/main/java/com/azure/storage/stress/TelemetryHelper.java @@ -124,7 +124,7 @@ public String getDescription() { Cpu.registerObservers(otel); MemoryPools.registerObservers(otel); Threads.registerObservers(otel); - GarbageCollector.registerObservers(otel); + GarbageCollector.registerObservers(otel, true); OpenTelemetryAppender.install(otel); return otel; } From 9aad9e47cb463a60319e9456bc5f5b5e9c6bc127 Mon Sep 17 00:00:00 2001 From: Isabelle Date: Mon, 30 Mar 2026 12:37:30 -0700 Subject: [PATCH 2/3] addressing comments --- .../scenarios-matrix.yaml | 128 ++++++++++++++---- ...ContentValidationPageBlobOutputStream.java | 22 +-- ...entValidationSeekableByteChannelWrite.java | 4 +- .../blob/stress/ContentValidationUpload.java | 2 +- .../ContentValidationUploadFromFile.java | 42 +++--- 5 files changed, 145 insertions(+), 53 deletions(-) diff --git a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml index 128ae81472a6..eede2cb17875 100644 --- a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml +++ b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml @@ -327,7 +327,7 @@ matrix: # --- Content validation (ContentValidation*; default CRC64). Scenario keys are short: K8s label # testInstance = "{Scenario}-{BaseName}-{revision}" must be <= 63 chars (see stress-test-job.yaml). - cv-apb-sm: + cv-appendblock-sm: testScenario: contentvalidationappendblock sync: true sizeBytes: 1024 @@ -335,7 +335,15 @@ matrix: durationMin: 25 imageBuildDir: "../../.." - cv-apb-async: + cv-appendblock-lg: + testScenario: contentvalidationappendblock + sync: true + sizeBytes: "26214400" + uploadFaults: true + durationMin: 30 + imageBuildDir: "../../.." + + cv-appendblock-async-sm: testScenario: contentvalidationappendblock sync: false sizeBytes: 1024 @@ -343,15 +351,15 @@ matrix: durationMin: 25 imageBuildDir: "../../.." - cv-apb-lg: + cv-appendblock-async-lg: testScenario: contentvalidationappendblock - sync: true + sync: false sizeBytes: "26214400" uploadFaults: true durationMin: 30 imageBuildDir: "../../.." - cv-apbos-sm: + cv-appendbloboutputstream-sm: testScenario: contentvalidationappendbloboutputstream sync: true sizeBytes: 1024 @@ -359,7 +367,7 @@ matrix: durationMin: 25 imageBuildDir: "../../.." - cv-apbos-lg: + cv-appendbloboutputstream-lg: testScenario: contentvalidationappendbloboutputstream sync: true sizeBytes: "10240" @@ -367,7 +375,7 @@ matrix: durationMin: 60 imageBuildDir: "../../.." - cv-bbup-sm: + cv-blockblobupload-sm: testScenario: contentvalidationblockblobupload sync: true sizeBytes: 1024 @@ -375,7 +383,7 @@ matrix: durationMin: 25 imageBuildDir: "../../.." - cv-bbup-lg: + cv-blockblobupload-lg: testScenario: contentvalidationblockblobupload sync: true sizeBytes: "26214400" @@ -383,7 +391,23 @@ matrix: durationMin: 60 imageBuildDir: "../../.." - cv-bbos-sm: + cv-blockbloboutputstream-async-sm: + testScenario: contentvalidationblockbloboutputstream + sync: false + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-blockbloboutputstream-async-lg: + testScenario: contentvalidationblockbloboutputstream + sync: false + sizeBytes: "26214400" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-blockbloboutputstream-sm: testScenario: contentvalidationblockbloboutputstream sync: true sizeBytes: 1024 @@ -391,7 +415,7 @@ matrix: durationMin: 25 imageBuildDir: "../../.." - cv-bbos-lg: + cv-blockbloboutputstream-lg: testScenario: contentvalidationblockbloboutputstream sync: true sizeBytes: "26214400" @@ -399,7 +423,7 @@ matrix: durationMin: 60 imageBuildDir: "../../.." - cv-sbbcw-sm: + cv-seekablebytechannelwrite-sm: testScenario: contentvalidationseekablebytechannelwrite sync: true sizeBytes: 1024 @@ -407,7 +431,7 @@ matrix: durationMin: 10 imageBuildDir: "../../.." - cv-sbbcw-lg: + cv-seekablebytechannelwrite-lg: testScenario: contentvalidationseekablebytechannelwrite sync: true sizeBytes: "52428800" @@ -415,7 +439,7 @@ matrix: durationMin: 60 imageBuildDir: "../../.." - cv-stg-sm: + cv-stageblock-sm: testScenario: contentvalidationstageblock sync: true sizeBytes: 1024 @@ -423,7 +447,7 @@ matrix: durationMin: 25 imageBuildDir: "../../.." - cv-stg-lg: + cv-stageblock-lg: testScenario: contentvalidationstageblock sync: true sizeBytes: "26214400" @@ -431,7 +455,23 @@ matrix: durationMin: 60 imageBuildDir: "../../.." - cv-pbos-sm: + cv-stageblock-async-sm: + testScenario: contentvalidationstageblock + sync: false + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-stageblock-async-lg: + testScenario: contentvalidationstageblock + sync: false + sizeBytes: "26214400" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-pagebloboutputstream-sm: testScenario: contentvalidationpagebloboutputstream sync: true sizeBytes: 1024 @@ -439,7 +479,7 @@ matrix: durationMin: 25 imageBuildDir: "../../.." - cv-pbos-lg: + cv-pagebloboutputstream-lg: testScenario: contentvalidationpagebloboutputstream sync: true sizeBytes: "10240" @@ -447,7 +487,7 @@ matrix: durationMin: 60 imageBuildDir: "../../.." - cv-pgup-sm: + cv-uploadpages-sm: testScenario: contentvalidationuploadpages sync: true sizeBytes: 1024 @@ -455,7 +495,7 @@ matrix: durationMin: 25 imageBuildDir: "../../.." - cv-pgup-lg: + cv-uploadpages-lg: testScenario: contentvalidationuploadpages sync: true sizeBytes: "4194304" @@ -463,7 +503,23 @@ matrix: durationMin: 60 imageBuildDir: "../../.." - cv-par-sm: + cv-uploadpages-async-sm: + testScenario: contentvalidationuploadpages + sync: false + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-uploadpages-async-lg: + testScenario: contentvalidationuploadpages + sync: false + sizeBytes: "4194304" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-upload-sm: testScenario: contentvalidationupload sync: true sizeBytes: 1024 @@ -471,7 +527,15 @@ matrix: durationMin: 25 imageBuildDir: "../../.." - cv-par-async: + cv-upload-lg: + testScenario: contentvalidationupload + sync: true + sizeBytes: "52428800" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + cv-upload-async-sm: testScenario: contentvalidationupload sync: false sizeBytes: 1024 @@ -479,15 +543,15 @@ matrix: durationMin: 25 imageBuildDir: "../../.." - cv-par-lg: + cv-upload-async-lg: testScenario: contentvalidationupload - sync: true + sync: false sizeBytes: "52428800" uploadFaults: true durationMin: 60 imageBuildDir: "../../.." - cv-uff-sm: + cv-uploadfromfile-sm: testScenario: contentvalidationuploadfromfile sync: true sizeBytes: 1024 @@ -495,10 +559,26 @@ matrix: durationMin: 25 imageBuildDir: "../../.." - cv-uff-lg: + cv-uploadfromfile-lg: testScenario: contentvalidationuploadfromfile sync: true sizeBytes: "52428800" uploadFaults: true durationMin: 60 imageBuildDir: "../../.." + + cv-uploadfromfile-async-sm: + testScenario: contentvalidationuploadfromfile + sync: false + sizeBytes: 1024 + uploadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + cv-uploadfromfile-async-lg: + testScenario: contentvalidationuploadfromfile + sync: false + sizeBytes: "52428800" + uploadFaults: true + durationMin: 60 + imageBuildDir: "../../.." \ No newline at end of file diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java index 49bfc5caa740..cc854625fd0a 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationPageBlobOutputStream.java @@ -57,19 +57,25 @@ protected void runInternal(Context span) throws IOException { int bytesRead; while ((bytesRead = inputStream.read(buffer)) != -1) { - if (bytesRead < buffer.length) { - bufferStream.write(buffer, 0, bytesRead); - if (bufferStream.size() >= buffer.length) { - byte[] toWrite = bufferStream.toByteArray(); - int length = toWrite.length - (toWrite.length % buffer.length); + // Always accumulate into bufferStream to avoid dropping or reordering bytes + bufferStream.write(buffer, 0, bytesRead); + // Flush all full 512-byte pages from the accumulator + if (bufferStream.size() >= buffer.length) { + byte[] toWrite = bufferStream.toByteArray(); + int length = toWrite.length - (toWrite.length % buffer.length); + if (length > 0) { outputStream.write(toWrite, 0, length); bufferStream.reset(); - bufferStream.write(toWrite, length, (toWrite.length % buffer.length)); + // Keep any remaining partial page bytes in the accumulator + bufferStream.write(toWrite, length, toWrite.length - length); } - } else { - outputStream.write(buffer, 0, bytesRead); } } + // For page blobs, total content size must be a multiple of 512 bytes. + // Any remaining bytes here indicate misalignment and would result in silent truncation. + if (bufferStream.size() != 0) { + throw new IOException("Remaining bytes in buffer that do not align to 512-byte page size."); + } outputStream.close(); originalContent.checkMatch(inputStream.getContentInfo(), span).block(); diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java index 82ed215a4514..def7e097688b 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationSeekableByteChannelWrite.java @@ -51,7 +51,9 @@ protected void runInternal(Context span) throws IOException { Mono writeOperation = byteBufferFlux .doOnNext(buffer -> { try { - channel.write(buffer); + while (buffer.hasRemaining()) { + channel.write(buffer); + } } catch (IOException e) { throw LOGGER.logExceptionAsError(new RuntimeException(e)); } diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java index 889c55d41735..d1724ef6ad0d 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUpload.java @@ -17,7 +17,7 @@ /** * Parallel blob upload with {@link com.azure.storage.blob.options.BlobParallelUploadOptions#setRequestChecksumAlgorithm} - * enabled. Verifies stored data via CRC after upload (see {@code BlobContentValidationUploadTests}). + * enabled. Verifies the correctness of the upload request content via CRC (see {@code BlobContentValidationUploadTests}). */ public class ContentValidationUpload extends BlobScenarioBase { private final OriginalContent originalContent = new OriginalContent(); diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java index 31db0f985bb6..6af05e3470d0 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationUploadFromFile.java @@ -13,7 +13,6 @@ import com.azure.storage.blob.options.BlobDownloadToFileOptions; import com.azure.storage.blob.options.BlobUploadFromFileOptions; import com.azure.storage.blob.stress.utils.OriginalContent; -import com.azure.storage.common.StorageChecksumAlgorithm; import com.azure.storage.stress.CrcInputStream; import reactor.core.publisher.Mono; @@ -27,7 +26,7 @@ import java.util.UUID; /** - * Upload from file with {@link BlobUploadFromFileOptions#setRequestChecksumAlgorithm}. MD5 is not supported for this API. + * Upload from file with {@link BlobUploadFromFileOptions#setRequestChecksumAlgorithm}. */ public class ContentValidationUploadFromFile extends BlobScenarioBase { private static final ClientLogger LOGGER = new ClientLogger(ContentValidationUploadFromFile.class); @@ -40,10 +39,6 @@ public class ContentValidationUploadFromFile extends BlobScenarioBase runInternalAsync(Context span) { Path downloadPath = getTempPath("test"); + // This is written differently than the other runInternalAsync methods because uploadFromFile requires a file + // path, so we need to generate the temp file. return Mono.using( () -> new CrcInputStream(originalContent.getBlobContentHead(), options.getSize()), - inputStream -> { - Path uploadFilePath = generateFile(inputStream); - return Mono.using( - () -> downloadPath.resolve(UUID.randomUUID() + ".txt"), - path -> asyncClient.uploadFromFileWithResponse(new BlobUploadFromFileOptions(uploadFilePath.toString()) - .setParallelTransferOptions(parallelTransferOptions) - .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm())) - .flatMap(ignored -> asyncNoFaultClient.downloadToFileWithResponse( - new BlobDownloadToFileOptions(path.toString()))) - .flatMap(ignored -> originalContent.checkMatch(BinaryData.fromFile(path), span)), - ContentValidationUploadFromFile::deleteFile); - }, + inputStream -> uploadAndVerifyAsync(inputStream, downloadPath, span), CrcInputStream::close); } + private Mono uploadAndVerifyAsync(CrcInputStream inputStream, Path downloadDir, Context span) { + Path uploadFilePath = generateFile(inputStream); + Path downloadFilePath = downloadDir.resolve(UUID.randomUUID() + ".txt"); + + return asyncClient.uploadFromFileWithResponse(new BlobUploadFromFileOptions(uploadFilePath.toString()) + .setParallelTransferOptions(parallelTransferOptions) + .setRequestChecksumAlgorithm(options.getRequestChecksumAlgorithm())) + .flatMap(ignored -> asyncNoFaultClient.downloadToFileWithResponse( + new BlobDownloadToFileOptions(downloadFilePath.toString()))) + .flatMap(ignored -> originalContent.checkMatch(BinaryData.fromFile(downloadFilePath), span)) + .doFinally(signal -> { + deleteFile(uploadFilePath); + deleteFile(downloadFilePath); + }); + } + @Override public Mono setupAsync() { return super.setupAsync().then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); From 405e1085acb88c22d2947d40af79f6fc21cbc3ae Mon Sep 17 00:00:00 2001 From: Isabelle Date: Thu, 16 Apr 2026 10:04:56 -0700 Subject: [PATCH 3/3] removing no-op tests --- .../scenarios-matrix.yaml | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml index eede2cb17875..c5f644c699d2 100644 --- a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml +++ b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml @@ -391,22 +391,6 @@ matrix: durationMin: 60 imageBuildDir: "../../.." - cv-blockbloboutputstream-async-sm: - testScenario: contentvalidationblockbloboutputstream - sync: false - sizeBytes: 1024 - uploadFaults: true - durationMin: 25 - imageBuildDir: "../../.." - - cv-blockbloboutputstream-async-lg: - testScenario: contentvalidationblockbloboutputstream - sync: false - sizeBytes: "26214400" - uploadFaults: true - durationMin: 60 - imageBuildDir: "../../.." - cv-blockbloboutputstream-sm: testScenario: contentvalidationblockbloboutputstream sync: true