Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.wait;

import java.util.stream.IntStream;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.config.MapConfig;
import software.amazon.lambda.durable.config.WaitForConditionConfig;
import software.amazon.lambda.durable.model.WaitForConditionResult;

/**
* Example demonstrating concurrent waitForCondition operations using map.
*
* <p>Runs many (totalOperations) waitForCondition operations concurrently (maxConcurrency). Each operation:
*
* <ol>
* <li>Uses attempt count as state (replay-safe).
* <li>Fails and retries until the attempt count reaches the given threshold, and then succeeds
* </ol>
*/
public class ConcurrentWaitForConditionExample extends DurableHandler<ConcurrentWaitForConditionExample.Input, String> {

public record Input(int threshold, int totalOperations, int maxConcurrency) {}

@Override
public String handleRequest(Input input, DurableContext context) {
var items = IntStream.range(0, input.totalOperations()).boxed().toList();

var config = MapConfig.builder().maxConcurrency(input.maxConcurrency()).build();

var result = context.map(
"concurrent-wait-for-conditions",
items,
String.class,
(item, index, ctx) -> {
var conditionConfig = WaitForConditionConfig.<Integer>builder()
.initialState(1)
.build();
// Poll until the counter reaches the input threshold
var count = ctx.waitForCondition(
"condition-" + index,
Integer.class,
(callCount, stepCtx) -> {
if (callCount >= input.threshold()) {
return WaitForConditionResult.stopPolling(callCount);
}
return WaitForConditionResult.continuePolling(callCount + 1);
},
conditionConfig);
return String.valueOf(count);
},
config);

return String.join(" | ", result.results());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,29 @@
/**
* Example demonstrating the waitForCondition operation.
*
* <p>This example simulates waiting for an order to ship, by repeatedly calling a check function.
* <p>This handler polls a condition function until it signals completion:
*
* <ol>
* <li>The attempt count is used as a state (replay safe)
* <li>Fails and retries until the attempt count reaches the given threshold, and then succeeds
* </ol>
*/
public class WaitForConditionExample extends DurableHandler<Integer, Integer> {

@Override
public Integer handleRequest(Integer input, DurableContext context) {
// Poll the shipment status until the order is shipped.
// The check function simulates an order shipment (0 -> 1 -> 2 -> 3 -> 4)
public Integer handleRequest(Integer threshold, DurableContext context) {
// Poll until the counter reaches the input threshold
return context.waitForCondition(
"wait-for-shipment",
"wait-for-condition",
Integer.class,
(callCount, stepCtx) -> {
// Simulate checking shipment status from an external service
if (callCount >= 3) {
// Order has shipped — stop polling
return WaitForConditionResult.stopPolling(callCount + 1);
if (callCount >= threshold) {
// Condition met, stop polling
return WaitForConditionResult.stopPolling(callCount);
}
// Order still processing — continue polling
// Condition not met, keep polling
return WaitForConditionResult.continuePolling(callCount + 1);
},
WaitForConditionConfig.<Integer>builder().initialState(1).build()); // Order pending - initial status
WaitForConditionConfig.<Integer>builder().initialState(1).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.junit.jupiter.api.Assertions.*;
import static software.amazon.lambda.durable.TypeToken.get;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -25,6 +26,7 @@
import software.amazon.lambda.durable.examples.step.ManyAsyncStepsExample;
import software.amazon.lambda.durable.examples.types.ApprovalRequest;
import software.amazon.lambda.durable.examples.types.GreetingRequest;
import software.amazon.lambda.durable.examples.wait.ConcurrentWaitForConditionExample;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.CloudDurableTestRunner;

Expand Down Expand Up @@ -594,4 +596,45 @@ void testComplexMapExample() {
assertTrue(output.contains("healthy"));
assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED"));
}

@Test
void testWaitForConditionExample() {
var runner = CloudDurableTestRunner.create(
arn("wait-for-condition-example"), Integer.class, Integer.class, lambdaClient);
var result = runner.run(3);

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(3, result.getResult(Integer.class));
}

@Test
void testConcurrentWaitForConditionExample() {
var runner = CloudDurableTestRunner.create(
arn("concurrent-wait-for-condition-example"),
ConcurrentWaitForConditionExample.Input.class,
String.class,
lambdaClient);
var result = runner.run(new ConcurrentWaitForConditionExample.Input(3, 100, 50));

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

// Verify each operation finished with 3 attempts
var allOperationsOutput = result.getResult(String.class);
var operationOutputs = allOperationsOutput.split(" \\| ");
assertEquals(100, operationOutputs.length);
for (var operationOutput : operationOutputs) {
assertEquals("3", operationOutput);
}

// Verify each waitForCondition operation completes in under 30 seconds
var waitForConditionOps = result.getOperations().stream()
.filter(op -> "WaitForCondition".equals(op.getSubtype()))
.toList();
for (var waitForConditionResult : waitForConditionOps) {
assertTrue(
waitForConditionResult.getDuration().compareTo(Duration.ofSeconds(30)) < 0,
"waitForCondition operation took "
+ waitForConditionResult.getDuration().toSeconds() + "s, expected < 30s");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.wait;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class ConcurrentWaitForConditionExampleTest {

@Test
void testConcurrentWaitForConditionExample() {
var handler = new ConcurrentWaitForConditionExample();
var runner = LocalDurableTestRunner.create(ConcurrentWaitForConditionExample.Input.class, handler);

var result = runner.runUntilComplete(new ConcurrentWaitForConditionExample.Input(3, 100, 50));

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

var allOperationsOutput = result.getResult(String.class);
var operationResults = allOperationsOutput.split(" \\| ");
assertEquals(100, operationResults.length);
for (var operationResult : operationResults) {
assertEquals("3", operationResult);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we assert the duration of waitForCondition operations?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is local runner, never mind. We can assert the duration only in our cloud tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make an update to this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added duration check for each operation in the cloud test

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ void testWaitForConditionExample() {
var handler = new WaitForConditionExample();
var runner = LocalDurableTestRunner.create(Integer.class, handler);

var result = runner.runUntilComplete(123);
var result = runner.runUntilComplete(3);

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(4, result.getResult(Integer.class));
assertEquals(3, result.getResult(Integer.class));
}
}
66 changes: 66 additions & 0 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,56 @@ Resources:
DockerContext: ../
DockerTag: durable-examples

WaitForConditionExampleFunction:
Type: AWS::Serverless::Function
Properties:
PackageType: Image
FunctionName: !Join
- ''
- - 'wait-for-condition-example'
- !Ref FunctionNameSuffix
ImageConfig:
Command: ["software.amazon.lambda.durable.examples.wait.WaitForConditionExample::handleRequest"]
DurableConfig:
ExecutionTimeout: 300
RetentionPeriodInDays: 7
Policies:
- Statement:
- Effect: Allow
Action:
- lambda:CheckpointDurableExecutions
- lambda:GetDurableExecutionState
Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:wait-for-condition-example${FunctionNameSuffix}"
Metadata:
Dockerfile: !Ref DockerFile
DockerContext: ../
DockerTag: durable-examples

ConcurrentWaitForConditionExampleFunction:
Type: AWS::Serverless::Function
Properties:
PackageType: Image
FunctionName: !Join
- ''
- - 'concurrent-wait-for-condition-example'
- !Ref FunctionNameSuffix
ImageConfig:
Command: ["software.amazon.lambda.durable.examples.wait.ConcurrentWaitForConditionExample::handleRequest"]
DurableConfig:
ExecutionTimeout: 300
RetentionPeriodInDays: 7
Policies:
- Statement:
- Effect: Allow
Action:
- lambda:CheckpointDurableExecutions
- lambda:GetDurableExecutionState
Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:concurrent-wait-for-condition-example${FunctionNameSuffix}"
Metadata:
Dockerfile: !Ref DockerFile
DockerContext: ../
DockerTag: durable-examples

Outputs:
NoopExampleFunction:
Description: Noop Example Function ARN
Expand Down Expand Up @@ -679,3 +729,19 @@ Outputs:
Description: Complex Map Example Function Name
Value: !Ref ComplexMapExampleFunction

WaitForConditionExampleFunction:
Description: Wait For Condition Example Function ARN
Value: !GetAtt WaitForConditionExampleFunction.Arn

WaitForConditionExampleFunctionName:
Description: Wait For Condition Example Function Name
Value: !Ref WaitForConditionExampleFunction

ConcurrentWaitForConditionExampleFunction:
Description: Concurrent Wait For Condition Example Function ARN
Value: !GetAtt ConcurrentWaitForConditionExampleFunction.Arn

ConcurrentWaitForConditionExampleFunctionName:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt if all these outputs are useful

Description: Concurrent Wait For Condition Example Function Name
Value: !Ref ConcurrentWaitForConditionExampleFunction

Loading