From 3f9d0cf101b42ce09ee3b97922e7bc5c689fb20d Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Thu, 18 Jun 2026 16:43:57 +0530 Subject: [PATCH] feat(conserver): add delay test link for chain concurrency QA (CON-624) A pass-through link that sleeps a configurable number of seconds so a chain takes predictable wall-clock time, letting QA observe per-worker vCon concurrency. Never reads or modifies the vCon. Includes unit tests and README. Co-Authored-By: Claude Opus 4.8 --- conserver/links/delay/README.md | 70 +++++++++++++++++++++++++++++ conserver/links/delay/__init__.py | 48 ++++++++++++++++++++ conserver/links/delay/test_delay.py | 36 +++++++++++++++ 3 files changed, 154 insertions(+) create mode 100644 conserver/links/delay/README.md create mode 100644 conserver/links/delay/__init__.py create mode 100644 conserver/links/delay/test_delay.py diff --git a/conserver/links/delay/README.md b/conserver/links/delay/README.md new file mode 100644 index 0000000..e8bbb58 --- /dev/null +++ b/conserver/links/delay/README.md @@ -0,0 +1,70 @@ +# Delay Link + +The Delay link is a lightweight **testing fixture**. It sleeps for a configurable +number of seconds and then passes the vCon through the chain unchanged. It does +not read or modify the vCon. + +Its purpose is to make a chain take a predictable amount of wall-clock time so +that concurrent processing behaviour can be observed during QA — for example, +verifying that per-worker in-flight concurrency (`CONSERVER_VCON_CONCURRENCY`) +actually processes multiple vCons in parallel rather than serially. + +> This link is intended for test and staging environments. It is not meant to +> run in a production chain. + +## Configuration Options + +```python +default_options = { + "seconds": 5 # how long to sleep before continuing the chain +} +``` + +### Options Description + +- `seconds`: The number of seconds to sleep before returning. Accepts an int or + float. Negative values are clamped to `0` (a warning is logged). + +## How It Works + +1. Merges the provided options over the defaults. +2. Logs the start of the delay. +3. Sleeps for `seconds`. +4. Logs that processing has resumed. +5. Returns the vCon UUID so the chain continues. + +## Usage in a Chain + +Insert the link anywhere in a chain to artificially widen its processing window: + +```yaml +links: + delay_10s: + module: links.delay + options: + seconds: 10 + +chains: + qa_concurrency_chain: + links: + - delay_10s + ingress_lists: + - qa_ingress +``` + +Enqueue several vCons and watch the logs / traces: with concurrency enabled the +delayed links should overlap in time instead of running back-to-back. + +## Dependencies + +- Custom utilities: + - logging_utils + +## Testing + +```bash +pytest conserver/links/delay/test_delay.py +``` + +The tests mock `time.sleep`, so they run instantly and assert the sleep duration +and pass-through behaviour (including default fallback and negative clamping). diff --git a/conserver/links/delay/__init__.py b/conserver/links/delay/__init__.py new file mode 100644 index 0000000..701d608 --- /dev/null +++ b/conserver/links/delay/__init__.py @@ -0,0 +1,48 @@ +from lib.logging_utils import init_logger +import time + +logger = init_logger(__name__) + +AUDIT_META = { + "third_party_service": "internal", + "policy_url": None, + "data_type": "none", + "transformation": "Delayed chain processing by sleeping (test fixture)", + "transformation_opts_key": "seconds", + "safe_opts_keys": ["seconds"], +} + +# Default: sleep 5 seconds. This link is a test fixture used to widen the +# per-vCon processing window so concurrent worker behaviour (see +# CONSERVER_VCON_CONCURRENCY) can be observed during QA. +default_options = {"seconds": 5} + + +def run(vcon_uuid, link_name, opts=default_options): + """Sleep for ``opts["seconds"]`` then pass the vCon through unchanged. + + Purely a testing aid: it makes a chain take a predictable amount of wall + time so parallel/concurrent processing can be exercised. The vCon itself + is never read or modified. Returns ``vcon_uuid`` so the chain continues. + """ + merged_opts = default_options.copy() + merged_opts.update(opts or {}) + + seconds = merged_opts["seconds"] + if seconds < 0: + logger.warning( + "delay link '%s': negative seconds (%s) for vCon %s — clamping to 0", + link_name, + seconds, + vcon_uuid, + ) + seconds = 0 + + logger.info( + "delay link '%s': sleeping %ss for vCon %s", link_name, seconds, vcon_uuid + ) + time.sleep(seconds) + logger.info( + "delay link '%s': resumed after %ss for vCon %s", link_name, seconds, vcon_uuid + ) + return vcon_uuid diff --git a/conserver/links/delay/test_delay.py b/conserver/links/delay/test_delay.py new file mode 100644 index 0000000..34a3502 --- /dev/null +++ b/conserver/links/delay/test_delay.py @@ -0,0 +1,36 @@ +from unittest.mock import patch + +from links.delay import default_options, run + + +@patch("links.delay.time.sleep") +def test_run_sleeps_default(mock_sleep): + result = run("test-uuid", "delay") + + assert result == "test-uuid" + mock_sleep.assert_called_once_with(default_options["seconds"]) + + +@patch("links.delay.time.sleep") +def test_run_sleeps_custom(mock_sleep): + result = run("test-uuid", "delay", opts={"seconds": 12}) + + assert result == "test-uuid" + mock_sleep.assert_called_once_with(12) + + +@patch("links.delay.time.sleep") +def test_partial_opts_keep_default_seconds(mock_sleep): + # An opts dict without "seconds" must fall back to the default, not KeyError. + result = run("test-uuid", "delay", opts={"unrelated": True}) + + assert result == "test-uuid" + mock_sleep.assert_called_once_with(default_options["seconds"]) + + +@patch("links.delay.time.sleep") +def test_negative_seconds_clamped_to_zero(mock_sleep): + result = run("test-uuid", "delay", opts={"seconds": -3}) + + assert result == "test-uuid" + mock_sleep.assert_called_once_with(0)