-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtutorial.py
More file actions
235 lines (206 loc) · 8.59 KB
/
tutorial.py
File metadata and controls
235 lines (206 loc) · 8.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""tutorial.py — "Secure your first MCP tool in 5 minutes" (offline edition).
The full written walkthrough lives in ``docs/tutorial.md``. This script is the
runnable companion: it covers every step a new reader will see, using only
the in-process :class:`InMemoryDriver` so it has zero external dependencies
and runs in CI.
What this demo proves end-to-end:
1. Registering a capability with a sensitivity tag and ``allowed_fields``.
2. Issuing a signed token for a principal that satisfies policy.
3. Invoking the capability and observing the Frame in three response modes
(``summary`` / ``table`` / ``handle_only``) — PII never appears in any
of them.
4. Expanding a Handle to retrieve filtered raw data on demand.
5. A policy denial: the same token model rejects a writer call from a
reader principal, and the denial carries a stable ``reason_code``.
6. Auditability: ``explain()`` returns the full :class:`ActionTrace`.
Run with: ``python examples/tutorial.py``
"""
from __future__ import annotations
import asyncio
import os
os.environ.setdefault("WEAVER_KERNEL_SECRET", "tutorial-secret-do-not-use-in-prod")
from weaver_kernel import (
Capability,
CapabilityRegistry,
HMACTokenProvider,
InMemoryDriver,
Kernel,
Principal,
SafetyClass,
SensitivityTag,
StaticRouter,
)
from weaver_kernel.drivers.base import ExecutionContext
from weaver_kernel.errors import HandleConstraintViolation, PolicyDenied
from weaver_kernel.models import CapabilityRequest, ImplementationRef
# A tiny, deterministic dataset that mixes safe and PII-bearing fields.
# Email is present on purpose: the firewall must keep it out of the LLM-safe
# Frame unless the capability declared it under ``allowed_fields``.
INVOICES: list[dict[str, object]] = [
{
"id": "INV-001",
"customer_name": "Alice",
"email": "alice@example.com",
"amount": 120.0,
"status": "paid",
},
{
"id": "INV-002",
"customer_name": "Bob",
"email": "bob@example.com",
"amount": 540.0,
"status": "unpaid",
},
{
"id": "INV-003",
"customer_name": "Carol",
"email": "carol@example.com",
"amount": 75.0,
"status": "paid",
},
]
def build_registry() -> CapabilityRegistry:
"""Register one READ capability (PII-tagged) and one WRITE capability."""
registry = CapabilityRegistry()
registry.register(
Capability(
capability_id="billing.invoices.list",
name="List Invoices",
description="List recent invoices",
safety_class=SafetyClass.READ,
sensitivity=SensitivityTag.PII,
# The Firewall will drop every column that isn't on this list.
allowed_fields=["id", "customer_name", "amount", "status"],
tags=["billing", "invoices", "list"],
impl=ImplementationRef(driver_id="memory", operation="list_invoices"),
)
)
registry.register(
Capability(
capability_id="billing.invoices.create",
name="Create Invoice",
description="Create a new invoice",
safety_class=SafetyClass.WRITE,
tags=["billing", "invoices", "create"],
impl=ImplementationRef(driver_id="memory", operation="create_invoice"),
)
)
return registry
def build_driver() -> InMemoryDriver:
"""A driver that returns the synthetic invoices dataset on read."""
driver = InMemoryDriver()
def list_invoices(_ctx: ExecutionContext) -> list[dict[str, object]]:
return list(INVOICES)
def create_invoice(_ctx: ExecutionContext) -> dict[str, object]:
return {"id": "INV-004", "status": "draft"}
driver.register_handler("list_invoices", list_invoices)
driver.register_handler("create_invoice", create_invoice)
return driver
async def main() -> None:
print("=== Step 1: Register capabilities ===")
registry = build_registry()
for cap in registry.list_all():
print(f" • {cap.capability_id} ({cap.safety_class.value})")
print("\n=== Step 2: Wire the kernel ===")
router = StaticRouter(
routes={
"billing.invoices.list": ["memory"],
"billing.invoices.create": ["memory"],
}
)
kernel = Kernel(
registry=registry,
token_provider=HMACTokenProvider(secret="tutorial-secret-do-not-use-in-prod"),
router=router,
)
kernel.register_driver(build_driver())
# PII-tagged capabilities require a ``tenant`` attribute on the principal.
reader = Principal(principal_id="alice", roles=["reader"], attributes={"tenant": "acme"})
print(f" principal: {reader.principal_id} roles={reader.roles}")
print("\n=== Step 3: Grant a token ===")
list_req = CapabilityRequest(
capability_id="billing.invoices.list", goal="list recent invoices"
)
token = kernel.get_token(list_req, reader, justification="")
print(f" token_id: {token.token_id}")
print(f" capability: {token.capability_id}")
print(f" expires_at: {token.expires_at.isoformat()}")
print("\n=== Step 4: Invoke in summary mode ===")
frame = await kernel.invoke(token, principal=reader, args={"operation": "list_invoices"})
print(f" mode: {frame.response_mode}")
print(" facts:")
for fact in frame.facts:
print(f" • {fact}")
if frame.handle:
print(f" handle: {frame.handle.handle_id}")
print("\n=== Step 5: Invoke in table mode (allowed_fields enforced) ===")
table_token = kernel.get_token(list_req, reader, justification="")
table_frame = await kernel.invoke(
table_token,
principal=reader,
args={"operation": "list_invoices"},
response_mode="table",
)
print(f" mode: {table_frame.response_mode}")
print(f" rows shown: {len(table_frame.table_preview)}")
print(" preview:")
for row in table_frame.table_preview[:2]:
print(f" {row}")
leaked = [row for row in table_frame.table_preview if "email" in row]
assert leaked == [], (
f"firewall regression: 'email' is not in allowed_fields but reached "
f"the table-mode Frame in {len(leaked)} row(s): {leaked}"
)
print(f" PII fields leaked into Frame: {len(leaked)} (asserted == 0)")
print("\n=== Step 6: Invoke in handle_only mode and expand ===")
handle_token = kernel.get_token(list_req, reader, justification="")
handle_frame = await kernel.invoke(
handle_token,
principal=reader,
args={"operation": "list_invoices"},
response_mode="handle_only",
)
assert handle_frame.handle is not None, "handle_only mode must return a Handle"
expanded = kernel.expand(
handle_frame.handle,
query={"offset": 0, "limit": 2, "fields": ["id", "amount"]},
principal=reader,
)
print(f" expanded rows: {len(expanded.table_preview)}")
for row in expanded.table_preview:
print(f" {row}")
# Prove the new grant-constraint enforcement (#76): requesting a field the
# grant doesn't allow must raise HandleConstraintViolation. Without this
# check a future regression on the expand path would silently leak data
# that the firewall already excluded from the summary/table previews.
try:
kernel.expand(
handle_frame.handle,
query={"fields": ["email"]},
principal=reader,
)
except HandleConstraintViolation as exc:
print(f" blocked disallowed field: reason_code={exc.reason_code}")
else: # pragma: no cover - defensive
raise SystemExit("Expected HandleConstraintViolation for disallowed field on expand")
print("\n=== Step 7: Watch policy enforcement deny a writer call ===")
create_req = CapabilityRequest(
capability_id="billing.invoices.create", goal="create an invoice"
)
try:
kernel.get_token(create_req, reader, justification="reader trying a write — should fail")
except PolicyDenied as exc:
print(f" denied: {exc}")
print(f" reason_code: {exc.reason_code}")
else: # pragma: no cover - defensive
raise SystemExit("Expected PolicyDenied for reader on a WRITE capability")
print("\n=== Step 8: Audit the read with explain() ===")
trace = kernel.explain(frame.action_id)
print(f" action_id: {trace.action_id}")
print(f" capability: {trace.capability_id}")
print(f" principal: {trace.principal_id}")
print(f" driver: {trace.driver_id}")
print(f" invoked_at: {trace.invoked_at.isoformat()}")
print("\n✓ tutorial.py complete.")
if __name__ == "__main__":
asyncio.run(main())