From 79bc0d7c2f969d5ac8d9fab52ae4862688bcfbfb Mon Sep 17 00:00:00 2001 From: mchain0 Date: Thu, 2 Apr 2026 12:55:22 +0200 Subject: [PATCH 1/4] cre-2803: http boundry blocker investigation --- .../wasm/host/execution_await_order_test.go | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 pkg/workflows/wasm/host/execution_await_order_test.go diff --git a/pkg/workflows/wasm/host/execution_await_order_test.go b/pkg/workflows/wasm/host/execution_await_order_test.go new file mode 100644 index 0000000000..fc5b274ed4 --- /dev/null +++ b/pkg/workflows/wasm/host/execution_await_order_test.go @@ -0,0 +1,98 @@ +package host + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/emptypb" + + sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" +) + +// awaitOrderStub implements ExecutionHelper for testing awaitCapabilities ordering. +type awaitOrderStub struct { + unblock chan struct{} +} + +func (a *awaitOrderStub) CallCapability(_ context.Context, req *sdkpb.CapabilityRequest) (*sdkpb.CapabilityResponse, error) { + payload, err := anypb.New(&emptypb.Empty{}) + if err != nil { + return nil, err + } + ok := &sdkpb.CapabilityResponse{ + Response: &sdkpb.CapabilityResponse_Payload{ + Payload: payload, + }, + } + if req.CallbackId == 1 { + <-a.unblock + } + return ok, nil +} + +func (a *awaitOrderStub) GetSecrets(context.Context, *sdkpb.GetSecretsRequest) ([]*sdkpb.SecretResponse, error) { + return nil, nil +} + +func (a *awaitOrderStub) GetWorkflowExecutionID() string { return "" } + +func (a *awaitOrderStub) GetNodeTime() time.Time { return time.Time{} } + +func (a *awaitOrderStub) GetDONTime() (time.Time, error) { return time.Time{}, nil } + +func (a *awaitOrderStub) EmitUserLog(string) error { return nil } + +var _ ExecutionHelper = (*awaitOrderStub)(nil) + +// TestAwaitCapabilities_headOfLineBlocksOnEarlierID proves awaitCapabilities receives from +// callback channels in acr.Ids order: it cannot finish until an earlier ID completes, even when +// a later callback finishes first. +func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) { + t.Parallel() + + unblock := make(chan struct{}) + stub := &awaitOrderStub{unblock: unblock} + + exec := &execution[*sdkpb.ExecutionResult]{ + ctx: t.Context(), + capabilityResponses: make(map[int32]<-chan *sdkpb.CapabilityResponse), + executor: stub, + } + + req := func(id int32) *sdkpb.CapabilityRequest { + return &sdkpb.CapabilityRequest{CallbackId: id} + } + + require.NoError(t, exec.callCapAsync(t.Context(), req(1))) + require.NoError(t, exec.callCapAsync(t.Context(), req(2))) + + awaitDone := make(chan struct{}) + var awaitResp *sdkpb.AwaitCapabilitiesResponse + var awaitErr error + go func() { + awaitResp, awaitErr = exec.awaitCapabilities(t.Context(), &sdkpb.AwaitCapabilitiesRequest{Ids: []int32{1, 2}}) + close(awaitDone) + }() + + select { + case <-awaitDone: + t.Fatal("awaitCapabilities returned before callback 1 was unblocked; head-of-line invariant violated") + case <-time.After(200 * time.Millisecond): + } + + // Unblock callback 1 so the first channel receive in awaitCapabilities can complete. + close(unblock) + + select { + case <-awaitDone: + case <-time.After(2 * time.Second): + t.Fatal("awaitCapabilities did not complete after unblocking callback 1") + } + require.NoError(t, awaitErr) + require.Len(t, awaitResp.Responses, 2) + require.Contains(t, awaitResp.Responses, int32(1)) + require.Contains(t, awaitResp.Responses, int32(2)) +} From 5b4a641cd02558b49d0151e80df2b56cc8b9321f Mon Sep 17 00:00:00 2001 From: mchain0 Date: Wed, 8 Apr 2026 14:13:12 +0200 Subject: [PATCH 2/4] cre-2802: minor improvements --- .../wasm/host/execution_await_order_test.go | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/workflows/wasm/host/execution_await_order_test.go b/pkg/workflows/wasm/host/execution_await_order_test.go index fc5b274ed4..6650a06f7c 100644 --- a/pkg/workflows/wasm/host/execution_await_order_test.go +++ b/pkg/workflows/wasm/host/execution_await_order_test.go @@ -2,6 +2,7 @@ package host import ( "context" + "sync/atomic" "testing" "time" @@ -10,6 +11,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" + wfpb "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" ) // awaitOrderStub implements ExecutionHelper for testing awaitCapabilities ordering. @@ -45,6 +47,8 @@ func (a *awaitOrderStub) GetDONTime() (time.Time, error) { return time.Time{}, n func (a *awaitOrderStub) EmitUserLog(string) error { return nil } +func (a *awaitOrderStub) EmitUserMetric(context.Context, *wfpb.WorkflowUserMetric) error { return nil } + var _ ExecutionHelper = (*awaitOrderStub)(nil) // TestAwaitCapabilities_headOfLineBlocksOnEarlierID proves awaitCapabilities receives from @@ -69,28 +73,22 @@ func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) { require.NoError(t, exec.callCapAsync(t.Context(), req(1))) require.NoError(t, exec.callCapAsync(t.Context(), req(2))) - awaitDone := make(chan struct{}) + var awaitFinished atomic.Bool var awaitResp *sdkpb.AwaitCapabilitiesResponse var awaitErr error go func() { awaitResp, awaitErr = exec.awaitCapabilities(t.Context(), &sdkpb.AwaitCapabilitiesRequest{Ids: []int32{1, 2}}) - close(awaitDone) + awaitFinished.Store(true) }() - select { - case <-awaitDone: - t.Fatal("awaitCapabilities returned before callback 1 was unblocked; head-of-line invariant violated") - case <-time.After(200 * time.Millisecond): - } + time.Sleep(200 * time.Millisecond) + require.False(t, awaitFinished.Load(), "awaitCapabilities returned before callback 1 was unblocked; head-of-line invariant violated") // Unblock callback 1 so the first channel receive in awaitCapabilities can complete. close(unblock) - select { - case <-awaitDone: - case <-time.After(2 * time.Second): - t.Fatal("awaitCapabilities did not complete after unblocking callback 1") - } + require.Eventually(t, func() bool { return awaitFinished.Load() }, 2*time.Second, 10*time.Millisecond, + "awaitCapabilities did not complete after unblocking callback 1") require.NoError(t, awaitErr) require.Len(t, awaitResp.Responses, 2) require.Contains(t, awaitResp.Responses, int32(1)) From d64c37e189d606897929a007449e46a037a20d65 Mon Sep 17 00:00:00 2001 From: mchain0 Date: Wed, 8 Apr 2026 14:16:16 +0200 Subject: [PATCH 3/4] cre-2802: minor fixes --- .../wasm/host/execution_await_order_test.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pkg/workflows/wasm/host/execution_await_order_test.go b/pkg/workflows/wasm/host/execution_await_order_test.go index 6650a06f7c..682234860a 100644 --- a/pkg/workflows/wasm/host/execution_await_order_test.go +++ b/pkg/workflows/wasm/host/execution_await_order_test.go @@ -17,6 +17,7 @@ import ( // awaitOrderStub implements ExecutionHelper for testing awaitCapabilities ordering. type awaitOrderStub struct { unblock chan struct{} + id2Done chan struct{} } func (a *awaitOrderStub) CallCapability(_ context.Context, req *sdkpb.CapabilityRequest) (*sdkpb.CapabilityResponse, error) { @@ -32,6 +33,9 @@ func (a *awaitOrderStub) CallCapability(_ context.Context, req *sdkpb.Capability if req.CallbackId == 1 { <-a.unblock } + if req.CallbackId == 2 && a.id2Done != nil { + close(a.id2Done) + } return ok, nil } @@ -58,7 +62,8 @@ func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) { t.Parallel() unblock := make(chan struct{}) - stub := &awaitOrderStub{unblock: unblock} + id2Done := make(chan struct{}) + stub := &awaitOrderStub{unblock: unblock, id2Done: id2Done} exec := &execution[*sdkpb.ExecutionResult]{ ctx: t.Context(), @@ -81,7 +86,11 @@ func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) { awaitFinished.Store(true) }() - time.Sleep(200 * time.Millisecond) + select { + case <-id2Done: + case <-time.After(2 * time.Second): + t.Fatal("CallCapability for callback 2 did not return while callback 1 was still blocked") + } require.False(t, awaitFinished.Load(), "awaitCapabilities returned before callback 1 was unblocked; head-of-line invariant violated") // Unblock callback 1 so the first channel receive in awaitCapabilities can complete. From e6e9575f1dfb85ef59ae2afa7ac006fa31d2c405 Mon Sep 17 00:00:00 2001 From: mchain0 Date: Wed, 8 Apr 2026 16:00:08 +0200 Subject: [PATCH 4/4] cre-2802: assert improvements --- pkg/workflows/wasm/host/execution_await_order_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/workflows/wasm/host/execution_await_order_test.go b/pkg/workflows/wasm/host/execution_await_order_test.go index 682234860a..29084302a2 100644 --- a/pkg/workflows/wasm/host/execution_await_order_test.go +++ b/pkg/workflows/wasm/host/execution_await_order_test.go @@ -55,9 +55,10 @@ func (a *awaitOrderStub) EmitUserMetric(context.Context, *wfpb.WorkflowUserMetri var _ ExecutionHelper = (*awaitOrderStub)(nil) -// TestAwaitCapabilities_headOfLineBlocksOnEarlierID proves awaitCapabilities receives from -// callback channels in acr.Ids order: it cannot finish until an earlier ID completes, even when -// a later callback finishes first. +// TestAwaitCapabilities_headOfLineBlocksOnEarlierID proves awaitCapabilities reads from +// callback channels in acr.Ids order (head-of-line): it cannot finish until an earlier ID +// completes, even when a later callback finishes first. AwaitCapabilitiesResponse.Responses is a +// map keyed by CallbackId, not a slice ordered by completion time. func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) { t.Parallel() @@ -100,6 +101,6 @@ func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) { "awaitCapabilities did not complete after unblocking callback 1") require.NoError(t, awaitErr) require.Len(t, awaitResp.Responses, 2) - require.Contains(t, awaitResp.Responses, int32(1)) - require.Contains(t, awaitResp.Responses, int32(2)) + require.NotNil(t, awaitResp.Responses[1], "expected entry keyed by CallbackId 1 (map is by id, not completion order)") + require.NotNil(t, awaitResp.Responses[2], "expected entry keyed by CallbackId 2") }