diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/wait/ConcurrentWaitForConditionExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/wait/ConcurrentWaitForConditionExample.java new file mode 100644 index 000000000..815ee2475 --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/wait/ConcurrentWaitForConditionExample.java @@ -0,0 +1,57 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.wait; + +import java.util.stream.IntStream; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.config.MapConfig; +import software.amazon.lambda.durable.config.WaitForConditionConfig; +import software.amazon.lambda.durable.model.WaitForConditionResult; + +/** + * Example demonstrating concurrent waitForCondition operations using map. + * + *

Runs many (totalOperations) waitForCondition operations concurrently (maxConcurrency). Each operation: + * + *

    + *
  1. Uses attempt count as state (replay-safe). + *
  2. Fails and retries until the attempt count reaches the given threshold, and then succeeds + *
+ */ +public class ConcurrentWaitForConditionExample extends DurableHandler { + + public record Input(int threshold, int totalOperations, int maxConcurrency) {} + + @Override + public String handleRequest(Input input, DurableContext context) { + var items = IntStream.range(0, input.totalOperations()).boxed().toList(); + + var config = MapConfig.builder().maxConcurrency(input.maxConcurrency()).build(); + + var result = context.map( + "concurrent-wait-for-conditions", + items, + String.class, + (item, index, ctx) -> { + var conditionConfig = WaitForConditionConfig.builder() + .initialState(1) + .build(); + // Poll until the counter reaches the input threshold + var count = ctx.waitForCondition( + "condition-" + index, + Integer.class, + (callCount, stepCtx) -> { + if (callCount >= input.threshold()) { + return WaitForConditionResult.stopPolling(callCount); + } + return WaitForConditionResult.continuePolling(callCount + 1); + }, + conditionConfig); + return String.valueOf(count); + }, + config); + + return String.join(" | ", result.results()); + } +} diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/wait/WaitForConditionExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/wait/WaitForConditionExample.java index 6bed577fd..c6ec0492e 100644 --- a/examples/src/main/java/software/amazon/lambda/durable/examples/wait/WaitForConditionExample.java +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/wait/WaitForConditionExample.java @@ -10,26 +10,29 @@ /** * Example demonstrating the waitForCondition operation. * - *

This example simulates waiting for an order to ship, by repeatedly calling a check function. + *

This handler polls a condition function until it signals completion: + * + *

    + *
  1. The attempt count is used as a state (replay safe) + *
  2. Fails and retries until the attempt count reaches the given threshold, and then succeeds + *
*/ public class WaitForConditionExample extends DurableHandler { @Override - public Integer handleRequest(Integer input, DurableContext context) { - // Poll the shipment status until the order is shipped. - // The check function simulates an order shipment (0 -> 1 -> 2 -> 3 -> 4) + public Integer handleRequest(Integer threshold, DurableContext context) { + // Poll until the counter reaches the input threshold return context.waitForCondition( - "wait-for-shipment", + "wait-for-condition", Integer.class, (callCount, stepCtx) -> { - // Simulate checking shipment status from an external service - if (callCount >= 3) { - // Order has shipped — stop polling - return WaitForConditionResult.stopPolling(callCount + 1); + if (callCount >= threshold) { + // Condition met, stop polling + return WaitForConditionResult.stopPolling(callCount); } - // Order still processing — continue polling + // Condition not met, keep polling return WaitForConditionResult.continuePolling(callCount + 1); }, - WaitForConditionConfig.builder().initialState(1).build()); // Order pending - initial status + WaitForConditionConfig.builder().initialState(1).build()); } } diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java index 55346efa7..d935e1719 100644 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.*; import static software.amazon.lambda.durable.TypeToken.get; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -25,6 +26,7 @@ import software.amazon.lambda.durable.examples.step.ManyAsyncStepsExample; import software.amazon.lambda.durable.examples.types.ApprovalRequest; import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.examples.wait.ConcurrentWaitForConditionExample; import software.amazon.lambda.durable.model.ExecutionStatus; import software.amazon.lambda.durable.testing.CloudDurableTestRunner; @@ -594,4 +596,45 @@ void testComplexMapExample() { assertTrue(output.contains("healthy")); assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED")); } + + @Test + void testWaitForConditionExample() { + var runner = CloudDurableTestRunner.create( + arn("wait-for-condition-example"), Integer.class, Integer.class, lambdaClient); + var result = runner.run(3); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals(3, result.getResult(Integer.class)); + } + + @Test + void testConcurrentWaitForConditionExample() { + var runner = CloudDurableTestRunner.create( + arn("concurrent-wait-for-condition-example"), + ConcurrentWaitForConditionExample.Input.class, + String.class, + lambdaClient); + var result = runner.run(new ConcurrentWaitForConditionExample.Input(3, 100, 50)); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + // Verify each operation finished with 3 attempts + var allOperationsOutput = result.getResult(String.class); + var operationOutputs = allOperationsOutput.split(" \\| "); + assertEquals(100, operationOutputs.length); + for (var operationOutput : operationOutputs) { + assertEquals("3", operationOutput); + } + + // Verify each waitForCondition operation completes in under 30 seconds + var waitForConditionOps = result.getOperations().stream() + .filter(op -> "WaitForCondition".equals(op.getSubtype())) + .toList(); + for (var waitForConditionResult : waitForConditionOps) { + assertTrue( + waitForConditionResult.getDuration().compareTo(Duration.ofSeconds(30)) < 0, + "waitForCondition operation took " + + waitForConditionResult.getDuration().toSeconds() + "s, expected < 30s"); + } + } } diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/wait/ConcurrentWaitForConditionExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/wait/ConcurrentWaitForConditionExampleTest.java new file mode 100644 index 000000000..7a4aabaa8 --- /dev/null +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/wait/ConcurrentWaitForConditionExampleTest.java @@ -0,0 +1,29 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.wait; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class ConcurrentWaitForConditionExampleTest { + + @Test + void testConcurrentWaitForConditionExample() { + var handler = new ConcurrentWaitForConditionExample(); + var runner = LocalDurableTestRunner.create(ConcurrentWaitForConditionExample.Input.class, handler); + + var result = runner.runUntilComplete(new ConcurrentWaitForConditionExample.Input(3, 100, 50)); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var allOperationsOutput = result.getResult(String.class); + var operationResults = allOperationsOutput.split(" \\| "); + assertEquals(100, operationResults.length); + for (var operationResult : operationResults) { + assertEquals("3", operationResult); + } + } +} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/wait/WaitForConditionExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/wait/WaitForConditionExampleTest.java index f74ba8f61..121f1a1cd 100644 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/wait/WaitForConditionExampleTest.java +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/wait/WaitForConditionExampleTest.java @@ -15,9 +15,9 @@ void testWaitForConditionExample() { var handler = new WaitForConditionExample(); var runner = LocalDurableTestRunner.create(Integer.class, handler); - var result = runner.runUntilComplete(123); + var result = runner.runUntilComplete(3); assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - assertEquals(4, result.getResult(Integer.class)); + assertEquals(3, result.getResult(Integer.class)); } } diff --git a/examples/template.yaml b/examples/template.yaml index f69e87998..324ff008e 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -518,6 +518,56 @@ Resources: DockerContext: ../ DockerTag: durable-examples + WaitForConditionExampleFunction: + Type: AWS::Serverless::Function + Properties: + PackageType: Image + FunctionName: !Join + - '' + - - 'wait-for-condition-example' + - !Ref FunctionNameSuffix + ImageConfig: + Command: ["software.amazon.lambda.durable.examples.wait.WaitForConditionExample::handleRequest"] + DurableConfig: + ExecutionTimeout: 300 + RetentionPeriodInDays: 7 + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:wait-for-condition-example${FunctionNameSuffix}" + Metadata: + Dockerfile: !Ref DockerFile + DockerContext: ../ + DockerTag: durable-examples + + ConcurrentWaitForConditionExampleFunction: + Type: AWS::Serverless::Function + Properties: + PackageType: Image + FunctionName: !Join + - '' + - - 'concurrent-wait-for-condition-example' + - !Ref FunctionNameSuffix + ImageConfig: + Command: ["software.amazon.lambda.durable.examples.wait.ConcurrentWaitForConditionExample::handleRequest"] + DurableConfig: + ExecutionTimeout: 300 + RetentionPeriodInDays: 7 + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:concurrent-wait-for-condition-example${FunctionNameSuffix}" + Metadata: + Dockerfile: !Ref DockerFile + DockerContext: ../ + DockerTag: durable-examples + Outputs: NoopExampleFunction: Description: Noop Example Function ARN @@ -679,3 +729,19 @@ Outputs: Description: Complex Map Example Function Name Value: !Ref ComplexMapExampleFunction + WaitForConditionExampleFunction: + Description: Wait For Condition Example Function ARN + Value: !GetAtt WaitForConditionExampleFunction.Arn + + WaitForConditionExampleFunctionName: + Description: Wait For Condition Example Function Name + Value: !Ref WaitForConditionExampleFunction + + ConcurrentWaitForConditionExampleFunction: + Description: Concurrent Wait For Condition Example Function ARN + Value: !GetAtt ConcurrentWaitForConditionExampleFunction.Arn + + ConcurrentWaitForConditionExampleFunctionName: + Description: Concurrent Wait For Condition Example Function Name + Value: !Ref ConcurrentWaitForConditionExampleFunction +