diff --git a/CHANGELOG.md b/CHANGELOG.md index 85db382..fea2a5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,31 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- `ActionTrace.result_summary` (#93): successful invocations now record a + redaction-safe summary of the firewalled `Frame` (`fact_count`, `row_count`, + `warning_count`, `has_handle` — counts/flags only, never raw driver data), so + an invocation's outcome is auditable directly via `Kernel.explain`. A + repository safety check's pass/block decision is now recorded in the audit + trail (`result_summary["row_count"] == 0` ⇒ passed), not merely inferred from + whether a later publish occurred. Failed runs keep `result_summary == None`. +- Ecosystem integration cookbook: a new `docs/integrations/` section with two + reference flows and runnable, offline companions. + - **contextweaver — policy before action** (#92): documents that context + routing is advisory and policy enforcement still happens before execution. + New [`docs/integrations/contextweaver.md`](docs/integrations/contextweaver.md) + and [`examples/contextweaver_policy_flow.py`](examples/contextweaver_policy_flow.py) + demonstrate the `allow` / `ask`-confirm / `deny` outcomes (the `ask` outcome + is derived from the recoverable `insufficient_justification` denial code). + - **Repository safety check as a policy-controlled capability** (#93): a + `RepositoryCheckDriver` that shells out to a local checker (e.g. VibeGuard) + gates a high-impact `repo.publish_artifact` action behind a passing + `repo.code_safety_check`, with both steps recorded in the audit trace. New + [`docs/integrations/repository_safety_check.md`](docs/integrations/repository_safety_check.md) + and [`examples/repository_safety_check.py`](examples/repository_safety_check.py). + - Both examples run in `make ci`; `docs/integrations.md` and the README link + the new pages. + ## [0.9.0] - 2026-05-29 ### Added diff --git a/Makefile b/Makefile index 76f5d35..b4886a2 100644 --- a/Makefile +++ b/Makefile @@ -21,5 +21,7 @@ example: python examples/http_driver_demo.py python examples/tutorial.py python examples/readme_quickstart.py + python examples/contextweaver_policy_flow.py + python examples/repository_safety_check.py ci: fmt-check lint type test example diff --git a/README.md b/README.md index ac91c54..b8d2d09 100644 --- a/README.md +++ b/README.md @@ -187,6 +187,8 @@ See [docs/agent-context/invariants.md](docs/agent-context/invariants.md) for the - [Architecture](docs/architecture.md) - [Security model](docs/security.md) - [Integrations (MCP, HTTPDriver)](docs/integrations.md) + - [contextweaver: policy before action](docs/integrations/contextweaver.md) + - [Repository safety checks as a capability](docs/integrations/repository_safety_check.md) - [Designing capabilities](docs/capabilities.md) - [Context Firewall](docs/context_firewall.md) diff --git a/docs/architecture.md b/docs/architecture.md index 4f6bb70..289e0ff 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -134,7 +134,7 @@ Transforms `RawResult → Frame`. Never exposes raw output to the LLM. Stores full results by opaque handle ID with TTL. `expand()` supports pagination, field selection, and basic equality filtering. ### TraceStore -Records every `ActionTrace`. `explain(action_id)` returns the full audit record. +Records every `ActionTrace`. `explain(action_id)` returns the full audit record. On a successful invocation the trace also carries a `result_summary` — a redaction-safe dict of counts/flags (`fact_count`, `row_count`, `warning_count`, `has_handle`) derived from the firewalled `Frame`, never from raw driver data — so an invocation's outcome is auditable directly (e.g. a repository safety check passed iff `result_summary["row_count"] == 0`). Failed runs have `result_summary == None`. ### Adapters (`agent_kernel.adapters`) Vendor-specific tool-format adapters that translate between `Capability` objects diff --git a/docs/integrations.md b/docs/integrations.md index f1ae165..dfbc8cd 100644 --- a/docs/integrations.md +++ b/docs/integrations.md @@ -362,3 +362,18 @@ instrument_kernel(kernel) `instrument_kernel` is idempotent — calling twice on the same kernel is a no-op. Use `agent_kernel.otel.reset_instrumentation(kernel)` in tests to re-instrument with a different provider. + +## Ecosystem integration patterns + +These reference flows show how agent-kernel composes with neighboring Weaver +projects and external checkers. Each has a runnable, offline companion under +`examples/`. + +- [contextweaver + agent-kernel: policy before action](integrations/contextweaver.md) + — context routing is advisory; selection is not permission. Demonstrates the + `allow` / `ask`-confirm / `deny` outcomes. + Companion: [`examples/contextweaver_policy_flow.py`](../examples/contextweaver_policy_flow.py). +- [Repository safety checks as a policy-controlled capability](integrations/repository_safety_check.md) + — gate a high-impact action behind a deterministic check that shells out to a + local command (e.g. VibeGuard), with the result recorded in the audit trace. + Companion: [`examples/repository_safety_check.py`](../examples/repository_safety_check.py). diff --git a/docs/integrations/contextweaver.md b/docs/integrations/contextweaver.md new file mode 100644 index 0000000..176ad50 --- /dev/null +++ b/docs/integrations/contextweaver.md @@ -0,0 +1,100 @@ +# contextweaver + agent-kernel: policy before action + +`contextweaver` and `agent-kernel` solve adjacent parts of the same runtime +problem and are designed to compose: + +- **`contextweaver`** narrows *what the model sees* — it compiles context and + produces a shortlist of candidate tool cards for a goal. +- **`agent-kernel`** enforces *what the agent may do* — capabilities, policy, + HMAC tokens, the context firewall, and the audit trace. + +The contract between them is simple and worth stating plainly: + +> **Routing is advisory. Selection is not permission.** A capability appearing +> on a contextweaver shortlist does not authorize it. Every selected action +> still flows through the full agent-kernel pipeline +> (policy → token → invoke → firewall → trace) before anything executes. + +This page describes the reference flow. The runnable companion is +[`examples/contextweaver_policy_flow.py`](../../examples/contextweaver_policy_flow.py), +which uses synthetic data only and runs offline (it does **not** depend on +`contextweaver`). + +## The flow + +``` +goal + │ + ▼ +contextweaver → tool-card shortlist (advisory candidate set) + │ + ▼ +host / model selects an intended action + │ + ▼ +agent-kernel → grant_capability() → policy decision + │ │ + │ ├─ allow → invoke() → firewall → Frame + ActionTrace + │ ├─ ask → recoverable denial; host confirms, retries + │ └─ deny → terminal denial; host must not retry + ▼ +ActionTrace (audit) +``` + +1. **contextweaver** builds a small candidate/tool-card shortlist for the goal. + The shortlist is deliberately permissive — it may include capabilities the + current principal is not allowed to use. +2. The host (or model) selects an intended action from the shortlist. +3. **agent-kernel** evaluates that action against capability/policy rules. +4. Only approved actions proceed to execution; the rest are surfaced as + `ask`/confirm or `deny`. + +## Three outcomes on a binary policy + +agent-kernel's `PolicyDecision` is binary (`allowed: bool`) — there is no +dedicated "ask" verdict. The three host-level outcomes are derived from the +stable `reason_code` on a denial: + +| Outcome | How it arises | Stable signal | Host behavior | +|---|---|---|---| +| **allow** | The principal satisfies policy. | `grant_capability()` returns a grant; `invoke()` returns a `Frame`. | Proceed; the action is audited. | +| **ask / confirm** | A WRITE action — or a DESTRUCTIVE action the principal is *otherwise authorized for* — is missing its justification. | `PolicyDenied` with `reason_code == DenialReason.INSUFFICIENT_JUSTIFICATION`. | *Recoverable.* Prompt the human to confirm and supply a justification, then re-grant. | +| **deny** | The principal lacks a required role (or another non-recoverable rule fails). | `PolicyDenied` with e.g. `reason_code == DenialReason.MISSING_ROLE`. | *Terminal.* Do not retry as-is; surface remediation from `Kernel.explain_denial()`. | + +Treating a missing justification as `ask` is the natural mapping: the +`DefaultPolicyEngine` already requires a justification of at least 15 characters +for WRITE/DESTRUCTIVE capabilities, so "ask the human to confirm and explain" +is exactly what unblocks the call. A missing role, by contrast, cannot be +satisfied by confirmation and is terminal for that principal. + +Note the ordering: role checks run **before** the justification check, so the +`ask` outcome is only reachable once the principal already satisfies the role +requirement. For the DESTRUCTIVE `tickets.delete` below, the support agent lacks +`admin`, so it surfaces as a terminal `missing_role` `deny` regardless of +justification — the example never reaches `ask` for that capability. + +## Why the shortlist is not a grant + +In the example, contextweaver shortlists `docs.search` (READ), +`tickets.update_status` (WRITE), **and** `tickets.delete` (DESTRUCTIVE) for a +support-agent goal. The agent principal holds `["reader", "writer"]` but not +`admin`, so `tickets.delete` is denied at grant time even though it was +"routed". This is the whole point: context selection improves relevance; it +never widens authority. + +## Mapping to Weaver concepts + +- A contextweaver tool card maps to a `Capability` (or its public + `CapabilityDescriptor`). +- "Selecting an action" maps to building a `CapabilityRequest` (carry the + machine-readable `intent` and `scope` so declarative policies can match + without parsing free text). +- Enforcement is `Kernel.grant_capability()` followed by `Kernel.invoke()`; + the resulting `ActionTrace` satisfies weaver-spec invariant **I-02** + (every execution is preceded by a policy decision and followed by a trace). + +## Related + +- `examples/contextweaver_policy_flow.py` — runnable, offline. +- [contextweaver](https://github.com/dgenio/contextweaver) +- [weaver-spec](https://github.com/dgenio/weaver-spec) diff --git a/docs/integrations/repository_safety_check.md b/docs/integrations/repository_safety_check.md new file mode 100644 index 0000000..1b6c9c3 --- /dev/null +++ b/docs/integrations/repository_safety_check.md @@ -0,0 +1,134 @@ +# Repository safety checks as a policy-controlled capability + +Agents that can write files, open PRs, or publish artifacts need a clear +pattern for running deterministic checks **before** a high-impact action. +agent-kernel already models policy enforcement, capabilities, firewall +redaction, and auditable tool calls — a repository-level check fits naturally +as a capability that is invoked under explicit policy and recorded in the audit +trace. + +This page describes the pattern. The runnable companion is +[`examples/repository_safety_check.py`](../../examples/repository_safety_check.py), +which is deterministic, offline, and depends on no specific checker. + +> agent-kernel does **not** implement scanning logic and does **not** depend on +> any particular checker. The check is an adapter that shells out to a local +> command. The example uses a tiny embedded scanner so it runs in CI; in +> production you point it at a real tool such as +> [VibeGuard](https://github.com/dgenio/vibeguard). + +## The pattern + +``` +agent wants to publish + │ + ▼ +repo.code_safety_check (READ capability → RepositoryCheckDriver shells out) + │ + ├─ clean → grant + invoke repo.publish_artifact (WRITE) + └─ findings → host blocks the publish; check result is still audited +``` + +Two capabilities: + +| Capability | Safety class | Backed by | Role | +|---|---|---|---| +| `repo.code_safety_check` | `READ` | `RepositoryCheckDriver` (shells out to a checker) | Runs a deterministic scan and returns findings. | +| `repo.publish_artifact` | `WRITE` | any execution driver | The high-impact action, gated behind a passing check. | + +## The shell-out adapter + +`RepositoryCheckDriver` implements the `Driver` protocol and runs the configured +command as `[*command, path]`: + +```python +class RepositoryCheckDriver: + def __init__(self, command: list[str], *, driver_id: str = "repo_safety") -> None: + self._command = list(command) + self._driver_id = driver_id + + @property + def driver_id(self) -> str: + return self._driver_id + + async def execute(self, ctx: ExecutionContext) -> RawResult: + path = ctx.args.get("path") + if not path: + raise DriverError("repository check requires a 'path' argument ...") + proc = await asyncio.create_subprocess_exec( + *self._command, str(path), + stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + if proc.returncode not in (0, 1): # checker itself failed + raise DriverError(f"...: {stderr.decode(errors='replace').strip()}") + findings = json.loads(stdout or b"[]") # [] = clean + return RawResult(capability_id=ctx.capability_id, data=findings, + metadata={"exit_code": proc.returncode}) +``` + +Conventions the adapter relies on (and most scanners follow): + +- **Exit `0`** = clean, **exit `1`** = findings present. Any other exit code is + treated as the checker *failing* and surfaces as a `DriverError`. +- Findings are a JSON array on stdout. Non-JSON output raises `DriverError` + rather than being silently treated as "clean". + +To use VibeGuard instead of the embedded scanner, construct the driver with its +command, e.g. `RepositoryCheckDriver(command=["vibeguard", "scan", "--json"])`. + +## Reading the verdict from the Frame + +The gate decision is made on the **firewalled Frame**, not on raw driver output. +Invoke the check in `table` mode and treat a non-empty preview as a block: + +```python +frame = await kernel.invoke(token, principal=principal, + args={"operation": "code_safety_check", "path": path}, + response_mode="table") +findings = list(frame.table_preview) +passed = not findings +``` + +When `passed` is `False`, the host simply does not grant +`repo.publish_artifact`. Because the check ran through `Kernel.invoke()`, it +produced an `ActionTrace` — so the decision to block is auditable via +`Kernel.explain(action_id)`, satisfying weaver-spec **I-02**. + +## Audit trail + +Both the check and (when it passes) the publish are recorded: + +```python +check_trace = kernel.explain(check_action_id) # always available +publish_trace = kernel.explain(publish_action_id) # only when the check passed +``` + +Each `ActionTrace` records the capability, principal, driver, and timestamp for +the step. It also carries a redaction-safe `result_summary` derived from the +firewalled `Frame` (counts and flags only — never raw findings), so the check's +pass/block **decision** is recorded in the audit trail itself, not merely +inferred from whether a later publish happened: + +```python +check_trace = kernel.explain(check_action_id) +blocked = (check_trace.result_summary or {}).get("row_count", 0) > 0 +``` + +`result_summary` is built only from the post-`Firewall` `Frame`, so recording it +never widens the I-01 boundary or leaks scanned content into the audit log. A +reviewer can therefore confirm both that the publish was preceded by a check and +what that check decided. + +## Non-goals + +- agent-kernel does not implement scanning logic. +- VibeGuard (or any checker) is never a required dependency. +- The check does not bypass existing policy enforcement — it is *additional* + to the normal grant/invoke pipeline. + +## Related + +- `examples/repository_safety_check.py` — runnable, offline. +- [VibeGuard](https://github.com/dgenio/vibeguard) +- [weaver-spec](https://github.com/dgenio/weaver-spec) diff --git a/docs/security.md b/docs/security.md index 880eb50..dea2ae8 100644 --- a/docs/security.md +++ b/docs/security.md @@ -15,6 +15,7 @@ | Handle scope escape (expand exceeds grant) | Handles persist grant constraints; `HandleStore.expand` rechecks `max_rows`, `allowed_fields`, `scope`, and principal binding (#76) | | Memory exfiltration via tool output | `SensitivityTag.MEMORY` capabilities gate sensitive reads and durable writes; `ActionTrace.args` redacts payload-like fields for `memory.*` capabilities (#75) | | Raw memory payload reaching audit log | Kernel strips `payload`/`content`/`value`/`memory`/`text`/`body` from `ActionTrace.args` for `memory.*` capabilities | +| Scanned content / raw result reaching audit log | `ActionTrace.result_summary` is built only from the post-firewall `Frame` (counts and flags, never raw driver data), so the audit trail records an invocation's outcome without re-introducing the data the firewall removed | ## Token scopes diff --git a/examples/contextweaver_policy_flow.py b/examples/contextweaver_policy_flow.py new file mode 100644 index 0000000..d380c61 --- /dev/null +++ b/examples/contextweaver_policy_flow.py @@ -0,0 +1,250 @@ +"""contextweaver_policy_flow.py — routing is advisory; policy still decides. + +The written walkthrough lives in ``docs/integrations/contextweaver.md``. This +script is the runnable companion. It demonstrates the Weaver-ecosystem +``policy-before-action`` contract: a context-compilation layer (``contextweaver``) +may *narrow* what the model considers, but it never *grants* permission. Every +selected action still flows through the agent-kernel pipeline (policy → token → +invoke → firewall → trace) before anything executes. + +``contextweaver`` is not a dependency of this example. The +``compile_tool_shortlist`` helper is a tiny, deterministic stand-in for the +tool-card shortlist a real contextweaver build would produce, so the demo runs +offline and in CI. + +What this demo proves end-to-end: + 1. The context layer shortlists candidate capabilities for a goal — including + one the principal is *not* allowed to use. Being on the shortlist is not + authorization. + 2. ``allow`` — a READ action the principal satisfies executes and is audited. + 3. ``ask`` — a WRITE action with no justification is denied with a + *recoverable* ``insufficient_justification`` code; the host treats this as + "ask the human to confirm", supplies the justification, and re-grants. + 4. ``deny`` — a DESTRUCTIVE action the principal can never satisfy is denied + with a *terminal* ``missing_role`` code; confirmation cannot recover it. + +Run with: ``python examples/contextweaver_policy_flow.py`` +""" + +from __future__ import annotations + +import asyncio + +from agent_kernel import ( + Capability, + CapabilityRegistry, + HMACTokenProvider, + InMemoryDriver, + Kernel, + Principal, + SafetyClass, + StaticRouter, +) +from agent_kernel.drivers.base import ExecutionContext +from agent_kernel.errors import PolicyDenied +from agent_kernel.models import CapabilityRequest, ImplementationRef +from agent_kernel.policy_reasons import DenialReason + +_SECRET = "example-secret-do-not-use-in-prod" + + +def build_registry() -> CapabilityRegistry: + """Register one capability per outcome the demo exercises.""" + registry = CapabilityRegistry() + registry.register( + Capability( + capability_id="docs.search", + name="Search Docs", + description="Search the internal knowledge base for an answer", + safety_class=SafetyClass.READ, + tags=["docs", "search", "knowledge", "answer"], + impl=ImplementationRef(driver_id="memory", operation="docs_search"), + ) + ) + registry.register( + Capability( + capability_id="tickets.update_status", + name="Update Ticket Status", + description="Change the status of a support ticket", + safety_class=SafetyClass.WRITE, + tags=["tickets", "update", "status", "support"], + impl=ImplementationRef(driver_id="memory", operation="update_status"), + ) + ) + registry.register( + Capability( + capability_id="tickets.delete", + name="Delete Ticket", + description="Permanently delete a support ticket", + safety_class=SafetyClass.DESTRUCTIVE, + tags=["tickets", "delete", "support"], + impl=ImplementationRef(driver_id="memory", operation="delete_ticket"), + ) + ) + return registry + + +def build_driver() -> InMemoryDriver: + """A driver whose handlers return tiny, synthetic, PII-free payloads.""" + driver = InMemoryDriver() + + def docs_search(ctx: ExecutionContext) -> list[dict[str, object]]: + query = str(ctx.args.get("query", "")) + return [ + {"doc": "refunds.md", "score": 0.92, "matched": query}, + {"doc": "billing-faq.md", "score": 0.71, "matched": query}, + ] + + def update_status(ctx: ExecutionContext) -> dict[str, object]: + return {"ticket": ctx.args.get("ticket_id", "T-000"), "status": "resolved"} + + def delete_ticket( + ctx: ExecutionContext, + ) -> dict[str, object]: # pragma: no cover - never reached + return {"ticket": ctx.args.get("ticket_id", "T-000"), "deleted": True} + + driver.register_handler("docs_search", docs_search) + driver.register_handler("update_status", update_status) + driver.register_handler("delete_ticket", delete_ticket) + return driver + + +def compile_tool_shortlist(goal: str, capabilities: list[Capability]) -> list[Capability]: + """Stand-in for a contextweaver tool-card shortlist. + + Ranks capabilities by deterministic keyword overlap with *goal* (no + randomness — the Weaver ecosystem forbids non-deterministic routing) and + returns the matches in descending score order. This mirrors what a real + contextweaver build would hand the model: a *narrowed* candidate set. It is + deliberately permissive — it may include capabilities the principal is not + authorized to use, because routing does not equal permission. + """ + goal_terms = {term for term in goal.lower().split() if term} + scored: list[tuple[int, int, Capability]] = [] + for index, cap in enumerate(capabilities): + overlap = len(goal_terms & {tag.lower() for tag in cap.tags}) + if overlap: + # Sort key: higher overlap first, then registration order for stability. + scored.append((-overlap, index, cap)) + scored.sort() + return [cap for _, _, cap in scored] + + +def build_kernel() -> Kernel: + """Wire a kernel with explicit routes for the three demo capabilities.""" + router = StaticRouter( + routes={ + "docs.search": ["memory"], + "tickets.update_status": ["memory"], + "tickets.delete": ["memory"], + } + ) + kernel = Kernel( + registry=build_registry(), + token_provider=HMACTokenProvider(secret=_SECRET), + router=router, + ) + kernel.register_driver(build_driver()) + return kernel + + +async def main() -> None: + kernel = build_kernel() + # A support agent: may read and write, but is not an admin. + agent = Principal(principal_id="support-agent-7", roles=["reader", "writer"]) + + print("=== Step 1: contextweaver compiles a tool-card shortlist ===") + goal = "resolve a support ticket and search billing docs" + shortlist = compile_tool_shortlist(goal, kernel.list_capabilities()) + for cap in shortlist: + print(f" • {cap.capability_id} ({cap.safety_class.value})") + # Routing is advisory: the destructive capability is shortlisted even though + # this principal can never invoke it. The shortlist is a suggestion, not a grant. + shortlisted_ids = {cap.capability_id for cap in shortlist} + assert "tickets.delete" in shortlisted_ids, ( + "demo expects the destructive capability on the shortlist to prove " + "that contextweaver routing does not imply authorization" + ) + print(" (note: a DESTRUCTIVE capability is shortlisted — routing is not permission)") + + print("\n=== Step 2: ALLOW — a READ action the principal satisfies ===") + search_req = CapabilityRequest(capability_id="docs.search", goal=goal) + token = kernel.get_token(search_req, agent, justification="") + frame = await kernel.invoke( + token, principal=agent, args={"operation": "docs_search", "query": "refund"} + ) + print(f" outcome: allow ({frame.response_mode})") + for fact in frame.facts: + print(f" • {fact}") + trace = kernel.explain(frame.action_id) + assert trace.capability_id == "docs.search" + print(f" audited: action_id={trace.action_id} driver={trace.driver_id}") + + print("\n=== Step 3: ASK/CONFIRM — a WRITE action missing its justification ===") + update_req = CapabilityRequest( + capability_id="tickets.update_status", + goal="mark the customer's ticket resolved", + ) + try: + kernel.get_token(update_req, agent, justification="") + except PolicyDenied as exc: + # A missing/short justification is a *recoverable* gate: the host should + # ask the human to confirm and supply a reason, then retry. This is how + # an "ask/confirm" outcome is represented on top of a binary policy. + assert exc.reason_code == DenialReason.INSUFFICIENT_JUSTIFICATION + outcome = _classify(exc.reason_code) + print(f" outcome: {outcome} (reason_code={exc.reason_code})") + print(" host action: prompt the human, capture confirmation + justification") + else: # pragma: no cover - defensive + raise SystemExit("Expected PolicyDenied for a WRITE call with no justification") + + # Human confirmed — re-grant with the supplied justification. + confirmed = "agent confirmed: customer verified, closing as resolved per call notes" + token = kernel.get_token(update_req, agent, justification=confirmed) + frame = await kernel.invoke( + token, + principal=agent, + args={"operation": "update_status", "ticket_id": "T-4821"}, + ) + print(f" after confirm: allow ({frame.response_mode})") + for fact in frame.facts: + print(f" • {fact}") + + print("\n=== Step 4: DENY — a DESTRUCTIVE action the principal cannot satisfy ===") + delete_req = CapabilityRequest( + capability_id="tickets.delete", goal="delete the duplicate ticket" + ) + try: + kernel.get_token( + delete_req, + agent, + justification="duplicate ticket cleanup requested by the customer", + ) + except PolicyDenied as exc: + # A missing role is *terminal*: no amount of confirmation lets a + # non-admin run a DESTRUCTIVE capability. The host must not retry. + assert exc.reason_code == DenialReason.MISSING_ROLE + outcome = _classify(exc.reason_code) + print(f" outcome: {outcome} (reason_code={exc.reason_code})") + explanation = kernel.explain_denial(delete_req, agent, justification="cleanup") + print(f" remediation: {explanation.remediation[0]}") + else: # pragma: no cover - defensive + raise SystemExit("Expected PolicyDenied for a non-admin on a DESTRUCTIVE capability") + + print("\n✓ contextweaver_policy_flow.py complete.") + + +def _classify(reason_code: str | None) -> str: + """Map a denial reason code to a host-level outcome. + + Only ``insufficient_justification`` is treated as a recoverable + ``ask``/confirm gate; every other denial is ``deny`` (terminal for this + request as-is). + """ + if reason_code == DenialReason.INSUFFICIENT_JUSTIFICATION: + return "ask" + return "deny" + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/repository_safety_check.py b/examples/repository_safety_check.py new file mode 100644 index 0000000..3b0580a --- /dev/null +++ b/examples/repository_safety_check.py @@ -0,0 +1,292 @@ +"""repository_safety_check.py — gate a high-impact action behind a code check. + +The written walkthrough lives in +``docs/integrations/repository_safety_check.md``. This script is the runnable +companion. It shows the Weaver-ecosystem pattern for running a deterministic +repository check *as a policy-controlled capability* before an agent performs a +high-impact action (publishing an artifact): + + 1. ``repo.code_safety_check`` is a capability backed by a driver that *shells + out* to a local command. agent-kernel records the command result in an + audit trace, exactly like any other capability. + 2. ``repo.publish_artifact`` is the high-impact WRITE action. The host refuses + to grant it until a fresh safety check has passed. + 3. A clean change passes the check and publishes. + 4. A risky change produces findings; the host blocks the publish and the + check result is still auditable. + +The checker here is a tiny embedded scanner run via ``sys.executable`` so the +demo is deterministic, offline, and dependency-free. In production you would +point ``RepositoryCheckDriver`` at a real command such as ``vibeguard`` (e.g. +``["vibeguard", "scan", "--json"]``); agent-kernel does not depend on any +specific checker. + +Run with: ``python examples/repository_safety_check.py`` +""" + +from __future__ import annotations + +import asyncio +import json +import os +import sys +import tempfile + +from agent_kernel import ( + Capability, + CapabilityRegistry, + HMACTokenProvider, + InMemoryDriver, + Kernel, + Principal, + SafetyClass, + StaticRouter, +) +from agent_kernel.drivers.base import ExecutionContext +from agent_kernel.errors import DriverError +from agent_kernel.models import CapabilityRequest, ImplementationRef, RawResult + +_SECRET = "example-secret-do-not-use-in-prod" + +# Embedded stand-in for a real checker (e.g. VibeGuard). Reads a file path from +# argv[1], scans for a few banned patterns, prints findings as a JSON array on +# stdout, and exits 1 when it finds anything (0 when clean) — the exit-code +# convention most repo scanners follow. Kept deliberately small and +# deterministic so this example runs in CI without a real scanner installed. +_CHECKER_SOURCE = r""" +import json +import sys + +BANNED = { + "os.system(": "use of os.system — prefer a sandboxed driver", + "eval(": "use of eval on untrusted input", + "subprocess.call(": "unchecked subprocess call", + "AKIA": "hardcoded AWS access key id", +} + +path = sys.argv[1] +with open(path, encoding="utf-8") as handle: + lines = handle.readlines() + +findings = [] +for number, text in enumerate(lines, start=1): + for needle, message in BANNED.items(): + if needle in text: + findings.append( + {"rule": needle.rstrip("("), "severity": "high", "line": number, "message": message} + ) + +json.dump(findings, sys.stdout) +sys.exit(1 if findings else 0) +""" + +# Two synthetic, in-repo "staged changes". One is clean; one trips the scanner. +_CLEAN_SNIPPET = '''\ +def total_price(items): + """Sum the price of every item.""" + return sum(item["price"] for item in items) +''' + +_RISKY_SNIPPET = """\ +import os + +def purge_cache(): + os.system("rm -rf /tmp/build-cache") # blocking finding + +AWS_KEY = "AKIAIOSFODNN7EXAMPLE" # blocking finding +""" + + +class RepositoryCheckDriver: + """Driver that runs a repository safety check by shelling out to a command. + + The command is invoked as ``[*command, path]`` and is expected to print a + JSON array of findings to stdout and exit non-zero when findings exist + (exit ``0`` = clean, ``1`` = findings). Any other exit code is treated as + the checker itself failing and surfaces as a :class:`DriverError`. + """ + + def __init__(self, command: list[str], *, driver_id: str = "repo_safety") -> None: + self._command = list(command) + self._driver_id = driver_id + + @property + def driver_id(self) -> str: + """Unique identifier for this driver.""" + return self._driver_id + + async def execute(self, ctx: ExecutionContext) -> RawResult: + """Run the configured checker against ``ctx.args['path']``.""" + path = ctx.args.get("path") + if not path: + raise DriverError( + "repository check requires a 'path' argument naming the file to scan" + ) + proc = await asyncio.create_subprocess_exec( + *self._command, + str(path), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + if proc.returncode not in (0, 1): + detail = stderr.decode(errors="replace").strip() + raise DriverError( + f"repository check command exited {proc.returncode}: {detail or ''}" + ) + try: + findings = json.loads(stdout or b"[]") + except json.JSONDecodeError as exc: + raise DriverError(f"repository check produced non-JSON output: {stdout!r}") from exc + return RawResult( + capability_id=ctx.capability_id, + data=findings, + metadata={"exit_code": proc.returncode}, + ) + + +def build_kernel() -> Kernel: + """Wire a kernel with the check capability and the gated publish action.""" + registry = CapabilityRegistry() + registry.register( + Capability( + capability_id="repo.code_safety_check", + name="Repository Safety Check", + description="Run a deterministic safety scan over staged changes", + safety_class=SafetyClass.READ, + tags=["repo", "safety", "check", "scan"], + impl=ImplementationRef(driver_id="repo_safety", operation="code_safety_check"), + ) + ) + registry.register( + Capability( + capability_id="repo.publish_artifact", + name="Publish Artifact", + description="Publish a build artifact to the release channel", + safety_class=SafetyClass.WRITE, + tags=["repo", "publish", "release", "artifact"], + impl=ImplementationRef(driver_id="memory", operation="publish"), + ) + ) + + publish_driver = InMemoryDriver() + + def publish(ctx: ExecutionContext) -> dict[str, object]: + return {"artifact": ctx.args.get("artifact", "build.tar.gz"), "published": True} + + publish_driver.register_handler("publish", publish) + + router = StaticRouter( + routes={ + "repo.code_safety_check": ["repo_safety"], + "repo.publish_artifact": ["memory"], + } + ) + kernel = Kernel( + registry=registry, + token_provider=HMACTokenProvider(secret=_SECRET), + router=router, + ) + kernel.register_driver(RepositoryCheckDriver(command=[sys.executable, "-c", _CHECKER_SOURCE])) + kernel.register_driver(publish_driver) + return kernel + + +async def run_safety_check( + kernel: Kernel, principal: Principal, path: str +) -> tuple[bool, list[dict[str, object]], str]: + """Run the policy-controlled safety check and return ``(passed, findings, action_id)``. + + Reads findings from the firewalled Frame in ``table`` mode: an empty preview + means no findings (the change is clean). This keeps the gate decision on the + audited Frame rather than on raw driver output. + """ + request = CapabilityRequest( + capability_id="repo.code_safety_check", + goal="scan staged changes before publishing", + ) + token = kernel.get_token(request, principal, justification="") + frame = await kernel.invoke( + token, + principal=principal, + args={"operation": "code_safety_check", "path": path}, + response_mode="table", + ) + findings = list(frame.table_preview) + return (not findings, findings, frame.action_id) + + +async def publish_artifact(kernel: Kernel, principal: Principal, artifact: str) -> str: + """Grant + invoke the gated publish action and return its audit ``action_id``.""" + request = CapabilityRequest( + capability_id="repo.publish_artifact", + goal="publish the build artifact to the release channel", + ) + token = kernel.get_token( + request, + principal, + justification="release approved: safety check passed, publishing tagged build", + ) + frame = await kernel.invoke( + token, principal=principal, args={"operation": "publish", "artifact": artifact} + ) + return frame.action_id + + +async def attempt_publish(kernel: Kernel, principal: Principal, label: str, snippet: str) -> None: + """Run the check-then-publish gate over one staged change.""" + print(f"\n=== Change: {label} ===") + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "staged_change.py") + with open(path, "w", encoding="utf-8") as handle: + handle.write(snippet) + + passed, findings, check_action = await run_safety_check(kernel, principal, path) + print(f" check: {'PASS' if passed else 'BLOCK'} ({len(findings)} finding(s))") + for finding in findings: + print( + f" • line {finding.get('line')}: {finding.get('rule')} — {finding.get('message')}" + ) + # The safety check is always auditable, whether it passed or blocked. + # The trace records a redaction-safe result summary derived from the + # firewalled Frame, so the pass/block *decision* — not just the fact a + # check ran — is reconstructable from the audit trail alone. + check_trace = kernel.explain(check_action) + summary = check_trace.result_summary or {} + audited_block = summary.get("row_count", 0) > 0 + print( + f" audited check: action_id={check_trace.action_id} " + f"driver={check_trace.driver_id} result_summary={summary}" + ) + assert audited_block == (not passed), ( + "the audit trace must record the check decision: a blocked check has " + "findings rows in result_summary, a passing one has zero" + ) + + if not passed: + # Policy-controlled gate: do NOT grant the publish capability when + # the safety check found blocking issues. + print(" publish: SKIPPED — safety check returned blocking findings") + assert findings, "block path must carry findings" + return + + publish_action = await publish_artifact(kernel, principal, artifact=f"{label}.tar.gz") + publish_trace = kernel.explain(publish_action) + assert not findings, "pass path must have zero findings" + print(f" publish: DONE — action_id={publish_trace.action_id}") + + +async def main() -> None: + kernel = build_kernel() + # A release agent: may read (run checks) and write (publish), not an admin. + agent = Principal(principal_id="release-bot", roles=["reader", "writer"]) + + print("=== Repository safety check as a policy-controlled capability ===") + await attempt_publish(kernel, agent, "clean-change", _CLEAN_SNIPPET) + await attempt_publish(kernel, agent, "risky-change", _RISKY_SNIPPET) + + print("\n✓ repository_safety_check.py complete.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/agent_kernel/kernel/_invoke.py b/src/agent_kernel/kernel/_invoke.py index 9bfbe9f..bc14a3e 100644 --- a/src/agent_kernel/kernel/_invoke.py +++ b/src/agent_kernel/kernel/_invoke.py @@ -66,6 +66,24 @@ def _redact_args_for_trace(capability_id: str, args: dict[str, Any]) -> dict[str } +def _frame_result_summary(frame: Frame) -> dict[str, Any]: + """Build a redaction-safe result summary from a *firewalled* Frame. + + Records only counts and flags taken from the already-transformed Frame — + never raw driver data — so it preserves the I-01 boundary the Firewall + enforces and keeps sensitive payloads out of the audit trail. Stored on + :attr:`~agent_kernel.models.ActionTrace.result_summary` so an invocation's + outcome (e.g. a safety check's pass/block decision) is auditable via + :meth:`~agent_kernel.Kernel.explain`. + """ + return { + "fact_count": len(frame.facts), + "row_count": len(frame.table_preview), + "warning_count": len(frame.warnings), + "has_handle": frame.handle is not None, + } + + def resolve_effective_mode( *, response_mode: ResponseMode, @@ -159,6 +177,7 @@ def record_success_trace( response_mode: ResponseMode, driver_id: str, handle_id: str | None, + result_summary: dict[str, Any] | None, trace_store: TraceStore, ) -> None: """Persist an :class:`ActionTrace` for a successful invocation.""" @@ -173,6 +192,7 @@ def record_success_trace( response_mode=response_mode, driver_id=driver_id, handle_id=handle_id, + result_summary=result_summary, ) ) @@ -294,6 +314,7 @@ async def perform_invoke( response_mode=frame.response_mode, driver_id=used_driver_id, handle_id=handle.handle_id if handle else None, + result_summary=_frame_result_summary(frame), trace_store=kernel._traces, ) logger.info( diff --git a/src/agent_kernel/kernel/_stream.py b/src/agent_kernel/kernel/_stream.py index 5d5a27c..cfdcc3a 100644 --- a/src/agent_kernel/kernel/_stream.py +++ b/src/agent_kernel/kernel/_stream.py @@ -34,7 +34,7 @@ RoutePlan, ) from ..tokens import CapabilityToken -from ._invoke import _redact_args_for_trace, resolve_effective_mode +from ._invoke import _frame_result_summary, _redact_args_for_trace, resolve_effective_mode if TYPE_CHECKING: # pragma: no cover from . import Kernel @@ -151,6 +151,7 @@ async def invoke_stream_impl( response_mode=(last_frame.response_mode if last_frame else initial_mode), driver_id=fallback_driver_id, handle_id=handle.handle_id if handle else None, + result_summary=(_frame_result_summary(last_frame) if last_frame else None), error=None if yielded_any else "stream produced no chunks", ) ) diff --git a/src/agent_kernel/models.py b/src/agent_kernel/models.py index 93ca5a1..4d47512 100644 --- a/src/agent_kernel/models.py +++ b/src/agent_kernel/models.py @@ -415,6 +415,17 @@ class ActionTrace: driver_id: str handle_id: str | None = None error: str | None = None + result_summary: dict[str, Any] | None = None + """Redaction-safe summary of the firewalled :class:`Frame` this invocation + produced (``None`` for failed runs, which have no Frame). + + Derived **only** from the post-Firewall Frame — counts and flags, never raw + driver data — so recording it cannot widen the I-01 boundary or leak + sensitive payloads into the audit trail. It lets a reviewer reconstruct an + invocation's outcome directly from :meth:`~agent_kernel.Kernel.explain`; for + example, a repository safety check passed iff ``result_summary["row_count"] + == 0``. + """ # ── Policy explanation ──────────────────────────────────────────────────────── diff --git a/tests/test_kernel.py b/tests/test_kernel.py index de43cad..7a79147 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -82,6 +82,35 @@ async def test_invoke_handle_only_mode(kernel: Kernel, reader_principal: Princip assert frame.handle is not None +@pytest.mark.asyncio +async def test_trace_records_result_summary(kernel: Kernel, reader_principal: Principal) -> None: + """A successful invocation records a redaction-safe result summary. + + The summary is derived from the firewalled Frame (counts/flags only), so the + invocation's outcome is auditable directly from ``explain`` without + re-introducing raw driver data. + """ + req = CapabilityRequest(capability_id="billing.list_invoices", goal="summary") + token = kernel.get_token(req, reader_principal, justification="") + frame = await kernel.invoke( + token, + principal=reader_principal, + args={"operation": "billing.list_invoices"}, + response_mode="table", + ) + trace = kernel.explain(frame.action_id) + assert trace.result_summary is not None + assert set(trace.result_summary) == { + "fact_count", + "row_count", + "warning_count", + "has_handle", + } + # Counts mirror the firewalled Frame the caller received. + assert trace.result_summary["row_count"] == len(frame.table_preview) + assert trace.result_summary["has_handle"] == (frame.handle is not None) + + # ── Denial flow ──────────────────────────────────────────────────────────────── diff --git a/tests/test_trace.py b/tests/test_trace.py index 5804059..9a4b498 100644 --- a/tests/test_trace.py +++ b/tests/test_trace.py @@ -56,3 +56,10 @@ def test_explain_returns_consistent_data() -> None: assert result.principal_id == "u1" assert result.driver_id == "memory" assert result.args == {"a": 1} + + +def test_result_summary_defaults_none() -> None: + # Backward-compatible: traces built without an explicit result_summary + # (e.g. failure traces, or callers constructing ActionTrace directly) keep + # it unset rather than fabricating a summary. + assert _trace("act-default").result_summary is None