Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions conserver/links/delay/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Delay Link

The Delay link is a lightweight **testing fixture**. It sleeps for a configurable
number of seconds and then passes the vCon through the chain unchanged. It does
not read or modify the vCon.

Its purpose is to make a chain take a predictable amount of wall-clock time so
that concurrent processing behaviour can be observed during QA — for example,
verifying that per-worker in-flight concurrency (`CONSERVER_VCON_CONCURRENCY`)
actually processes multiple vCons in parallel rather than serially.

> This link is intended for test and staging environments. It is not meant to
> run in a production chain.

## Configuration Options

```python
default_options = {
"seconds": 5 # how long to sleep before continuing the chain
}
```

### Options Description

- `seconds`: The number of seconds to sleep before returning. Accepts an int or
float. Negative values are clamped to `0` (a warning is logged).

## How It Works

1. Merges the provided options over the defaults.
2. Logs the start of the delay.
3. Sleeps for `seconds`.
4. Logs that processing has resumed.
5. Returns the vCon UUID so the chain continues.

## Usage in a Chain

Insert the link anywhere in a chain to artificially widen its processing window:

```yaml
links:
delay_10s:
module: links.delay
options:
seconds: 10

chains:
qa_concurrency_chain:
links:
- delay_10s
ingress_lists:
- qa_ingress
```

Enqueue several vCons and watch the logs / traces: with concurrency enabled the
delayed links should overlap in time instead of running back-to-back.

## Dependencies

- Custom utilities:
- logging_utils

## Testing

```bash
pytest conserver/links/delay/test_delay.py
```

The tests mock `time.sleep`, so they run instantly and assert the sleep duration
and pass-through behaviour (including default fallback and negative clamping).
48 changes: 48 additions & 0 deletions conserver/links/delay/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from lib.logging_utils import init_logger
import time

logger = init_logger(__name__)

AUDIT_META = {
"third_party_service": "internal",
"policy_url": None,
"data_type": "none",
"transformation": "Delayed chain processing by sleeping (test fixture)",
"transformation_opts_key": "seconds",
"safe_opts_keys": ["seconds"],
}

# Default: sleep 5 seconds. This link is a test fixture used to widen the
# per-vCon processing window so concurrent worker behaviour (see
# CONSERVER_VCON_CONCURRENCY) can be observed during QA.
default_options = {"seconds": 5}


def run(vcon_uuid, link_name, opts=default_options):
"""Sleep for ``opts["seconds"]`` then pass the vCon through unchanged.

Purely a testing aid: it makes a chain take a predictable amount of wall
time so parallel/concurrent processing can be exercised. The vCon itself
is never read or modified. Returns ``vcon_uuid`` so the chain continues.
"""
merged_opts = default_options.copy()
merged_opts.update(opts or {})

seconds = merged_opts["seconds"]
if seconds < 0:
logger.warning(
"delay link '%s': negative seconds (%s) for vCon %s — clamping to 0",
link_name,
seconds,
vcon_uuid,
)
seconds = 0

logger.info(
"delay link '%s': sleeping %ss for vCon %s", link_name, seconds, vcon_uuid
)
time.sleep(seconds)
logger.info(
"delay link '%s': resumed after %ss for vCon %s", link_name, seconds, vcon_uuid
)
return vcon_uuid
36 changes: 36 additions & 0 deletions conserver/links/delay/test_delay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from unittest.mock import patch

from links.delay import default_options, run


@patch("links.delay.time.sleep")
def test_run_sleeps_default(mock_sleep):
result = run("test-uuid", "delay")

assert result == "test-uuid"
mock_sleep.assert_called_once_with(default_options["seconds"])


@patch("links.delay.time.sleep")
def test_run_sleeps_custom(mock_sleep):
result = run("test-uuid", "delay", opts={"seconds": 12})

assert result == "test-uuid"
mock_sleep.assert_called_once_with(12)


@patch("links.delay.time.sleep")
def test_partial_opts_keep_default_seconds(mock_sleep):
# An opts dict without "seconds" must fall back to the default, not KeyError.
result = run("test-uuid", "delay", opts={"unrelated": True})

assert result == "test-uuid"
mock_sleep.assert_called_once_with(default_options["seconds"])


@patch("links.delay.time.sleep")
def test_negative_seconds_clamped_to_zero(mock_sleep):
result = run("test-uuid", "delay", opts={"seconds": -3})

assert result == "test-uuid"
mock_sleep.assert_called_once_with(0)