From affac5e190b87c16494e3d986699b2f96837afce Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 30 May 2026 22:53:37 +0000 Subject: [PATCH 1/2] docs: add ChainWeaver + evaluation-artifact integration cookbooks (#95, #96) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add two more ecosystem integration cookbooks under docs/integrations/, each with a runnable, offline companion wired into `make ci`. Follows the pattern established by the contextweaver/repository-check cookbooks. #95 — ChainWeaver compiled flows as capabilities: a ChainWeaverDriver wraps a compiled flow behind the Driver protocol so the flow runs through the normal policy/audit pipeline and produces a kernel-visible ActionTrace. A flow-step failure is translated into a DriverError that preserves the flow id and the failing step, so the orchestration context survives for the caller and the audit trail. ChainWeaver stays an optional dependency (the driver only needs a run(inputs) method and a flow_id), so the example ships tiny CompiledFlow / FlowExecutionError stand-ins and depends on no ChainWeaver package. New docs/integrations/chainweaver.md, examples/chainweaver_flow.py, and tests/test_chainweaver_flow.py. #96 — policy guardrails for statistical evaluation artifacts: a generic, producer-agnostic assess_artifact() layer lets an agent summarize an evaluation artifact while gating deployment/rollout recommendations on its support diagnostics. The gate is multi-signal (support_health, decision_stable, warnings, recommendation.intent) — a good point estimate with weak support is still blocked — and an unknown/missing support_health normalises to the safest state. Denied actions downgrade to a manual-review recommendation whose reason is recorded in ActionTrace.args. No statistical estimation is added and no producer dependency is taken; artifacts are fixtures. 
New docs/integrations/evaluation_artifacts.md, examples/evaluation_artifact_policy.py, and tests/test_evaluation_artifact_policy.py (covering ok/caution/high_risk). Wiring: both examples added to the Makefile `example` target; README and docs/integrations.md link the new pages; CHANGELOG updated. make ci passes (fmt-check, lint, mypy strict, 580 passed / 1 skipped, examples run). https://claude.ai/code/session_013hGyqqjAquhtSZXeYPkAuU --- CHANGELOG.md | 18 ++ Makefile | 2 + README.md | 2 + docs/integrations.md | 11 + docs/integrations/chainweaver.md | 104 +++++++ docs/integrations/evaluation_artifacts.md | 104 +++++++ examples/chainweaver_flow.py | 264 +++++++++++++++++ examples/evaluation_artifact_policy.py | 343 ++++++++++++++++++++++ tests/test_chainweaver_flow.py | 74 +++++ tests/test_evaluation_artifact_policy.py | 144 +++++++++ 10 files changed, 1066 insertions(+) create mode 100644 docs/integrations/chainweaver.md create mode 100644 docs/integrations/evaluation_artifacts.md create mode 100644 examples/chainweaver_flow.py create mode 100644 examples/evaluation_artifact_policy.py create mode 100644 tests/test_chainweaver_flow.py create mode 100644 tests/test_evaluation_artifact_policy.py diff --git a/CHANGELOG.md b/CHANGELOG.md index fea2a5c..634a326 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Two more ecosystem integration cookbooks under `docs/integrations/`, each with + a runnable, offline companion wired into `make ci`: + - **ChainWeaver compiled flows as capabilities** (#95): a `ChainWeaverDriver` + wraps a compiled flow behind the `Driver` protocol so the flow runs through + the normal policy/audit pipeline and produces a kernel-visible `ActionTrace`. + A flow-step failure is translated into a `DriverError` that preserves the + flow id and failing step. ChainWeaver stays an optional dependency. 
New + [`docs/integrations/chainweaver.md`](docs/integrations/chainweaver.md) and + [`examples/chainweaver_flow.py`](examples/chainweaver_flow.py). + - **Policy guardrails for statistical evaluation artifacts** (#96): a generic, + producer-agnostic `assess_artifact()` layer lets an agent summarize an + evaluation artifact while gating deployment/rollout recommendations on its + support diagnostics (multi-signal: `support_health`, `decision_stable`, + `warnings`, `recommendation.intent`). Denied actions are downgraded to a + manual-review recommendation whose reason is recorded in `ActionTrace.args`. + No statistical estimation is added and no producer dependency is taken. New + [`docs/integrations/evaluation_artifacts.md`](docs/integrations/evaluation_artifacts.md) + and [`examples/evaluation_artifact_policy.py`](examples/evaluation_artifact_policy.py). - `ActionTrace.result_summary` (#93): successful invocations now record a redaction-safe summary of the firewalled `Frame` (`fact_count`, `row_count`, `warning_count`, `has_handle` — counts/flags only, never raw driver data), so diff --git a/Makefile b/Makefile index b4886a2..b014013 100644 --- a/Makefile +++ b/Makefile @@ -23,5 +23,7 @@ example: python examples/readme_quickstart.py python examples/contextweaver_policy_flow.py python examples/repository_safety_check.py + python examples/chainweaver_flow.py + python examples/evaluation_artifact_policy.py ci: fmt-check lint type test example diff --git a/README.md b/README.md index b8d2d09..5f983b7 100644 --- a/README.md +++ b/README.md @@ -189,6 +189,8 @@ See [docs/agent-context/invariants.md](docs/agent-context/invariants.md) for the - [Integrations (MCP, HTTPDriver)](docs/integrations.md) - [contextweaver: policy before action](docs/integrations/contextweaver.md) - [Repository safety checks as a capability](docs/integrations/repository_safety_check.md) + - [ChainWeaver compiled flows as capabilities](docs/integrations/chainweaver.md) + - [Policy guardrails for 
evaluation artifacts](docs/integrations/evaluation_artifacts.md) - [Designing capabilities](docs/capabilities.md) - [Context Firewall](docs/context_firewall.md) diff --git a/docs/integrations.md b/docs/integrations.md index dfbc8cd..a80f5af 100644 --- a/docs/integrations.md +++ b/docs/integrations.md @@ -377,3 +377,14 @@ projects and external checkers. Each has a runnable, offline companion under — gate a high-impact action behind a deterministic check that shells out to a local command (e.g. VibeGuard), with the result recorded in the audit trace. Companion: [`examples/repository_safety_check.py`](../examples/repository_safety_check.py). +- [ChainWeaver compiled flows as policy-controlled capabilities](integrations/chainweaver.md) + — wrap a ChainWeaver compiled flow behind the `Driver` protocol so it runs + through the normal policy/audit pipeline; step failures surface as a + `DriverError` that preserves the flow id and failing step. ChainWeaver stays + an optional dependency. + Companion: [`examples/chainweaver_flow.py`](../examples/chainweaver_flow.py). +- [Policy guardrails for statistical evaluation artifacts](integrations/evaluation_artifacts.md) + — let an agent summarize an evaluation artifact while gating deployment/rollout + recommendations on its support diagnostics; the downgrade reason is recorded in + the audit trace. Producer-agnostic; no statistical estimation in agent-kernel. + Companion: [`examples/evaluation_artifact_policy.py`](../examples/evaluation_artifact_policy.py). diff --git a/docs/integrations/chainweaver.md b/docs/integrations/chainweaver.md new file mode 100644 index 0000000..8400ff3 --- /dev/null +++ b/docs/integrations/chainweaver.md @@ -0,0 +1,104 @@ +# ChainWeaver compiled flows as policy-controlled capabilities + +[ChainWeaver](https://github.com/dgenio/ChainWeaver) is the Weaver-ecosystem +orchestration layer: it *compiles* a multi-step flow that an agent can run. +agent-kernel owns authorization, execution, and audit. 
Wrapping a compiled flow +as a capability means the flow runs through the normal pipeline +(policy → token → invoke → firewall → trace) instead of as an out-of-band side +channel — so a flow invocation is policy-checked and auditable like any other +capability (weaver-spec **I-02**). + +This page describes the pattern. The runnable companion is +[`examples/chainweaver_flow.py`](../../examples/chainweaver_flow.py), which is +deterministic, offline, and depends on no ChainWeaver package. + +> ChainWeaver is **not** a required dependency. The adapter only relies on a +> compiled flow exposing a `run(inputs)` method and a `flow_id` attribute. The +> example ships tiny `CompiledFlow` / `FlowExecutionError` stand-ins so it runs +> in CI; in production you pass a real compiled flow to `ChainWeaverDriver`. + +## The pattern + +``` +agent invokes flows.summarize_release + │ + ▼ +ChainWeaverDriver.execute() → compiled_flow.run(inputs) + │ │ + │ ├─ all steps ok → RawResult → Frame + ActionTrace + │ └─ step raises → FlowExecutionError + ▼ │ + DriverError (flow id + failing step preserved) ◄───────────┘ +``` + +| Component | Role | +|---|---| +| `CompiledFlow` | A ChainWeaver compiled flow: ordered, named steps run over a shared context. | +| `ChainWeaverDriver` | Implements the `Driver` protocol; maps a capability operation to a flow and runs it. | +| `flows.summarize_release` | A `READ` capability whose implementation is the compiled flow. 
| + +## The adapter + +`ChainWeaverDriver` implements the `Driver` protocol and runs the flow bound to +the capability's operation: + +```python +class ChainWeaverDriver: + def __init__(self, flows: dict[str, CompiledFlow], *, driver_id: str = "chainweaver") -> None: + self._flows = dict(flows) + self._driver_id = driver_id + + @property + def driver_id(self) -> str: + return self._driver_id + + async def execute(self, ctx: ExecutionContext) -> RawResult: + operation = str(ctx.args.get("operation", ctx.capability_id)) + flow = self._flows.get(operation) + if flow is None: + raise DriverError(f"... no flow for operation='{operation}'.") + inputs = {k: v for k, v in ctx.args.items() if k != "operation"} + try: + output = flow.run(inputs) + except FlowExecutionError as exc: + raise DriverError( + f"ChainWeaver flow '{exc.flow_id}' failed at step '{exc.step}': {exc.cause}" + ) from exc + return RawResult(capability_id=ctx.capability_id, data=output, + metadata={"flow_id": flow.flow_id, "steps": [s.name for s in flow.steps]}) +``` + +## Errors preserve ChainWeaver context + +A real ChainWeaver flow raises its own exception when a step fails. The adapter +**translates** that native error into a kernel `DriverError`, carrying the flow +id and the failing step into the message rather than leaking a raw traceback. +`Kernel.invoke()` then wraps it as +`All drivers failed for capability '...'. Last error: ChainWeaver flow '<flow_id>' +failed at step '<step>': <cause>`, so the orchestration context survives for the +caller — and a failed run still records an `ActionTrace` (with `error` set), so +I-02 holds even on failure. 
+ +## Audit trail + +A successful invocation is recorded like any capability: + +```python +action_id = await run_flow(kernel, principal, {"release": "v1.4.0", "changes": [...]}) +trace = kernel.explain(action_id) +# trace.driver_id == "chainweaver" +# trace.result_summary == {"fact_count": ..., "row_count": ..., "warning_count": ..., "has_handle": ...} +``` + +## Non-goals + +- agent-kernel does not compile flows — that is ChainWeaver's job. +- ChainWeaver is never a required dependency. +- Wrapping a flow does not bypass policy: the flow capability is granted and + invoked through the same pipeline as every other capability. + +## Related + +- `examples/chainweaver_flow.py` — runnable, offline. +- [ChainWeaver](https://github.com/dgenio/ChainWeaver) +- [weaver-spec](https://github.com/dgenio/weaver-spec) diff --git a/docs/integrations/evaluation_artifacts.md b/docs/integrations/evaluation_artifacts.md new file mode 100644 index 0000000..5f82358 --- /dev/null +++ b/docs/integrations/evaluation_artifacts.md @@ -0,0 +1,104 @@ +# Policy guardrails for statistical evaluation artifacts + +Agents increasingly consume *evaluation artifacts* — structured reports such as +an offline policy-evaluation result (e.g. from +[`skdr-eval`](https://github.com/dgenio/skdr-eval)). These artifacts are easy to +misuse: an agent that reads a favorable headline estimate and recommends +deployment, while ignoring the support diagnostics, uncertainty, and warnings, +turns a *caveated* result into an *unconditional* action. + +agent-kernel already separates **reading** from **acting** through safety +classes. This page adds a small, generic policy layer on top: an agent may +always *summarize* an artifact (with caveats), but recommending **deployment** +or **automatic rollout** is gated on the artifact's diagnostics. 
+ +The runnable companion is +[`examples/evaluation_artifact_policy.py`](../../examples/evaluation_artifact_policy.py), +which is deterministic, offline, and uses fixture artifacts. + +> agent-kernel does **not** implement offline policy evaluation or any +> statistical estimation, and takes no dependency on a specific producer. The +> policy reads documented fields off a plain dict artifact, so it works for any +> producer — not just `skdr-eval`. + +## Summarizing evidence vs. acting on evidence + +This is the distinction the guardrail enforces: + +| Action | Capability | Safety class | Gated? | +|---|---|---|---| +| Summarize the artifact and its caveats | `eval.summarize_artifact` | `READ` | No — always allowed. | +| Recommend deployment / rollout | `eval.recommend_deployment` | `WRITE` | Yes — only when diagnostics are healthy. | +| Recommend manual review / better logs | `eval.recommend_manual_review` | `WRITE` | The downgrade target when deployment is denied. | + +Summarizing a high-risk result is fine — it informs the human. Recommending +deployment *as if the result were reliable* is what the policy blocks. + +## The generic assessment + +`assess_artifact(artifact)` is producer-agnostic. It inspects documented fields +and returns stable decision codes: + +```python +decision = assess_artifact(artifact) +# decision.allowed_actions → e.g. ("allow_summary", "allow_manual_review_recommendation", ...) +# decision.denied_actions → e.g. ("deny_deployment_recommendation", "deny_automatic_rollout") +# decision.reasons → e.g. ("support_health=high_risk", "decision is not stable") +# decision.allows_deployment → bool gate the host branches on +``` + +Fields inspected (all optional; missing fields default to the safest reading): + +| Field | Meaning | +|---|---| +| `support_health` | `"ok"` / `"caution"` / `"high_risk"`. | +| `decision_stable` | Whether the comparison is robust to reasonable perturbation. | +| `warnings` | Producer warnings (e.g. 
low ESS, poor overlap). | +| `recommendation.intent` | The artifact's own steer (`"deploy"`, `"hold"`, …). | +| `uncertainty` / `limitations` | Surfaced as caveats in the summary. | + +Deployment is permitted **only** when several signals agree: `support_health` +is `ok`, the decision is stable, there are no warnings, and the artifact does +not itself recommend holding. This is deliberately *not* a single-metric gate — +a good point estimate with poor support is still blocked. + +| `support_health` | Deployment | Outcome | +|---|---|---| +| `ok` (stable, no warnings) | allowed | `allow_summary` + deployment recommendation | +| `caution` | denied | downgraded to manual-review recommendation | +| `high_risk` | denied | downgraded + `require_human_review` | + +## Audit trail records why + +When deployment is denied, the host does not grant `eval.recommend_deployment`; +instead it invokes `eval.recommend_manual_review` with the reasons in the call +args. Because `ActionTrace.args` is preserved for non-memory capabilities, the +audit trace records *why* the action was downgraded: + +```python +capability_id, action_id = await act_on_artifact(kernel, principal, artifact) +trace = kernel.explain(action_id) +# capability_id == "eval.recommend_manual_review" +# trace.args["reason"] == "support_health=high_risk; decision is not stable; 2 warning(s): ..." +# trace.args["downgraded_from"] == "recommend_deployment" +``` + +## Non-goals + +- No OPE / statistical estimation in agent-kernel. +- No hard dependency on `skdr-eval` or any producer. +- No decision based on a single numeric metric. 
+ +## Aligning with `weaver-spec` + +If `weaver-spec` publishes a formal `EvaluationArtifact` contract, the field +names read by `assess_artifact` should be aligned to it; the decision codes here +(`allow_summary`, `allow_manual_review_recommendation`, `require_human_review`, +`deny_deployment_recommendation`, `deny_automatic_rollout`) are intended to be a +stable, producer-neutral vocabulary in the meantime. + +## Related + +- `examples/evaluation_artifact_policy.py` — runnable, offline. +- [skdr-eval](https://github.com/dgenio/skdr-eval) +- [weaver-spec](https://github.com/dgenio/weaver-spec) diff --git a/examples/chainweaver_flow.py b/examples/chainweaver_flow.py new file mode 100644 index 0000000..00040ab --- /dev/null +++ b/examples/chainweaver_flow.py @@ -0,0 +1,264 @@ +"""chainweaver_flow.py — wrap a ChainWeaver compiled flow as a capability. + +The written walkthrough lives in ``docs/integrations/chainweaver.md``. This +script is the runnable companion. It shows the Weaver-ecosystem pattern for +running a *ChainWeaver* compiled flow (an orchestration of ordered steps) +through the normal agent-kernel pipeline so the flow becomes a policy-checked, +audited capability rather than an out-of-band side channel: + + 1. A compiled flow is wrapped behind a ``ChainWeaverDriver``. The driver + implements the ``Driver`` protocol and runs the flow when the capability + is invoked. + 2. Running the wrapped flow through ``Kernel.invoke()`` produces a + kernel-visible execution record (an ``ActionTrace``) exactly like any + other capability — so I-02 (every execution is audited) holds. + 3. When a flow step fails, the driver translates ChainWeaver's native + ``FlowExecutionError`` into a kernel ``DriverError`` that *preserves* the + flow id and the failing step, so the audit trail and the caller keep the + orchestration context. + +``chainweaver`` is **not** a dependency of this example. 
``CompiledFlow`` and +``FlowExecutionError`` are tiny, deterministic stand-ins for the compiled-flow +object and native exception a real ChainWeaver build would hand you, so the +demo runs offline and in CI. In production you point ``ChainWeaverDriver`` at a +real compiled flow; the driver only relies on a ``run(inputs)`` method and a +``flow_id`` attribute, which keeps ChainWeaver an optional dependency. + +Run with: ``python examples/chainweaver_flow.py`` +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Callable +from dataclasses import dataclass, field +from typing import Any + +from agent_kernel import ( + Capability, + CapabilityRegistry, + HMACTokenProvider, + Kernel, + Principal, + SafetyClass, + StaticRouter, +) +from agent_kernel.drivers.base import ExecutionContext +from agent_kernel.errors import DriverError +from agent_kernel.models import CapabilityRequest, ImplementationRef, RawResult + +_SECRET = "example-secret-do-not-use-in-prod" + + +# ── ChainWeaver stand-ins (a real build supplies these) ───────────────────── + + +class FlowExecutionError(Exception): + """Stand-in for ChainWeaver's native flow-execution exception. + + Carries the orchestration context a real ChainWeaver error would: which + flow failed and at which step. The :class:`ChainWeaverDriver` translates + this into a kernel :class:`~agent_kernel.errors.DriverError` so the context + survives into the audit trail instead of leaking a raw stack trace. 
+ """ + + def __init__(self, flow_id: str, step: str, cause: str) -> None: + super().__init__(f"flow '{flow_id}' failed at step '{step}': {cause}") + self.flow_id = flow_id + self.step = step + self.cause = cause + + +@dataclass(slots=True) +class FlowStep: + """One named step in a compiled flow.""" + + name: str + run: Callable[[dict[str, Any]], dict[str, Any]] + """Transforms the running context dict and returns the keys it produced.""" + + +@dataclass(slots=True) +class CompiledFlow: + """Deterministic stand-in for a ChainWeaver compiled flow. + + Executes its :attr:`steps` in order, threading a shared context dict. A step + that raises is reported as a :class:`FlowExecutionError` naming the flow and + the failing step — the shape a real ChainWeaver error carries. + """ + + flow_id: str + steps: list[FlowStep] = field(default_factory=list) + + def run(self, inputs: dict[str, Any]) -> dict[str, Any]: + """Run every step in order and return the accumulated context.""" + context: dict[str, Any] = dict(inputs) + for step in self.steps: + try: + context.update(step.run(context)) + except Exception as exc: # translate to ChainWeaver's error shape + raise FlowExecutionError(self.flow_id, step.name, str(exc)) from exc + return context + + +# ── The adapter: a compiled flow behind the Driver protocol ───────────────── + + +class ChainWeaverDriver: + """Driver that runs ChainWeaver compiled flows as capabilities. + + Operations map to compiled flows. ``execute`` runs the flow named by + ``ctx.args['operation']`` (falling back to ``ctx.capability_id``) with the + remaining args as inputs. A :class:`FlowExecutionError` is re-raised as a + :class:`~agent_kernel.errors.DriverError` that keeps the flow id and failing + step, so the orchestration context is preserved for the caller and the + audit trail. 
+ """ + + def __init__(self, flows: dict[str, CompiledFlow], *, driver_id: str = "chainweaver") -> None: + self._flows = dict(flows) + self._driver_id = driver_id + + @property + def driver_id(self) -> str: + """Unique identifier for this driver.""" + return self._driver_id + + async def execute(self, ctx: ExecutionContext) -> RawResult: + """Run the compiled flow bound to this capability's operation.""" + operation = str(ctx.args.get("operation", ctx.capability_id)) + flow = self._flows.get(operation) + if flow is None: + raise DriverError( + f"ChainWeaverDriver '{self._driver_id}' has no flow for operation='{operation}'." + ) + inputs = {k: v for k, v in ctx.args.items() if k != "operation"} + try: + output = flow.run(inputs) + except FlowExecutionError as exc: + raise DriverError( + f"ChainWeaver flow '{exc.flow_id}' failed at step '{exc.step}': {exc.cause}" + ) from exc + return RawResult( + capability_id=ctx.capability_id, + data=output, + metadata={"flow_id": flow.flow_id, "steps": [s.name for s in flow.steps]}, + ) + + +# ── A small, deterministic release-notes flow ─────────────────────────────── + + +def _collect_changes(context: dict[str, Any]) -> dict[str, Any]: + """Step 1: normalise the raw change list, failing loudly when absent.""" + changes = context.get("changes") + if not changes: + raise ValueError("no 'changes' provided to summarize") + return {"changes": [str(c).strip() for c in changes]} + + +def _classify(context: dict[str, Any]) -> dict[str, Any]: + """Step 2: split changes into highlights (feat/fix) and the rest.""" + highlights = [c for c in context["changes"] if c.startswith(("feat", "fix"))] + return {"highlights": highlights} + + +def _render_summary(context: dict[str, Any]) -> dict[str, Any]: + """Step 3: render the final, audit-friendly summary payload.""" + return { + "release": context.get("release", "unreleased"), + "change_count": len(context["changes"]), + "highlights": context["highlights"], + } + + +def build_release_flow() -> 
CompiledFlow: + """A three-step compiled flow that summarizes release notes.""" + return CompiledFlow( + flow_id="release_notes_summary", + steps=[ + FlowStep("collect_changes", _collect_changes), + FlowStep("classify", _classify), + FlowStep("render_summary", _render_summary), + ], + ) + + +def build_kernel() -> Kernel: + """Wire a kernel with one capability backed by the compiled flow.""" + registry = CapabilityRegistry() + registry.register( + Capability( + capability_id="flows.summarize_release", + name="Summarize Release", + description="Run the ChainWeaver release-notes summary flow", + safety_class=SafetyClass.READ, + tags=["flows", "release", "summary", "chainweaver"], + impl=ImplementationRef(driver_id="chainweaver", operation="summarize_release"), + ) + ) + router = StaticRouter(routes={"flows.summarize_release": ["chainweaver"]}) + kernel = Kernel( + registry=registry, + token_provider=HMACTokenProvider(secret=_SECRET), + router=router, + ) + kernel.register_driver(ChainWeaverDriver(flows={"summarize_release": build_release_flow()})) + return kernel + + +async def run_flow(kernel: Kernel, principal: Principal, args: dict[str, Any]) -> str: + """Invoke the flow-backed capability and return its audit ``action_id``.""" + request = CapabilityRequest( + capability_id="flows.summarize_release", + goal="summarize the release notes for the next tag", + ) + token = kernel.get_token(request, principal, justification="") + frame = await kernel.invoke( + token, + principal=principal, + args={"operation": "summarize_release", **args}, + ) + return frame.action_id + + +async def main() -> None: + kernel = build_kernel() + # A release agent: may read (run flows), not an admin. 
+ agent = Principal(principal_id="release-bot", roles=["reader"]) + + print("=== ChainWeaver compiled flow as a policy-controlled capability ===") + + print("\n=== Success: the wrapped flow runs and is audited ===") + action_id = await run_flow( + kernel, + agent, + { + "release": "v1.4.0", + "changes": ["feat: add export", "fix: token drift", "chore: bump deps"], + }, + ) + trace = kernel.explain(action_id) + print(f" audited: action_id={trace.action_id} driver={trace.driver_id}") + print(f" result_summary={trace.result_summary}") + assert trace.driver_id == "chainweaver", "the flow must run through the ChainWeaver driver" + assert trace.error is None, "the success path must not record an error" + + print("\n=== Failure: a step error is preserved as kernel context ===") + try: + await run_flow(kernel, agent, {"release": "v1.4.1", "changes": []}) + except DriverError as exc: + # The kernel wraps the driver error, but the ChainWeaver context — the + # flow id and the failing step — is preserved inside the message. + message = str(exc) + print(f" DriverError: {message}") + assert "release_notes_summary" in message, "flow id must survive into the kernel error" + assert "collect_changes" in message, "failing step must survive into the kernel error" + else: # pragma: no cover - defensive + raise SystemExit("Expected DriverError when the flow's first step fails") + + print("\n✓ chainweaver_flow.py complete.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/evaluation_artifact_policy.py b/examples/evaluation_artifact_policy.py new file mode 100644 index 0000000..139307b --- /dev/null +++ b/examples/evaluation_artifact_policy.py @@ -0,0 +1,343 @@ +"""evaluation_artifact_policy.py — gate agent actions on evaluation artifacts. + +The written walkthrough lives in ``docs/integrations/evaluation_artifacts.md``. +This script is the runnable companion. 
It shows the Weaver-ecosystem pattern for +letting an agent *summarize* a statistical/model-evaluation artifact (such as an +offline policy-evaluation report) while *denying* high-impact actions — +deployment or automatic rollout — when the artifact's support diagnostics say +the headline estimate is not trustworthy. + +The key distinction the policy enforces: + + * **Summarizing evidence** is always allowed (with caveats surfaced). + * **Acting on evidence** (recommending deployment / automatic rollout) is + gated: it is downgraded to a *manual-review* recommendation whenever the + support diagnostics are weak, the decision is unstable, or warnings exist. + +``assess_artifact`` is a generic, producer-agnostic policy layer: it inspects +documented fields (``support_health``, ``warnings``, ``uncertainty``, +``decision_stable``, ``recommendation.intent``, ``limitations``) on a plain +dict artifact, so it works for any producer — not just ``skdr-eval``. agent-kernel +does **not** implement offline policy evaluation or any statistical estimation, +and takes no dependency on a specific producer; the artifacts here are fixtures. + +Run with: ``python examples/evaluation_artifact_policy.py`` +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from typing import Any + +from agent_kernel import ( + Capability, + CapabilityRegistry, + HMACTokenProvider, + InMemoryDriver, + Kernel, + Principal, + SafetyClass, + StaticRouter, +) +from agent_kernel.drivers.base import ExecutionContext +from agent_kernel.models import CapabilityRequest, ImplementationRef + +_SECRET = "example-secret-do-not-use-in-prod" + +# Stable decision codes (see docs/integrations/evaluation_artifacts.md). Kept as +# string constants so callers branch on a stable vocabulary, not on prose. 
+ALLOW_SUMMARY = "allow_summary" +ALLOW_MANUAL_REVIEW_RECOMMENDATION = "allow_manual_review_recommendation" +REQUIRE_HUMAN_REVIEW = "require_human_review" +DENY_DEPLOYMENT_RECOMMENDATION = "deny_deployment_recommendation" +DENY_AUTOMATIC_ROLLOUT = "deny_automatic_rollout" + +# Support-health states an artifact may report, safest first when defaulting. +_HEALTH_OK = "ok" +_HEALTH_CAUTION = "caution" +_HEALTH_HIGH_RISK = "high_risk" +_KNOWN_HEALTH = frozenset({_HEALTH_OK, _HEALTH_CAUTION, _HEALTH_HIGH_RISK}) + + +@dataclass(slots=True) +class ArtifactDecision: + """The outcome of assessing an evaluation artifact for a deployment intent. + + ``allowed_actions`` / ``denied_actions`` carry the stable decision codes; the + convenience flag :attr:`allows_deployment` is the top-level gate the host + uses to decide whether to grant the high-impact capability. ``reasons`` + explains *why* — these strings are recorded in the audit trace when an + action is downgraded. + """ + + support_health: str + allowed_actions: tuple[str, ...] + denied_actions: tuple[str, ...] + reasons: tuple[str, ...] = field(default_factory=tuple) + + @property + def allows_deployment(self) -> bool: + """True only when recommending deployment is not a denied action.""" + return DENY_DEPLOYMENT_RECOMMENDATION not in self.denied_actions + + +def assess_artifact(artifact: dict[str, Any]) -> ArtifactDecision: + """Decide which actions an agent may take given an evaluation *artifact*. + + Generic and producer-agnostic: reads documented fields off a plain dict. + Deployment is permitted only when *several* signals agree — support health + is ``ok``, the decision is stable, no warnings are present, and the + artifact's own recommendation does not say to hold. Relying on more than a + single numeric metric is deliberate (a good point estimate with poor support + must still be blocked). 
+ + Args: + artifact: A mapping with optional keys ``support_health`` (``"ok"`` / + ``"caution"`` / ``"high_risk"``), ``warnings`` (list), + ``decision_stable`` (bool), ``uncertainty`` (mapping/str), + ``recommendation`` (mapping with ``intent``), and ``limitations`` + (list). Missing fields default to the safest interpretation. + + Returns: + An :class:`ArtifactDecision`. Summarizing is always allowed; deployment + is allowed only when every gating signal is satisfied. + """ + # Default to — and normalise any unknown value to — the safest state, so a + # missing or garbage support_health can never read as deployable. + raw_health = str(artifact.get("support_health", _HEALTH_HIGH_RISK)) + support_health = raw_health if raw_health in _KNOWN_HEALTH else _HEALTH_HIGH_RISK + warnings = list(artifact.get("warnings") or []) + decision_stable = bool(artifact.get("decision_stable", False)) + recommendation = artifact.get("recommendation") or {} + intent = ( + str(recommendation.get("intent", "")).lower() if isinstance(recommendation, dict) else "" + ) + + reasons: list[str] = [] + if support_health != _HEALTH_OK: + reasons.append(f"support_health={support_health}") + if not decision_stable: + reasons.append("decision is not stable") + if warnings: + reasons.append(f"{len(warnings)} warning(s): {', '.join(str(w) for w in warnings)}") + if intent in {"hold", "do_not_deploy", "manual_review"}: + reasons.append(f"artifact recommends '{intent}'") + + # Summarizing evidence is always allowed. + allowed: list[str] = [ALLOW_SUMMARY] + denied: list[str] = [] + + if reasons: + # Acting on the evidence is downgraded to a manual-review recommendation. 
+        allowed.append(ALLOW_MANUAL_REVIEW_RECOMMENDATION)
+        denied.extend([DENY_DEPLOYMENT_RECOMMENDATION, DENY_AUTOMATIC_ROLLOUT])
+        if support_health == _HEALTH_HIGH_RISK:
+            allowed.append(REQUIRE_HUMAN_REVIEW)
+
+    return ArtifactDecision(
+        support_health=support_health,
+        allowed_actions=tuple(allowed),
+        denied_actions=tuple(denied),
+        reasons=tuple(reasons),
+    )
+
+
+def build_kernel() -> Kernel:
+    """Wire a kernel with a summarize READ and two gated WRITE recommendations."""
+    registry = CapabilityRegistry()
+    registry.register(
+        Capability(
+            capability_id="eval.summarize_artifact",
+            name="Summarize Evaluation Artifact",
+            description="Summarize an evaluation artifact and surface its caveats",
+            safety_class=SafetyClass.READ,
+            tags=["eval", "artifact", "summarize", "report"],
+            impl=ImplementationRef(driver_id="memory", operation="summarize_artifact"),
+        )
+    )
+    registry.register(
+        Capability(
+            capability_id="eval.recommend_deployment",
+            name="Recommend Deployment",
+            description="Recommend deploying/rolling out the evaluated candidate",
+            safety_class=SafetyClass.WRITE,
+            tags=["eval", "deploy", "rollout", "recommendation"],
+            impl=ImplementationRef(driver_id="memory", operation="recommend_deployment"),
+        )
+    )
+    registry.register(
+        Capability(
+            capability_id="eval.recommend_manual_review",
+            name="Recommend Manual Review",
+            description="Recommend human review / improving logs instead of deploying",
+            safety_class=SafetyClass.WRITE,
+            tags=["eval", "manual", "review", "recommendation"],
+            impl=ImplementationRef(driver_id="memory", operation="recommend_manual_review"),
+        )
+    )
+
+    driver = InMemoryDriver()
+
+    def summarize_artifact(ctx: ExecutionContext) -> dict[str, Any]:
+        artifact = ctx.args.get("artifact", {})
+        decision = assess_artifact(artifact)
+        return {
+            "artifact_type": artifact.get("artifact_type", "evaluation"),
+            "support_health": decision.support_health,
+            "caveats": list(artifact.get("limitations") or []) + list(decision.reasons),
+        }
+
+    def recommend_deployment(ctx: ExecutionContext) -> dict[str, Any]:
+        return {"recommendation": "deploy", "candidate": ctx.args.get("candidate", "candidate")}
+
+    def recommend_manual_review(ctx: ExecutionContext) -> dict[str, Any]:
+        return {"recommendation": "manual_review", "reason": ctx.args.get("reason", "")}
+
+    driver.register_handler("summarize_artifact", summarize_artifact)
+    driver.register_handler("recommend_deployment", recommend_deployment)
+    driver.register_handler("recommend_manual_review", recommend_manual_review)
+
+    router = StaticRouter(
+        routes={
+            "eval.summarize_artifact": ["memory"],
+            "eval.recommend_deployment": ["memory"],
+            "eval.recommend_manual_review": ["memory"],
+        }
+    )
+    kernel = Kernel(
+        registry=registry,
+        token_provider=HMACTokenProvider(secret=_SECRET),
+        router=router,
+    )
+    kernel.register_driver(driver)
+    return kernel
+
+
+async def summarize(kernel: Kernel, principal: Principal, artifact: dict[str, Any]) -> str:
+    """Summarize the artifact (always allowed) and return the audit ``action_id``."""
+    request = CapabilityRequest(
+        capability_id="eval.summarize_artifact",
+        goal="summarize the evaluation artifact and its caveats",
+    )
+    token = kernel.get_token(request, principal, justification="")
+    frame = await kernel.invoke(
+        token,
+        principal=principal,
+        args={"operation": "summarize_artifact", "artifact": artifact},
+    )
+    return frame.action_id
+
+
+async def act_on_artifact(
+    kernel: Kernel, principal: Principal, artifact: dict[str, Any]
+) -> tuple[str, str]:
+    """Gate the deployment recommendation on the artifact assessment.
+
+    Returns ``(capability_id, action_id)`` for whichever action was taken: the
+    deployment recommendation when the artifact is trustworthy, otherwise the
+    downgraded manual-review recommendation whose audit args record *why*.
+    """
+    decision = assess_artifact(artifact)
+    if decision.allows_deployment:
+        request = CapabilityRequest(
+            capability_id="eval.recommend_deployment",
+            goal="recommend deploying the evaluated candidate",
+        )
+        token = kernel.get_token(
+            request,
+            principal,
+            justification="evaluation support is healthy and the decision is stable",
+        )
+        frame = await kernel.invoke(
+            token,
+            principal=principal,
+            args={"operation": "recommend_deployment", "candidate": artifact.get("candidate", "")},
+        )
+        return ("eval.recommend_deployment", frame.action_id)
+
+    # Downgrade: record the reasons in the audit args so the trace explains why
+    # deployment was denied (ActionTrace.args is preserved for non-memory caps).
+    request = CapabilityRequest(
+        capability_id="eval.recommend_manual_review",
+        goal="recommend manual review because the evaluation support is weak",
+    )
+    token = kernel.get_token(
+        request,
+        principal,
+        justification="downgraded from deployment: weak support diagnostics",
+    )
+    frame = await kernel.invoke(
+        token,
+        principal=principal,
+        args={
+            "operation": "recommend_manual_review",
+            "reason": "; ".join(decision.reasons),
+            "downgraded_from": "recommend_deployment",
+        },
+    )
+    return ("eval.recommend_manual_review", frame.action_id)
+
+
+async def handle(
+    kernel: Kernel, principal: Principal, label: str, artifact: dict[str, Any]
+) -> None:
+    """Summarize then (conditionally) act on one artifact, printing the audit."""
+    print(f"\n=== Artifact: {label} (support_health={artifact.get('support_health')}) ===")
+    summary_action = await summarize(kernel, principal, artifact)
+    print(f" summarize: allowed — action_id={summary_action} (always permitted)")
+
+    capability_id, action_id = await act_on_artifact(kernel, principal, artifact)
+    trace = kernel.explain(action_id)
+    if capability_id == "eval.recommend_deployment":
+        print(f" action: deployment recommended — action_id={trace.action_id}")
+    else:
+        print(f" action: DOWNGRADED to manual review — action_id={trace.action_id}")
+        print(f" audited reason: {trace.args.get('reason')}")
+        assert trace.args.get("reason"), "the downgrade reason must be recorded in the audit trace"
+
+
+async def main() -> None:
+    kernel = build_kernel()
+    # An analyst agent: may read and write recommendations, not an admin.
+    analyst = Principal(principal_id="eval-analyst", roles=["reader", "writer"])
+
+    print("=== Policy guardrails for statistical evaluation artifacts ===")
+
+    healthy = {
+        "artifact_type": "offline_policy_evaluation",
+        "support_health": "ok",
+        "decision_stable": True,
+        "warnings": [],
+        "uncertainty": {"ci_width": "narrow"},
+        "recommendation": {"intent": "deploy"},
+        "limitations": [],
+        "candidate": "policy-v2",
+    }
+    cautious = {
+        "artifact_type": "offline_policy_evaluation",
+        "support_health": "caution",
+        "decision_stable": True,
+        "warnings": ["moderate overlap"],
+        "recommendation": {"intent": "deploy"},
+        "candidate": "policy-v2",
+    }
+    risky = {
+        "artifact_type": "offline_policy_evaluation",
+        "support_health": "high_risk",
+        "decision_stable": False,
+        "warnings": ["low effective sample size", "poor overlap"],
+        "recommendation": {"intent": "hold"},
+        "limitations": ["estimate extrapolates beyond logged support"],
+        "candidate": "policy-v2",
+    }
+
+    await handle(kernel, analyst, "healthy", healthy)
+    await handle(kernel, analyst, "caution", cautious)
+    await handle(kernel, analyst, "high_risk", risky)
+
+    print("\n✓ evaluation_artifact_policy.py complete.")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/tests/test_chainweaver_flow.py b/tests/test_chainweaver_flow.py
new file mode 100644
index 0000000..b8a0338
--- /dev/null
+++ b/tests/test_chainweaver_flow.py
@@ -0,0 +1,74 @@
+"""Tests for the ChainWeaver integration example (issue #95).
+
+Verifies the two acceptance-critical behaviors of
+``examples/chainweaver_flow.py``: running a wrapped compiled flow produces a
+kernel-visible execution record (an ``ActionTrace``), and a failing flow step
+surfaces as a ``DriverError`` that preserves the ChainWeaver context (flow id
+and failing step).
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import sys
+from pathlib import Path
+from types import ModuleType
+
+import pytest
+
+from agent_kernel import Principal
+from agent_kernel.errors import DriverError
+
+_EXAMPLES = Path(__file__).resolve().parent.parent / "examples"
+
+
+def _load_example(name: str) -> ModuleType:
+    """Import an example module by file path (examples are not a package)."""
+    spec = importlib.util.spec_from_file_location(name, _EXAMPLES / f"{name}.py")
+    assert spec is not None and spec.loader is not None
+    module = importlib.util.module_from_spec(spec)
+    sys.modules[name] = module  # let dataclass field resolution find the module
+    spec.loader.exec_module(module)
+    return module
+
+
+cw = _load_example("chainweaver_flow")
+
+
+def test_compiled_flow_runs_steps_in_order() -> None:
+    """The compiled-flow stand-in threads context through every step."""
+    flow = cw.build_release_flow()
+    result = flow.run({"release": "v1.0.0", "changes": ["feat: a", "chore: b"]})
+    assert result["change_count"] == 2
+    assert result["highlights"] == ["feat: a"]
+
+
+def test_compiled_flow_raises_flow_error_with_context() -> None:
+    """A failing step is reported with the flow id and failing step name."""
+    flow = cw.build_release_flow()
+    with pytest.raises(cw.FlowExecutionError) as excinfo:
+        flow.run({"release": "v1.0.0", "changes": []})
+    assert excinfo.value.flow_id == "release_notes_summary"
+    assert excinfo.value.step == "collect_changes"
+
+
+async def test_wrapped_flow_produces_audit_trace() -> None:
+    """Invoking the flow-backed capability records a kernel-visible trace."""
+    kernel = cw.build_kernel()
+    principal = Principal(principal_id="release-bot", roles=["reader"])
+    action_id = await cw.run_flow(
+        kernel, principal, {"release": "v2.0.0", "changes": ["feat: x", "fix: y"]}
+    )
+    trace = kernel.explain(action_id)
+    assert trace.capability_id == "flows.summarize_release"
+    assert trace.driver_id == "chainweaver"
+    assert trace.error is None
+    assert trace.result_summary is not None
+
+
+async def test_flow_failure_preserves_chainweaver_context() -> None:
+    """A flow step failure surfaces as DriverError keeping flow id + step."""
+    kernel = cw.build_kernel()
+    principal = Principal(principal_id="release-bot", roles=["reader"])
+    with pytest.raises(DriverError, match="release_notes_summary.*collect_changes"):
+        await cw.run_flow(kernel, principal, {"release": "v2.0.1", "changes": []})
diff --git a/tests/test_evaluation_artifact_policy.py b/tests/test_evaluation_artifact_policy.py
new file mode 100644
index 0000000..90c7a44
--- /dev/null
+++ b/tests/test_evaluation_artifact_policy.py
@@ -0,0 +1,144 @@
+"""Tests for the evaluation-artifact policy example (issue #96).
+
+Covers the three required support-health states (``ok`` / ``caution`` /
+``high_risk``), confirms the gate relies on more than a single signal, and
+verifies that a downgraded action records *why* in the audit trace.
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import sys
+from pathlib import Path
+from types import ModuleType
+
+import pytest
+
+from agent_kernel import Principal
+
+_EXAMPLES = Path(__file__).resolve().parent.parent / "examples"
+
+
+def _load_example(name: str) -> ModuleType:
+    """Import an example module by file path (examples are not a package)."""
+    spec = importlib.util.spec_from_file_location(name, _EXAMPLES / f"{name}.py")
+    assert spec is not None and spec.loader is not None
+    module = importlib.util.module_from_spec(spec)
+    sys.modules[name] = module  # let dataclass field resolution find the module
+    spec.loader.exec_module(module)
+    return module
+
+
+eap = _load_example("evaluation_artifact_policy")
+
+
+def test_ok_artifact_allows_deployment() -> None:
+    """A healthy, stable artifact with no warnings permits deployment."""
+    artifact = {
+        "support_health": "ok",
+        "decision_stable": True,
+        "warnings": [],
+        "recommendation": {"intent": "deploy"},
+    }
+    decision = eap.assess_artifact(artifact)
+    assert decision.allows_deployment is True
+    assert decision.denied_actions == ()
+    assert eap.ALLOW_SUMMARY in decision.allowed_actions
+
+
+def test_caution_artifact_downgrades_without_human_review() -> None:
+    """A caution artifact denies deployment but does not force human review."""
+    artifact = {
+        "support_health": "caution",
+        "decision_stable": True,
+        "warnings": ["moderate overlap"],
+        "recommendation": {"intent": "deploy"},
+    }
+    decision = eap.assess_artifact(artifact)
+    assert decision.allows_deployment is False
+    assert eap.DENY_DEPLOYMENT_RECOMMENDATION in decision.denied_actions
+    assert eap.DENY_AUTOMATIC_ROLLOUT in decision.denied_actions
+    assert eap.ALLOW_MANUAL_REVIEW_RECOMMENDATION in decision.allowed_actions
+    assert eap.REQUIRE_HUMAN_REVIEW not in decision.allowed_actions
+
+
+def test_high_risk_artifact_requires_human_review() -> None:
+    """A high_risk artifact denies deployment and requires human review."""
+    artifact = {
+        "support_health": "high_risk",
+        "decision_stable": False,
+        "warnings": ["low effective sample size", "poor overlap"],
+        "recommendation": {"intent": "hold"},
+    }
+    decision = eap.assess_artifact(artifact)
+    assert decision.allows_deployment is False
+    assert eap.DENY_DEPLOYMENT_RECOMMENDATION in decision.denied_actions
+    assert eap.REQUIRE_HUMAN_REVIEW in decision.allowed_actions
+    assert decision.reasons  # at least one reason recorded
+
+
+def test_gate_is_not_single_metric() -> None:
+    """Good support health is insufficient when the decision is unstable."""
+    artifact = {
+        "support_health": "ok",
+        "decision_stable": False,
+        "warnings": [],
+        "recommendation": {"intent": "deploy"},
+    }
+    decision = eap.assess_artifact(artifact)
+    assert decision.allows_deployment is False
+    assert "decision is not stable" in "; ".join(decision.reasons)
+
+
+def test_missing_fields_default_to_safest() -> None:
+    """An artifact missing diagnostics is treated as high_risk, not deployable."""
+    decision = eap.assess_artifact({})
+    assert decision.support_health == "high_risk"
+    assert decision.allows_deployment is False
+
+
+def test_unknown_support_health_normalised_to_safest() -> None:
+    """An unrecognised support_health value cannot read as deployable."""
+    decision = eap.assess_artifact(
+        {"support_health": "looks_fine", "decision_stable": True, "warnings": []}
+    )
+    assert decision.support_health == "high_risk"
+    assert decision.allows_deployment is False
+
+
+@pytest.mark.parametrize(
+    ("artifact", "expected_capability"),
+    [
+        (
+            {
+                "support_health": "ok",
+                "decision_stable": True,
+                "warnings": [],
+                "recommendation": {"intent": "deploy"},
+            },
+            "eval.recommend_deployment",
+        ),
+        (
+            {
+                "support_health": "high_risk",
+                "decision_stable": False,
+                "warnings": ["low ESS"],
+                "recommendation": {"intent": "hold"},
+            },
+            "eval.recommend_manual_review",
+        ),
+    ],
+)
+async def test_act_on_artifact_audits_decision(
+    artifact: dict[str, object], expected_capability: str
+) -> None:
+    """The chosen action is audited; a downgrade records its reason."""
+    kernel = eap.build_kernel()
+    analyst = Principal(principal_id="eval-analyst", roles=["reader", "writer"])
+    capability_id, action_id = await eap.act_on_artifact(kernel, analyst, artifact)
+    assert capability_id == expected_capability
+    trace = kernel.explain(action_id)
+    assert trace.capability_id == expected_capability
+    if expected_capability == "eval.recommend_manual_review":
+        assert trace.args.get("reason"), "downgrade must record the reason in the audit trace"
+        assert trace.args.get("downgraded_from") == "recommend_deployment"

From c8ec6c42aead7ffa5c224e659d54ce1c6374f17d Mon Sep 17 00:00:00 2001
From: Claude
Date: Wed, 3 Jun 2026 21:58:16 +0000
Subject: [PATCH 2/2] docs: clarify ActionTrace.args redaction note in
 evaluation-artifact cookbook
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The example comment and integration doc said args are preserved for
'non-memory capabilities', which could be misread against the
driver_id="memory" InMemoryDriver. Clarify that the kernel only redacts
args for 'memory.'-prefixed capability ids, so the eval.* caps keep their
args in ActionTrace.args — the InMemoryDriver is unrelated to that
redaction.

https://claude.ai/code/session_01MYbb8GVSbkq6MpNo2VhGR9
---
 docs/integrations/evaluation_artifacts.md | 5 +++--
 examples/evaluation_artifact_policy.py    | 4 +++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/docs/integrations/evaluation_artifacts.md b/docs/integrations/evaluation_artifacts.md
index 5f82358..0fc2b0e 100644
--- a/docs/integrations/evaluation_artifacts.md
+++ b/docs/integrations/evaluation_artifacts.md
@@ -72,8 +72,9 @@ a good point estimate with poor support is still blocked.
 
 When deployment is denied, the host does not grant `eval.recommend_deployment`;
 instead it invokes `eval.recommend_manual_review` with the reasons in the call
-args. Because `ActionTrace.args` is preserved for non-memory capabilities, the
-audit trace records *why* the action was downgraded:
+args. Because the kernel only redacts args for `memory.`-prefixed capability ids,
+these `eval.*` capabilities keep their args in `ActionTrace.args`, so the audit
+trace records *why* the action was downgraded:
 
 ```python
 capability_id, action_id = await act_on_artifact(kernel, principal, artifact)
diff --git a/examples/evaluation_artifact_policy.py b/examples/evaluation_artifact_policy.py
index 139307b..bfc1ecb 100644
--- a/examples/evaluation_artifact_policy.py
+++ b/examples/evaluation_artifact_policy.py
@@ -257,7 +257,9 @@ async def act_on_artifact(
         return ("eval.recommend_deployment", frame.action_id)
 
     # Downgrade: record the reasons in the audit args so the trace explains why
-    # deployment was denied (ActionTrace.args is preserved for non-memory caps).
+    # deployment was denied. The kernel only redacts args for "memory."-prefixed
+    # capability ids, so these eval.* caps keep their args in ActionTrace.args
+    # (the driver_id="memory" InMemoryDriver above is unrelated to that redaction).
     request = CapabilityRequest(
         capability_id="eval.recommend_manual_review",
         goal="recommend manual review because the evaluation support is weak",
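
Reviewer note: the distinction the second patch draws (redaction keyed on the capability id prefix, not on the driver id) may be easier to see in a standalone sketch. The helper below is hypothetical and is not agent_kernel's implementation: `args_for_trace`, the `REDACTED_PREFIX` constant, the `memory.store_note` capability id, and the choice to model redaction as an empty dict are all assumptions made for illustration only.

```python
from typing import Any

# Hypothetical model of the rule stated in the commit message; the real kernel
# code is not shown in this patch and may differ.
REDACTED_PREFIX = "memory."


def args_for_trace(capability_id: str, args: dict[str, Any]) -> dict[str, Any]:
    """Return the args an audit trace would keep under the assumed rule."""
    if capability_id.startswith(REDACTED_PREFIX):
        # Assumption: redaction is modelled here as dropping every arg.
        return {}
    # Any other capability id keeps a copy of its args, whichever driver runs it.
    return dict(args)


downgrade_args = {
    "reason": "support_health=caution",
    "downgraded_from": "recommend_deployment",
}
# An eval.* capability served by a driver whose driver_id is "memory":
kept = args_for_trace("eval.recommend_manual_review", downgrade_args)
# A capability whose *id* starts with "memory." (illustrative id):
dropped = args_for_trace("memory.store_note", {"text": "private"})
```

Under these assumptions the downgrade reason survives for `eval.recommend_manual_review` even though the capability is routed to the in-memory driver, which is the reading the clarified comment is meant to make unambiguous.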