-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathmain.py
More file actions
561 lines (484 loc) · 21.6 KB
/
main.py
File metadata and controls
561 lines (484 loc) · 21.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
# Stagehand + Browserbase + Extend: Download Expense Receipts and Parse with Extend AI
# See README.md for full documentation
import asyncio
import csv
import json
import os
import webbrowser
import zipfile
from pathlib import Path
from browserbase import APIStatusError, Browserbase
from dotenv import load_dotenv
from extend_ai import Extend
from stagehand import AsyncStagehand
# Load environment variables from a local .env file into os.environ.
# Required: BROWSERBASE_API_KEY (Browserbase session management)
# Optional: EXTEND_API_KEY (enables receipt parsing with Extend AI)
load_dotenv()
# Receipt extraction config for Extend AI
# Uses extraction_light base extractor with parse_performance engine for low latency
RECEIPT_EXTRACTION_CONFIG = {
    "baseProcessor": "extraction_light",
    "baseVersion": "3.4.0",
    # Parse stage: convert each receipt to markdown before field extraction.
    # Agentic/OCR extras are disabled throughout to keep latency low.
    "parseConfig": {
        "engine": "parse_performance",
        "target": "markdown",
        "blockOptions": {
            "text": {
                "agentic": {"enabled": False},
                "signatureDetectionEnabled": False,
            },
            "tables": {
                "agentic": {"enabled": False},
                "targetFormat": "markdown",
                "cellBlocksEnabled": False,
                "tableHeaderContinuationEnabled": False,
            },
            "figures": {
                "enabled": False,
                "figureImageClippingEnabled": False,
            },
        },
        "engineVersion": "1.0.1",
        "advancedOptions": {
            "engine": "parse_performance",
            "agenticOcrEnabled": False,
            "pageBreaksEnabled": True,
            "pageRotationEnabled": False,
            "verticalGroupingThreshold": 1,
        },
        # Treat each uploaded file as a single document-sized chunk.
        "chunkingStrategy": {"type": "document"},
    },
    # JSON Schema describing the structured fields extracted from every receipt.
    # "extend:type" hints (date/currency) mark fields for Extend's typed parsing.
    "schema": {
        "type": "object",
        "required": [
            "vendor_name",
            "receipt_date",
            "receipt_number",
            "total_amount",
            "subtotal_amount",
            "tax_amount",
            "line_items",
            "payment_method",
        ],
        "properties": {
            "vendor_name": {
                "type": ["string", "null"],
                "description": "The name of the merchant or vendor on the receipt.",
            },
            "receipt_date": {
                "type": ["string", "null"],
                "description": "The date of the transaction shown on the receipt.",
                "extend:type": "date",
            },
            "receipt_number": {
                "type": ["string", "null"],
                "description": "The receipt or transaction number, if present.",
            },
            # Monetary fields share one {amount, iso_4217_currency_code} shape.
            "total_amount": {
                "type": "object",
                "required": ["amount", "iso_4217_currency_code"],
                "properties": {
                    "amount": {"type": ["number", "null"]},
                    "iso_4217_currency_code": {"type": ["string", "null"]},
                },
                "description": "The total amount paid on the receipt.",
                "extend:type": "currency",
                "additionalProperties": False,
            },
            "subtotal_amount": {
                "type": "object",
                "required": ["amount", "iso_4217_currency_code"],
                "properties": {
                    "amount": {"type": ["number", "null"]},
                    "iso_4217_currency_code": {"type": ["string", "null"]},
                },
                "description": "The subtotal before tax, if shown.",
                "extend:type": "currency",
                "additionalProperties": False,
            },
            "tax_amount": {
                "type": "object",
                "required": ["amount", "iso_4217_currency_code"],
                "properties": {
                    "amount": {"type": ["number", "null"]},
                    "iso_4217_currency_code": {"type": ["string", "null"]},
                },
                "description": "The tax amount on the receipt.",
                "extend:type": "currency",
                "additionalProperties": False,
            },
            "line_items": {
                "type": "array",
                "items": {
                    "type": "object",
                    "required": ["description", "quantity", "unit_price", "amount"],
                    "properties": {
                        "description": {
                            "type": ["string", "null"],
                            "description": "Description of the item purchased.",
                        },
                        "quantity": {
                            "type": ["number", "null"],
                            "description": "Quantity of the item, if shown.",
                        },
                        "unit_price": {
                            "type": ["number", "null"],
                            "description": "Price per unit, if shown.",
                        },
                        "amount": {
                            "type": ["number", "null"],
                            "description": "Total amount for this line item.",
                        },
                    },
                    "additionalProperties": False,
                },
                "description": "Individual items on the receipt.",
            },
            "payment_method": {
                "type": ["string", "null"],
                "description": "The payment method used (e.g., cash, credit card, etc.).",
            },
        },
        "additionalProperties": False,
    },
    # Run-level options: citations are enabled and issued per array item.
    "advancedOptions": {
        "advancedMultimodalEnabled": False,
        "citationsEnabled": True,
        "arrayCitationStrategy": "item",
        "pageRanges": [],
        "chunkingOptions": {},
        "advancedFigureParsingEnabled": True,
    },
}
def open_in_browser(url: str) -> None:
    """Open *url* in the system default browser (live-view / dashboard links).

    Failures are non-fatal: if the browser cannot be launched, a notice is
    printed and execution continues.
    """
    try:
        webbrowser.open(url)
    except Exception:
        # Best effort only — the automation runs fine without a visible browser.
        print(f"Could not auto-open: {url}")
# Polls Browserbase API for completed downloads with retry logic
async def save_downloads_with_retry(
    bb: Browserbase, session_id: str, retry_for_seconds: int = 60
) -> int:
    """
    Poll the Browserbase API for session downloads, retrying until ready.

    Browserbase stores downloaded files during a session and makes them
    available as a single ZIP via API. Files may take a few seconds to
    process, so this function polls every 2 seconds until real content
    appears or the timeout elapses.

    Args:
        bb: Browserbase client instance for API calls
        session_id: The Browserbase session ID to retrieve downloads from
        retry_for_seconds: Maximum time to wait for downloads (default: 60 seconds)

    Returns:
        int: Size in bytes of the ZIP written to ``downloaded_files.zip``,
        or 0 if the session was not found (HTTP 404).

    Raises:
        TimeoutError: If downloads aren't ready within the specified timeout
    """
    print(f"Waiting up to {retry_for_seconds} seconds for downloads to complete...")
    # Compute the deadline once on the running loop's monotonic clock.
    # asyncio.get_running_loop() is the recommended API inside a coroutine;
    # asyncio.get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + retry_for_seconds
    while True:
        # Check if we've exceeded the timeout period
        if loop.time() >= deadline:
            raise TimeoutError("Download timeout exceeded")
        try:
            print("Checking for downloads...")
            # Fetch downloads from the Browserbase API. The SDK is synchronous,
            # so run it in a worker thread to avoid blocking the event loop.
            response = await asyncio.to_thread(bb.sessions.downloads.list, session_id)
            download_buffer = await asyncio.to_thread(response.read)
            # An empty ZIP is ~22 bytes; require at least 100 bytes before
            # treating the payload as real content.
            if len(download_buffer) > 100:
                print(f"Downloads ready! File size: {len(download_buffer)} bytes")
                # Save the ZIP file containing all downloaded receipts to disk
                with open("downloaded_files.zip", "wb") as f:
                    f.write(download_buffer)
                print("Files saved as: downloaded_files.zip")
                return len(download_buffer)
            else:
                print("Downloads not ready yet, retrying...")
        except APIStatusError as e:
            # Handle 404 (session not found) gracefully
            if e.status_code == 404:
                print("Session not found, returning empty result")
                return 0
            print(f"Error fetching downloads: {e}")
            raise
        except Exception as e:
            # An HTML error body means the session isn't ready yet — keep retrying
            error_message = str(e)
            if "Unexpected token '<'" in error_message or "<html" in error_message:
                print("Session not ready yet, retrying...")
                await asyncio.sleep(2)
                continue
            print(f"Error fetching downloads: {e}")
            raise
        # Poll every 2 seconds to check if downloads are ready
        await asyncio.sleep(2)
# Extracts receipt files from downloaded zip archive into output directories
def extract_files_from_zip(zip_path: str, output_dir: str = "output/documents") -> list[str]:
    """
    Unpack every file entry of a receipt ZIP archive to disk.

    Args:
        zip_path: Path to the ZIP file containing receipts
        output_dir: Directory to extract files to (default: "output/documents")

    Returns:
        list[str]: Paths to all extracted files

    Raises:
        ValueError: If no files are found in the ZIP
    """
    print(f"Extracting files from {zip_path}...")
    # Make sure both the document and result directories exist up front
    destination = Path(output_dir)
    destination.mkdir(parents=True, exist_ok=True)
    Path("output/results").mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as archive:
        # Directory entries end with "/" — keep only real files
        file_entries = [name for name in archive.namelist() if not name.endswith("/")]
        if not file_entries:
            raise ValueError("No files found in the downloaded zip")
        extracted_files: list[str] = []
        for name in file_entries:
            archive.extract(name, output_dir)
            target = destination / name
            print(f"Extracted: {target}")
            extracted_files.append(str(target))
    print(f"\nTotal files extracted: {len(extracted_files)}")
    return extracted_files
# Uploads receipt files to Extend AI, runs extraction, and saves results as JSON and CSV
async def parse_receipts_with_extend(file_paths: list[str]) -> None:
    """
    Upload receipt files to Extend AI, run extraction, and save results.

    Initializes the Extend client, uploads each file, runs synchronous extraction
    using inline config (no need to pre-create an extractor resource), and saves
    results as JSON ("output/results/receipts.json") and CSV
    ("output/results/receipts.csv"). If EXTEND_API_KEY is missing or still the
    placeholder value, parsing is skipped with a warning instead of raising.

    Args:
        file_paths: List of file paths to receipt documents
    """
    # Skip parsing if Extend API key is not configured
    extend_api_key = os.environ.get("EXTEND_API_KEY")
    if not extend_api_key or extend_api_key == "YOUR_EXTEND_API_KEY_HERE":
        print("\nWARNING: EXTEND_API_KEY not configured. Skipping receipt parsing.")
        print(" Add your Extend API key to .env to enable automatic receipt parsing.")
        return
    print("\n=== Parsing Receipts with Extend AI ===\n")
    # Initialize Extend AI client
    extend_client = Extend(token=extend_api_key)
    print(f"Processing {len(file_paths)} receipts with inline config...\n")
    # Process all files with retry on rate limiting (429 errors)
    results: list[dict] = []

    # Uploads a single file to Extend and runs extraction with exponential backoff retry
    async def process_with_retry(file_path: str, max_retries: int = 3) -> dict:
        """Upload one file and extract it, retrying retryable errors.

        Returns a dict with keys "file", "runId" (on success), and "data"
        (the extraction payload, or {"error": ...} on failure).
        """
        file_name = Path(file_path).name
        for attempt in range(1, max_retries + 1):
            try:
                # Upload the file to Extend (multipart form upload)
                # Pass as (filename, bytes) tuple so Extend knows the file name
                with open(file_path, "rb") as f:
                    file_bytes = f.read()
                upload_response = await asyncio.to_thread(
                    extend_client.files.upload,
                    file=(file_name, file_bytes),
                )
                file_id = upload_response.id
                # Run extraction using inline config — no need to pre-create an extractor
                result = await asyncio.to_thread(
                    extend_client.extract,
                    config=RECEIPT_EXTRACTION_CONFIG,
                    file={"id": file_id},
                )
                run_id = result.id
                print(f" Parsed {file_name} (run: {run_id})")
                # Convert to dict for JSON serialization. The SDK result may
                # expose to_dict() or a pydantic-style dict() depending on version.
                data = result
                if hasattr(result, "to_dict"):
                    data = result.to_dict()
                elif hasattr(result, "dict"):
                    data = result.dict()
                return {
                    "file": file_name,
                    "runId": run_id,
                    "data": data,
                }
            except Exception as error:
                error_msg = str(error)
                # Only retry rate-limit / transient-looking failures
                is_retryable = (
                    "429" in error_msg or "rate" in error_msg or "disturbed or locked" in error_msg
                )
                if is_retryable and attempt < max_retries:
                    delay = 2**attempt  # Exponential backoff: 2s, 4s, 8s
                    print(
                        f" Rate limited on {file_name}, retrying in {delay}s "
                        f"(attempt {attempt}/{max_retries})"
                    )
                    await asyncio.sleep(delay)
                else:
                    print(f" Failed to parse {file_name}: {error_msg}")
                    return {"file": file_name, "data": {"error": error_msg}}
        return {"file": file_name, "data": {"error": "Max retries exceeded"}}

    # Process in batches of 9 to balance speed and reliability;
    # files within a batch are uploaded/extracted concurrently.
    for i in range(0, len(file_paths), 9):
        batch = file_paths[i : i + 9]
        batch_results = await asyncio.gather(*[process_with_retry(fp) for fp in batch])
        results.extend(batch_results)
    # Save results to JSON (default=str stringifies non-serializable SDK values)
    json_path = "output/results/receipts.json"
    with open(json_path, "w") as f:
        json.dump(results, f, indent=2, default=str)
    print(f"\nSaved JSON: {json_path}")
    # Convert results to CSV for easy viewing in spreadsheet tools
    csv_path = "output/results/receipts.csv"
    with open(csv_path, "w", newline="") as f:
        writer = csv.writer(f, quoting=csv.QUOTE_ALL)
        writer.writerow(
            [
                "file",
                "vendor_name",
                "receipt_date",
                "receipt_number",
                "total_amount",
                "currency",
                "subtotal",
                "tax",
                "payment_method",
                "line_items_count",
            ]
        )
        # Build CSV rows from extraction results
        for result in results:
            data = result.get("data", {})
            output = {}
            if isinstance(data, dict):
                # Extracted field values appear to live under output.value in the
                # run payload — NOTE(review): confirm against the Extend API docs
                output = data.get("output", {}).get("value", {}) or {}
            writer.writerow(
                [
                    result.get("file", ""),
                    output.get("vendor_name", ""),
                    output.get("receipt_date", ""),
                    output.get("receipt_number", ""),
                    (output.get("total_amount") or {}).get("amount", ""),
                    (output.get("total_amount") or {}).get("iso_4217_currency_code", ""),
                    (output.get("subtotal_amount") or {}).get("amount", ""),
                    (output.get("tax_amount") or {}).get("amount", ""),
                    output.get("payment_method", ""),
                    len(output.get("line_items", []))
                    if isinstance(output.get("line_items"), list)
                    else 0,
                ]
            )
    print(f"Saved CSV: {csv_path}")
async def main() -> None:
    """
    Main application entry point.

    Orchestrates the entire receipt download and extraction automation process:
    1. Initializes Browserbase and Stagehand clients
    2. Navigates to the expense portal
    3. Finds and clicks all individual receipt download buttons
    4. Retrieves downloads from Browserbase and extracts files
    5. Optionally parses receipts with Extend AI for structured data extraction

    Raises:
        ValueError: If BROWSERBASE_API_KEY is not set in the environment.
    """
    print("Starting Expense Receipt Downloader...\n")
    browserbase_api_key = os.environ.get("BROWSERBASE_API_KEY")
    if not browserbase_api_key:
        raise ValueError("BROWSERBASE_API_KEY is required")
    # Initialize Browserbase SDK for session management and download retrieval
    bb = Browserbase(api_key=browserbase_api_key)
    # Initialize AsyncStagehand client (v3 BYOB architecture)
    client = AsyncStagehand(
        browserbase_api_key=browserbase_api_key,
    )
    # Start a Stagehand session (returns a response with session_id)
    start_response = await client.sessions.start(
        model_name="google/gemini-2.5-flash",
    )
    session_id = start_response.data.session_id
    print(f"Stagehand session started: {session_id}")
    try:
        # Get live view URL for monitoring browser session in real-time.
        # The Browserbase SDK is synchronous, so run it in a worker thread.
        live_view_links = await asyncio.to_thread(bb.sessions.debug, session_id)
        live_view_link = live_view_links.debuggerFullscreenUrl
        print(f"Live View Link: {live_view_link}")
        open_in_browser(live_view_link)
        # Navigate to the expense portal where receipts are hosted
        print("\nNavigating to expense portal...")
        await client.sessions.navigate(
            id=session_id,
            url="https://v0-reimburse-me-expense-portal.vercel.app/",
        )
        # Use observe to find all individual download buttons (not the Download All button)
        print("\nFinding all individual download buttons...")
        observe_response = await client.sessions.observe(
            id=session_id,
            instruction="Find all the small Download links on individual receipt cards.",
        )
        download_buttons = observe_response.data.result
        # Click each download button using observe -> act pattern.
        # Passing the observed action directly to act gives precise element targeting.
        success_count = 0
        for i, action in enumerate(download_buttons):
            print(f"Downloading receipt {i + 1}/{len(download_buttons)}...")
            # Convert observed action to dict for passing to act
            action_dict = (
                action.to_dict(exclude_none=True) if hasattr(action, "to_dict") else action
            )
            try:
                await client.sessions.act(id=session_id, input=action_dict)
                success_count += 1
            except Exception:
                # If click fails, scroll element into view and retry once
                print(f" Could not click download button {i + 1}, trying to scroll and retry...")
                try:
                    await client.sessions.act(id=session_id, input="Scroll down slightly")
                    await client.sessions.act(id=session_id, input=action_dict)
                    success_count += 1
                except Exception:
                    # Give up on this receipt and continue with the next one
                    print(f" Skipping receipt {i + 1}")
            # Scroll down after every 4th receipt so later buttons stay in view
            if (i + 1) % 4 == 0 and (i + 1) < len(download_buttons):
                await client.sessions.act(id=session_id, input="Scroll down slightly")
        print(f"\nDownload clicks completed! ({success_count}/{len(download_buttons)} successful)")
        # End the Stagehand session before fetching downloads
        await client.sessions.end(id=session_id)
        print("Session closed successfully")
        # Wait for session to finalize downloads before polling
        await asyncio.sleep(2)
        # Retrieve all downloads triggered during this session from Browserbase API
        print("\nRetrieving downloads from Browserbase...")
        download_size = await save_downloads_with_retry(bb, session_id, 60)
        if download_size > 0:
            # Extract receipt files from downloaded zip archive
            extracted_files = extract_files_from_zip("downloaded_files.zip")
            print("\n=== Download Summary ===")
            print(f"Total files downloaded: {len(extracted_files)}")
            print("Files saved to: ./output/documents/")
            # Parse downloaded receipts with Extend AI for structured data extraction
            await parse_receipts_with_extend(extracted_files)
        else:
            print("No downloads were captured")
        print("\nExpense receipt download complete!")
    except Exception as error:
        print(f"Error during automation: {error}")
        # Best-effort cleanup: close the remote session, then re-raise
        try:
            await client.sessions.end(id=session_id)
        except Exception:
            # Ignore close errors during cleanup
            pass
        raise
if __name__ == "__main__":
    # Script entry point: run the async workflow and translate any failure
    # into a friendly troubleshooting message plus a non-zero exit status.
    try:
        asyncio.run(main())
    except Exception as err:
        print(f"Application error: {err}")
        print("Common issues:")
        print(" - Check .env file has BROWSERBASE_API_KEY")
        print(" - Add EXTEND_API_KEY to .env to enable receipt parsing with Extend AI")
        print(" - Verify internet connection and expense portal accessibility")
        print("Docs: https://docs.stagehand.dev/v3/first-steps/introduction")
        # Raise SystemExit instead of calling the site-provided exit() helper,
        # which is not guaranteed to exist (e.g. under `python -S` or frozen apps).
        raise SystemExit(1)