diff --git a/docs/developer/ts-adapter.md b/docs/developer/ts-adapter.md index 4b79802d..fb3d183c 100644 --- a/docs/developer/ts-adapter.md +++ b/docs/developer/ts-adapter.md @@ -65,6 +65,25 @@ The `page` parameter provides browser interaction methods: - `page.click(selector)` — Click an element - `page.type(selector, text)` — Type text into an input +### Streaming API Interception + +For capturing streaming responses (SSE / chunked transfer) — works in background tabs where DOM rendering is throttled but fetch streams and XHR progress events are not. + +```typescript +await page.installStreamingInterceptor('StreamGenerate'); +// ... trigger the streaming request ... +await page.waitForStreamCapture(60, { minChars: 100, waitForDone: true }); +const { text, events, done, errors } = await page.getStreamedResponses(); +// Or peek without clearing: +const { text } = await page.getStreamedResponses({ clear: false }); +``` + +| Method | Description | +|--------|-------------| +| `installStreamingInterceptor(pattern)` | Patch fetch + XHR to capture streaming responses | +| `waitForStreamCapture(timeout, opts?)` | Poll until `minChars` reached and/or stream `done` | +| `getStreamedResponses(opts?)` | Read captured text/events; pass `{ clear: false }` to peek | + ## The `kwargs` Object Contains parsed CLI arguments as key-value pairs. Always destructure with defaults: diff --git a/docs/developer/yaml-adapter.md b/docs/developer/yaml-adapter.md index 17af8a2e..7011eadd 100644 --- a/docs/developer/yaml-adapter.md +++ b/docs/developer/yaml-adapter.md @@ -100,6 +100,27 @@ Download media files. ``` ::: +### `stream-intercept` +Capture streaming responses (SSE / chunked transfer) incrementally. Unlike `intercept`, this reads the response body as a stream — ideal for AI chat endpoints that stream replies, and works in background tabs where DOM rendering is throttled. Requires `browser: true`. 
+ +::: v-pre +```yaml +- stream-intercept: + capture: "StreamGenerate" + trigger: "click:@send" + timeout: 60 + waitForDone: true +``` +::: + +| Param | Default | Description | +|-------|---------|-------------| +| `capture` | (required) | URL substring to match | +| `trigger` | `""` | Action before capture: `click:@ref`, `navigate:url`, `evaluate:js`, `scroll` | +| `timeout` | `60` | Max seconds to wait for stream data | +| `waitForDone` | `true` | Wait until the stream completes (not just first bytes) | +| `returnEvents` | `false` | Return parsed SSE events instead of raw text | + ## Template Expressions ::: v-pre diff --git a/src/browser/base-page.ts b/src/browser/base-page.ts index e7e4ace7..1543cc58 100644 --- a/src/browser/base-page.ts +++ b/src/browser/base-page.ts @@ -18,6 +18,7 @@ import { waitForTextJs, waitForCaptureJs, waitForSelectorJs, + waitForStreamCaptureJs, scrollJs, autoScrollJs, networkRequestsJs, @@ -159,6 +160,22 @@ export abstract class BasePage implements IPage { await this.evaluate(waitForCaptureJs(maxMs)); } + async installStreamingInterceptor(pattern: string): Promise { + const { generateStreamingInterceptorJs } = await import('../interceptor.js'); + await this.evaluate(generateStreamingInterceptorJs(JSON.stringify(pattern))); + } + + async getStreamedResponses(opts?: { clear?: boolean }): Promise<{ text: string; events: any[]; done: boolean; errors: any[] }> { + const { generateReadStreamJs } = await import('../interceptor.js'); + const clear = opts?.clear !== false; // default true for backwards compat + return (await this.evaluate(generateReadStreamJs('__opencli_stream', clear))) as { text: string; events: any[]; done: boolean; errors: any[] }; + } + + async waitForStreamCapture(timeout: number = 30, opts?: { minChars?: number; waitForDone?: boolean }): Promise { + const maxMs = timeout * 1000; + await this.evaluate(waitForStreamCaptureJs(maxMs, opts)); + } + /** Fallback basic snapshot */ protected async _basicSnapshot(opts: Pick = 
{}): Promise { const maxDepth = Math.max(1, Math.min(Number(opts.maxDepth) || 50, 200)); diff --git a/src/browser/cdp.ts b/src/browser/cdp.ts index 94e23a77..7e5d8ce7 100644 --- a/src/browser/cdp.ts +++ b/src/browser/cdp.ts @@ -15,7 +15,7 @@ import type { BrowserCookie, IPage, ScreenshotOptions } from '../types.js'; import type { IBrowserFactory } from '../runtime.js'; import { wrapForEval } from './utils.js'; import { generateStealthJs } from './stealth.js'; -import { waitForDomStableJs } from './dom-helpers.js'; +import { waitForDomStableJs, waitForCaptureJs } from './dom-helpers.js'; import { isRecord, saveBase64ToFile } from '../utils.js'; import { getAllElectronApps } from '../electron-apps.js'; import { BasePage } from './base-page.js'; @@ -234,6 +234,33 @@ class CDPPage extends BasePage { async selectTab(_index: number): Promise { // Not supported in direct CDP mode } + + async consoleMessages(_level?: string): Promise { + return []; + } + + async getCurrentUrl(): Promise { + return this._lastUrl; + } + + async installInterceptor(pattern: string): Promise { + const { generateInterceptorJs } = await import('../interceptor.js'); + await this.evaluate(generateInterceptorJs(JSON.stringify(pattern), { + arrayName: '__opencli_xhr', + patchGuard: '__opencli_interceptor_patched', + })); + } + + async getInterceptedRequests(): Promise { + const { generateReadInterceptedJs } = await import('../interceptor.js'); + const result = await this.evaluate(generateReadInterceptedJs('__opencli_xhr')); + return Array.isArray(result) ? 
result : []; + } + + async waitForCapture(timeout: number = 10): Promise { + const maxMs = timeout * 1000; + await this.evaluate(waitForCaptureJs(maxMs)); + } } function isCookie(value: unknown): value is BrowserCookie { diff --git a/src/browser/daemon-client.ts b/src/browser/daemon-client.ts index 8f004689..a77b4858 100644 --- a/src/browser/daemon-client.ts +++ b/src/browser/daemon-client.ts @@ -114,6 +114,7 @@ export async function isExtensionConnected(): Promise { export async function sendCommand( action: DaemonCommand['action'], params: Omit = {}, + timeoutMs: number = 30000, ): Promise { const maxRetries = 4; @@ -126,7 +127,7 @@ export async function sendCommand( method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(command), - timeout: 30000, + timeout: timeoutMs, }); const result = (await res.json()) as DaemonResult; diff --git a/src/browser/dom-helpers.test.ts b/src/browser/dom-helpers.test.ts index e14a2c24..bf9a2b1f 100644 --- a/src/browser/dom-helpers.test.ts +++ b/src/browser/dom-helpers.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from 'vitest'; -import { autoScrollJs, waitForCaptureJs, waitForSelectorJs } from './dom-helpers.js'; +import { autoScrollJs, waitForCaptureJs, waitForSelectorJs, waitForStreamCaptureJs } from './dom-helpers.js'; describe('autoScrollJs', () => { it('returns early without error when document.body is null', async () => { @@ -112,3 +112,64 @@ describe('waitForSelectorJs', () => { delete g.MutationObserver; }); }); + +describe('waitForStreamCaptureJs', () => { + it('returns a non-empty string with default prefix', () => { + const code = waitForStreamCaptureJs(1000); + expect(typeof code).toBe('string'); + expect(code.length).toBeGreaterThan(0); + expect(code).toContain('__opencli_stream_text'); + expect(code).toContain('__opencli_stream_done'); + }); + + it('generates code that resolves when minChars is reached', async () => { + const g = globalThis as any; + g.__opencli_stream_text = 
''; + g.__opencli_stream_done = false; + g.window = g; + const code = waitForStreamCaptureJs(1000, { minChars: 5 }); + const promise = eval(code) as Promise; + // Simulate data arriving + g.__opencli_stream_text = 'hello world'; + await expect(promise).resolves.not.toThrow(); + delete g.__opencli_stream_text; + delete g.__opencli_stream_done; + delete g.window; + }); + + it('generates code that resolves when done flag is set', async () => { + const g = globalThis as any; + g.__opencli_stream_text = ''; + g.__opencli_stream_done = false; + g.window = g; + const code = waitForStreamCaptureJs(1000, { waitForDone: true }); + const promise = eval(code) as Promise; + // Simulate stream completion — need both minChars AND done + g.__opencli_stream_text = 'data arrived'; + g.__opencli_stream_done = true; + await expect(promise).resolves.not.toThrow(); + delete g.__opencli_stream_text; + delete g.__opencli_stream_done; + delete g.window; + }); + + it('generates code that rejects on timeout', async () => { + const g = globalThis as any; + g.__opencli_stream_text = ''; + g.__opencli_stream_done = false; + g.window = g; + const code = waitForStreamCaptureJs(50, { minChars: 100, waitForDone: true }); + const promise = eval(code) as Promise; + await expect(promise).rejects.toThrow(); + delete g.__opencli_stream_text; + delete g.__opencli_stream_done; + delete g.window; + }); + + it('uses custom prefix when provided', () => { + const code = waitForStreamCaptureJs(1000, { prefix: '__my_prefix' }); + expect(code).toContain('__my_prefix_text'); + expect(code).toContain('__my_prefix_done'); + expect(code).not.toContain('__opencli_stream'); + }); +}); diff --git a/src/browser/dom-helpers.ts b/src/browser/dom-helpers.ts index 7bfbfa63..dcd52ffe 100644 --- a/src/browser/dom-helpers.ts +++ b/src/browser/dom-helpers.ts @@ -200,6 +200,35 @@ export function waitForCaptureJs(maxMs: number): string { `; } +/** + * Generate JS to wait until the streaming interceptor has captured data. 
/**
 * Generate JS (a `new Promise(...)` expression) that waits until the streaming
 * interceptor has captured data.
 *
 * The generated code polls `window.<prefix>_text` and `window.<prefix>_done`
 * every 50ms and settles as follows:
 *  - resolves with `'captured'` as soon as the `done` flag is set — a finished
 *    stream cannot produce more text, so waiting for `minChars` would only
 *    burn the whole timeout;
 *  - otherwise, when `waitForDone` is false, resolves once at least `minChars`
 *    characters have been captured;
 *  - rejects with `Error('Stream capture timeout')` once `maxMs` has elapsed.
 *
 * @param maxMs - Time budget in milliseconds. `NaN` is treated as 0, because a
 *   `NaN` deadline never compares as exceeded and the poll would run forever.
 * @param opts.minChars - Minimum captured characters (default 1).
 * @param opts.waitForDone - Require stream completion before resolving (default false).
 * @param opts.prefix - Prefix of the globals to poll (default '__opencli_stream').
 * @returns JavaScript source to evaluate in the page.
 */
export function waitForStreamCaptureJs(
  maxMs: number,
  opts: { minChars?: number; waitForDone?: boolean; prefix?: string } = {},
): string {
  const minChars = opts.minChars ?? 1;
  const waitForDone = opts.waitForDone ?? false;
  const prefix = opts.prefix ?? '__opencli_stream';
  const budgetMs = Number.isNaN(maxMs) ? 0 : maxMs;
  return `
    new Promise((resolve, reject) => {
      const deadline = Date.now() + ${budgetMs};
      const check = () => {
        const text = window.${prefix}_text || '';
        const done = window.${prefix}_done || false;
        // A completed stream is final: resolve even if it is shorter than minChars.
        if (done || (!${waitForDone} && text.length >= ${minChars})) {
          return resolve('captured');
        }
        if (Date.now() > deadline) return reject(new Error('Stream capture timeout'));
        setTimeout(check, 50);
      };
      check();
    })
  `;
}
err.message : String(err); return message.includes('Inspected target navigated or closed') @@ -179,6 +179,15 @@ export class Page extends BasePage { throw new Error('setFileInput returned no count — command may not be supported by the extension'); } } + + // Override: waitForStreamCapture needs longer HTTP timeout for long-running browser promises + async waitForStreamCapture(timeout: number = 30, opts?: { minChars?: number; waitForDone?: boolean }): Promise { + const maxMs = timeout * 1000; + await sendCommand('exec', { + code: waitForStreamCaptureJs(maxMs, opts), + ...this._cmdOpts(), + }, maxMs + 10000); // HTTP timeout = browser timeout + 10s buffer + } } // (End of file) diff --git a/src/clis/douyin/draft.test.ts b/src/clis/douyin/draft.test.ts index 21f1deac..fe92ce2b 100644 --- a/src/clis/douyin/draft.test.ts +++ b/src/clis/douyin/draft.test.ts @@ -96,7 +96,9 @@ function createPageMock( installInterceptor: vi.fn().mockResolvedValue(undefined), getInterceptedRequests: vi.fn().mockResolvedValue([]), waitForCapture: vi.fn().mockResolvedValue(undefined), - screenshot: vi.fn().mockResolvedValue(''), + installStreamingInterceptor: vi.fn().mockResolvedValue(undefined), + getStreamedResponses: vi.fn().mockResolvedValue({ text: '', events: [], done: false, errors: [] }), + waitForStreamCapture: vi.fn().mockResolvedValue(undefined), screenshot: vi.fn().mockResolvedValue(''), setFileInput: vi.fn().mockResolvedValue(undefined), ...overrides, }; diff --git a/src/clis/facebook/search.test.ts b/src/clis/facebook/search.test.ts index 5a2a183d..f7a4ac37 100644 --- a/src/clis/facebook/search.test.ts +++ b/src/clis/facebook/search.test.ts @@ -42,7 +42,9 @@ function createMockPage(): IPage { installInterceptor: vi.fn(), getInterceptedRequests: vi.fn().mockResolvedValue([]), waitForCapture: vi.fn().mockResolvedValue(undefined), - screenshot: vi.fn().mockResolvedValue(''), + installStreamingInterceptor: vi.fn().mockResolvedValue(undefined), + getStreamedResponses: 
vi.fn().mockResolvedValue({ text: '', events: [], done: false, errors: [] }), + waitForStreamCapture: vi.fn().mockResolvedValue(undefined), screenshot: vi.fn().mockResolvedValue(''), }; } diff --git a/src/clis/substack/utils.test.ts b/src/clis/substack/utils.test.ts index 476f4aa4..e40a35eb 100644 --- a/src/clis/substack/utils.test.ts +++ b/src/clis/substack/utils.test.ts @@ -26,7 +26,9 @@ function createPageMock(evaluateResult: unknown): IPage { getCookies: vi.fn().mockResolvedValue([]), screenshot: vi.fn().mockResolvedValue(''), waitForCapture: vi.fn().mockResolvedValue(undefined), - }; + installStreamingInterceptor: vi.fn().mockResolvedValue(undefined), + getStreamedResponses: vi.fn().mockResolvedValue({ text: '', events: [], done: false, errors: [] }), + waitForStreamCapture: vi.fn().mockResolvedValue(undefined), }; } describe('substack utils wait selectors', () => { diff --git a/src/clis/xiaohongshu/comments.test.ts b/src/clis/xiaohongshu/comments.test.ts index ba340c84..f45a4369 100644 --- a/src/clis/xiaohongshu/comments.test.ts +++ b/src/clis/xiaohongshu/comments.test.ts @@ -27,7 +27,9 @@ function createPageMock(evaluateResult: any): IPage { getCookies: vi.fn().mockResolvedValue([]), screenshot: vi.fn().mockResolvedValue(''), waitForCapture: vi.fn().mockResolvedValue(undefined), - }; + installStreamingInterceptor: vi.fn().mockResolvedValue(undefined), + getStreamedResponses: vi.fn().mockResolvedValue({ text: '', events: [], done: false, errors: [] }), + waitForStreamCapture: vi.fn().mockResolvedValue(undefined), }; } describe('xiaohongshu comments', () => { diff --git a/src/clis/xiaohongshu/creator-note-detail.test.ts b/src/clis/xiaohongshu/creator-note-detail.test.ts index a58590a5..26853e37 100644 --- a/src/clis/xiaohongshu/creator-note-detail.test.ts +++ b/src/clis/xiaohongshu/creator-note-detail.test.ts @@ -34,7 +34,9 @@ function createPageMock(evaluateResult: any): IPage { getCookies: vi.fn().mockResolvedValue([]), screenshot: 
vi.fn().mockResolvedValue(''), waitForCapture: vi.fn().mockResolvedValue(undefined), - }; + installStreamingInterceptor: vi.fn().mockResolvedValue(undefined), + getStreamedResponses: vi.fn().mockResolvedValue({ text: '', events: [], done: false, errors: [] }), + waitForStreamCapture: vi.fn().mockResolvedValue(undefined), }; } describe('xiaohongshu creator-note-detail', () => { diff --git a/src/clis/xiaohongshu/creator-notes.test.ts b/src/clis/xiaohongshu/creator-notes.test.ts index 68cba62d..f37c5abf 100644 --- a/src/clis/xiaohongshu/creator-notes.test.ts +++ b/src/clis/xiaohongshu/creator-notes.test.ts @@ -38,7 +38,9 @@ function createPageMock(evaluateResult: any, interceptedRequests: any[] = []): I getCookies: vi.fn().mockResolvedValue([]), screenshot: vi.fn().mockResolvedValue(''), waitForCapture: vi.fn().mockResolvedValue(undefined), - }; + installStreamingInterceptor: vi.fn().mockResolvedValue(undefined), + getStreamedResponses: vi.fn().mockResolvedValue({ text: '', events: [], done: false, errors: [] }), + waitForStreamCapture: vi.fn().mockResolvedValue(undefined), }; } describe('xiaohongshu creator-notes', () => { diff --git a/src/clis/xiaohongshu/download.test.ts b/src/clis/xiaohongshu/download.test.ts index 43b66f19..5afa1e10 100644 --- a/src/clis/xiaohongshu/download.test.ts +++ b/src/clis/xiaohongshu/download.test.ts @@ -41,7 +41,9 @@ function createPageMock(evaluateResult: any): IPage { getCookies: vi.fn().mockResolvedValue([{ name: 'sid', value: 'secret', domain: '.xiaohongshu.com' }]), screenshot: vi.fn().mockResolvedValue(''), waitForCapture: vi.fn().mockResolvedValue(undefined), - }; + installStreamingInterceptor: vi.fn().mockResolvedValue(undefined), + getStreamedResponses: vi.fn().mockResolvedValue({ text: '', events: [], done: false, errors: [] }), + waitForStreamCapture: vi.fn().mockResolvedValue(undefined), }; } describe('xiaohongshu download', () => { diff --git a/src/clis/xiaohongshu/note.test.ts b/src/clis/xiaohongshu/note.test.ts index 
5ec00b65..8d1fbcca 100644 --- a/src/clis/xiaohongshu/note.test.ts +++ b/src/clis/xiaohongshu/note.test.ts @@ -28,7 +28,9 @@ function createPageMock(evaluateResult: any): IPage { getCookies: vi.fn().mockResolvedValue([]), screenshot: vi.fn().mockResolvedValue(''), waitForCapture: vi.fn().mockResolvedValue(undefined), - }; + installStreamingInterceptor: vi.fn().mockResolvedValue(undefined), + getStreamedResponses: vi.fn().mockResolvedValue({ text: '', events: [], done: false, errors: [] }), + waitForStreamCapture: vi.fn().mockResolvedValue(undefined), }; } describe('parseNoteId', () => { diff --git a/src/clis/xiaohongshu/publish.test.ts b/src/clis/xiaohongshu/publish.test.ts index 55ead889..ad42e3ae 100644 --- a/src/clis/xiaohongshu/publish.test.ts +++ b/src/clis/xiaohongshu/publish.test.ts @@ -37,7 +37,9 @@ function createPageMock(evaluateResults: any[], overrides: Partial = {}): getCookies: vi.fn().mockResolvedValue([]), screenshot: vi.fn().mockResolvedValue(''), waitForCapture: vi.fn().mockResolvedValue(undefined), - ...overrides, + installStreamingInterceptor: vi.fn().mockResolvedValue(undefined), + getStreamedResponses: vi.fn().mockResolvedValue({ text: '', events: [], done: false, errors: [] }), + waitForStreamCapture: vi.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/src/clis/xiaohongshu/search.test.ts b/src/clis/xiaohongshu/search.test.ts index 15d8799e..6bfd4131 100644 --- a/src/clis/xiaohongshu/search.test.ts +++ b/src/clis/xiaohongshu/search.test.ts @@ -32,7 +32,9 @@ function createPageMock(evaluateResults: any[]): IPage { getCookies: vi.fn().mockResolvedValue([]), screenshot: vi.fn().mockResolvedValue(''), waitForCapture: vi.fn().mockResolvedValue(undefined), - }; + installStreamingInterceptor: vi.fn().mockResolvedValue(undefined), + getStreamedResponses: vi.fn().mockResolvedValue({ text: '', events: [], done: false, errors: [] }), + waitForStreamCapture: vi.fn().mockResolvedValue(undefined), }; } describe('xiaohongshu search', () => { 
diff --git a/src/interceptor.test.ts b/src/interceptor.test.ts index fc843b92..70d8fce2 100644 --- a/src/interceptor.test.ts +++ b/src/interceptor.test.ts @@ -3,7 +3,7 @@ */ import { describe, it, expect } from 'vitest'; -import { generateInterceptorJs, generateReadInterceptedJs, generateTapInterceptorJs } from './interceptor.js'; +import { generateInterceptorJs, generateReadInterceptedJs, generateTapInterceptorJs, generateStreamingInterceptorJs, generateReadStreamJs } from './interceptor.js'; describe('generateInterceptorJs', () => { it('generates valid JavaScript function source', () => { @@ -92,3 +92,132 @@ describe('generateTapInterceptorJs', () => { expect(tap.xhrPatch).toContain('!captured'); }); }); + +describe('generateStreamingInterceptorJs', () => { + it('generates valid JavaScript function source', () => { + const js = generateStreamingInterceptorJs('"api/stream"'); + expect(js.trim()).toMatch(/^\(\)\s*=>/); + expect(js).toContain('"api/stream"'); + }); + + it('initializes all streaming state variables', () => { + const js = generateStreamingInterceptorJs('"test"'); + expect(js).toContain('__opencli_stream_text'); + expect(js).toContain('__opencli_stream_events'); + expect(js).toContain('__opencli_stream_done'); + expect(js).toContain('__opencli_stream_errors'); + expect(js).toContain('__opencli_stream_sse_buf'); + }); + + it('resets SSE buffer on install', () => { + const js = generateStreamingInterceptorJs('"test"'); + // SSE buffer should be reset at initialization time + expect(js).toContain('__opencli_stream_sse_buf'); + expect(js).toContain("= ''"); + }); + + it('resets all state including SSE buffer on each new matching fetch', () => { + const js = generateStreamingInterceptorJs('"test"'); + // Inside the fetch patch, SSE buffer should be cleared for new request + const fetchResetCount = (js.match(/__opencli_stream_sse_buf/g) || []).length; + // Should appear at least twice: init + fetch reset + XHR reset + 
expect(fetchResetCount).toBeGreaterThanOrEqual(2); + }); + + it('patches both fetch and XHR', () => { + const js = generateStreamingInterceptorJs('"test"'); + expect(js).toContain('window.fetch'); + expect(js).toContain('XMLHttpRequest.prototype'); + expect(js).toContain('onprogress'); + expect(js).toContain('readystatechange'); + }); + + it('normalizes CRLF in SSE parsing', () => { + const js = generateStreamingInterceptorJs('"test"'); + // SSE parser should normalize \r\n to \n + expect(js).toContain(String.raw`replace(/\r\n/g, '\n')`); + }); + + it('normalizes CRLF in fetch SSE boundary detection', () => { + const js = generateStreamingInterceptorJs('"test"'); + // Fetch path should normalize CRLF before SSE boundary splitting + expect(js).toContain('sseBuffer'); + expect(js).toContain(String.raw`replace(/\r\n/g, '\n')`); + }); + + it('normalizes CRLF in XHR SSE boundary detection', () => { + const js = generateStreamingInterceptorJs('"test"'); + // XHR progress path should normalize CRLF + expect(js).toContain(String.raw`chunk.replace(/\r\n/g, '\n')`); + }); + + it('includes SSE parser function', () => { + const js = generateStreamingInterceptorJs('"test"'); + expect(js).toContain('__parseSse'); + expect(js).toContain('event:'); + expect(js).toContain('data:'); + }); + + it('uses custom prefix and patch guard', () => { + const js = generateStreamingInterceptorJs('"test"', { + arrayName: '__my_stream', + patchGuard: '__my_stream_guard', + }); + expect(js).toContain('__my_stream_text'); + expect(js).toContain('__my_stream_events'); + expect(js).toContain('__my_stream_guard'); + expect(js).not.toContain('__opencli_stream'); + }); + + it('uses custom maxChunks', () => { + const js = generateStreamingInterceptorJs('"test"', { maxChunks: 100 }); + expect(js).toContain('100'); + }); + + it('XHR readystatechange uses authoritative responseText overwrite', () => { + const js = generateStreamingInterceptorJs('"test"'); + // readystatechange readyState=4 should overwrite 
with full responseText + expect(js).toContain('readyState === 4'); + expect(js).toContain('window.__opencli_stream_text = full'); + }); + + it('XHR load event is guarded by settled flag', () => { + const js = generateStreamingInterceptorJs('"test"'); + expect(js).toContain('if (settled) return'); + }); +}); + +describe('generateReadStreamJs', () => { + it('generates valid JavaScript to read streaming state', () => { + const js = generateReadStreamJs(); + expect(js.trim()).toMatch(/^\(\)\s*=>/); + expect(js).toContain('__opencli_stream_text'); + expect(js).toContain('__opencli_stream_events'); + expect(js).toContain('__opencli_stream_done'); + expect(js).toContain('__opencli_stream_errors'); + }); + + it('clears all state including SSE buffer by default', () => { + const js = generateReadStreamJs(); + expect(js).toContain('__opencli_stream_text'); + expect(js).toContain('__opencli_stream_sse_buf'); + expect(js).toContain("= ''"); + expect(js).toContain('= []'); + }); + + it('does not clear state when clear=false (peek mode)', () => { + const js = generateReadStreamJs('__opencli_stream', false); + // Should NOT contain clearing statements + expect(js).not.toContain("window.__opencli_stream_text = ''"); + expect(js).not.toContain("window.__opencli_stream_sse_buf = ''"); + // But should still contain the read statements + expect(js).toContain('__opencli_stream_text'); + }); + + it('uses custom prefix', () => { + const js = generateReadStreamJs('__my_prefix'); + expect(js).toContain('__my_prefix_text'); + expect(js).toContain('__my_prefix_events'); + expect(js).not.toContain('__opencli_stream'); + }); +}); diff --git a/src/interceptor.ts b/src/interceptor.ts index 76f3f6a7..e5908d88 100644 --- a/src/interceptor.ts +++ b/src/interceptor.ts @@ -131,6 +131,268 @@ export function generateReadInterceptedJs(arrayName: string = '__opencli_interce `; } +/** + * Generate JavaScript source that installs a streaming-capable fetch interceptor. 
/**
 * Generate JavaScript source (an arrow function) that installs a
 * streaming-capable fetch + XMLHttpRequest interceptor.
 *
 * Unlike generateInterceptorJs (which awaits full JSON), the fetch patch reads a
 * clone of the response body through a ReadableStream reader and accumulates
 * decoded chunks incrementally; the XHR patch accumulates `responseText` deltas
 * on `progress` events and overwrites the text with the complete `responseText`
 * when the request finishes.
 *
 * Captured data (names shown with the default prefix):
 *  - window.__opencli_stream_text    — accumulated decoded text
 *  - window.__opencli_stream_events  — parsed SSE events as { event, data }
 *  - window.__opencli_stream_done    — true when the stream ends or fails
 *  - window.__opencli_stream_errors  — array of { url, error }
 *  - window.__opencli_stream_sse_buf — partial SSE block carried between XHR progress events
 *
 * text / events / done / sse_buf are reset whenever a new matching request
 * starts; errors are only reset when the interceptor is (re)installed.
 *
 * @param patternExpr - JS expression resolving to a URL substring to match
 * @param opts.arrayName - Prefix for globals (default: '__opencli_stream')
 * @param opts.patchGuard - Guard name to prevent double-patching
 * @param opts.maxChunks - Fetch path only: once more than this many chunks have
 *   been appended, the first half of the accumulated text is dropped (default 5000)
 * @returns JavaScript source to evaluate in the page.
 */
export function generateStreamingInterceptorJs(
  patternExpr: string,
  opts: { arrayName?: string; patchGuard?: string; maxChunks?: number } = {},
): string {
  const prefix = opts.arrayName ?? '__opencli_stream';
  const guard = opts.patchGuard ?? '__opencli_stream_patched';
  const maxChunks = opts.maxChunks ?? 5000;
  const patternVar = `${guard}_pattern`;

  return `
    () => {
      ${DEFINE_HIDDEN}
      ${DISGUISE_FN}

      // Reset all capture state (including SSE buffer) so previous data
      // never leaks into a new interception session.
      __defHidden(window, '${prefix}_text', '');
      __defHidden(window, '${prefix}_events', []);
      __defHidden(window, '${prefix}_done', false);
      __defHidden(window, '${prefix}_errors', []);
      __defHidden(window, '${prefix}_sse_buf', '');
      __defHidden(window, '${patternVar}', ${patternExpr});
      // typeof guard: an XHR opened before this patch was installed has no
      // recorded URL, and the page's send() must never throw because of us.
      const __checkMatch = (url) =>
        !!window.${patternVar} && typeof url === 'string' && url.includes(window.${patternVar});

      if (!window.${guard}) {
        // ── SSE parser helper (normalizes CRLF → LF) ──
        // Parses one event block; lines other than event:/data: are ignored.
        function __parseSse(block, events) {
          let currentEvent = '';
          const dataLines = [];
          for (const line of block.replace(/\\r\\n/g, '\\n').split('\\n')) {
            if (line.startsWith('event:')) {
              currentEvent = line.slice(6).trim();
            } else if (line.startsWith('data:')) {
              dataLines.push(line.slice(5).trimStart());
            }
          }
          if (dataLines.length > 0) {
            events.push({ event: currentEvent || 'message', data: dataLines.join('\\n') });
          }
        }

        // ── Patch fetch (streaming via ReadableStream reader on a clone) ──
        const __origFetch = window.fetch;
        window.fetch = __disguise(async function(...args) {
          const target = args[0];
          // Accept string, Request (.url) and URL (.href) inputs.
          const reqUrl = typeof target === 'string' ? target
            : (target && (target.url || target.href)) || '';
          const response = await __origFetch.apply(this, args);
          if (__checkMatch(reqUrl)) {
            // Reset all state for new request (including SSE buffer)
            window.${prefix}_text = '';
            window.${prefix}_events = [];
            window.${prefix}_done = false;
            window.${prefix}_sse_buf = '';

            try {
              const clone = response.clone();
              const reader = clone.body && clone.body.getReader ? clone.body.getReader() : null;
              if (reader) {
                // Drain in the background so the page receives its response immediately.
                (async () => {
                  const decoder = new TextDecoder();
                  let sseBuffer = '';
                  let chunkCount = 0;
                  try {
                    while (true) {
                      const { done, value } = await reader.read();
                      if (done) {
                        // Flush bytes the streaming decoder is still holding back.
                        const tail = decoder.decode();
                        if (tail) {
                          window.${prefix}_text += tail;
                          sseBuffer += tail;
                        }
                        window.${prefix}_done = true;
                        // flush remaining SSE buffer
                        if (sseBuffer.trim()) __parseSse(sseBuffer, window.${prefix}_events);
                        break;
                      }
                      const text = decoder.decode(value, { stream: true });
                      window.${prefix}_text += text;
                      chunkCount++;
                      if (chunkCount > ${maxChunks}) {
                        window.${prefix}_text = window.${prefix}_text.slice(
                          Math.floor(window.${prefix}_text.length / 2)
                        );
                        chunkCount = Math.floor(chunkCount / 2);
                      }
                      // SSE parsing: normalize CRLF, accumulate buffer, emit complete events
                      sseBuffer += text.replace(/\\r\\n/g, '\\n');
                      let boundary;
                      while ((boundary = sseBuffer.indexOf('\\n\\n')) !== -1) {
                        const block = sseBuffer.slice(0, boundary);
                        sseBuffer = sseBuffer.slice(boundary + 2);
                        __parseSse(block, window.${prefix}_events);
                      }
                    }
                  } catch(e) {
                    window.${prefix}_errors.push({ url: reqUrl, error: String(e) });
                    window.${prefix}_done = true;
                  }
                })();
              } else {
                // No ReadableStream — fall back to text()
                try {
                  window.${prefix}_text = await clone.text();
                  window.${prefix}_done = true;
                } catch(e) {
                  window.${prefix}_errors.push({ url: reqUrl, error: String(e) });
                  window.${prefix}_done = true;
                }
              }
            } catch(e) {
              window.${prefix}_errors.push({ url: reqUrl, error: String(e) });
              window.${prefix}_done = true;
            }
          }
          return response;
        }, 'fetch');

        // ── Patch XMLHttpRequest (streaming via onprogress + complete via readystatechange) ──
        const __XHR = XMLHttpRequest.prototype;
        const __origXhrOpen = __XHR.open;
        const __origXhrSend = __XHR.send;
        __XHR.open = __disguise(function(method, url) {
          Object.defineProperty(this, '__${guard}_url', {
            value: String(url), writable: true, enumerable: false, configurable: true
          });
          return __origXhrOpen.apply(this, arguments);
        }, 'open');
        __XHR.send = __disguise(function() {
          if (__checkMatch(this.__${guard}_url)) {
            // Reset all capture state for new XHR request
            window.${prefix}_text = '';
            window.${prefix}_events = [];
            window.${prefix}_done = false;
            window.${prefix}_sse_buf = '';

            const xhr = this;
            let lastLen = 0;
            let settled = false;
            let failed = false;

            // Finalize exactly once, whichever completion event arrives first.
            const __finish = function() {
              if (settled) return;
              settled = true;
              try {
                const full = xhr.responseText || '';
                // OVERWRITE with complete responseText — guarantees completeness
                // even if onprogress missed chunks between its last fire and completion
                if (full.length > 0) {
                  window.${prefix}_text = full;
                }
              } catch(e) {}
              window.${prefix}_done = true;
              const buf = window.${prefix}_sse_buf || '';
              if (buf.trim()) __parseSse(buf, window.${prefix}_events);
              __defHidden(window, '${prefix}_sse_buf', '');
            };

            // Failures are recorded independently of "settled": readystatechange
            // reaches readyState 4 BEFORE error/abort/timeout are dispatched, so
            // gating on "settled" would drop every failure.
            const __fail = function(reason) {
              if (!failed) {
                failed = true;
                window.${prefix}_errors.push({ url: xhr.__${guard}_url, error: reason });
              }
              __finish();
            };

            // onprogress: incremental capture for real-time streaming
            xhr.addEventListener('progress', function() {
              try {
                const full = xhr.responseText || '';
                const chunk = full.slice(lastLen);
                lastLen = full.length;
                if (chunk.length > 0) {
                  window.${prefix}_text += chunk;
                  const buf = window.${prefix}_sse_buf || '';
                  let combined = buf + chunk.replace(/\\r\\n/g, '\\n');
                  let boundary;
                  while ((boundary = combined.indexOf('\\n\\n')) !== -1) {
                    const block = combined.slice(0, boundary);
                    combined = combined.slice(boundary + 2);
                    __parseSse(block, window.${prefix}_events);
                  }
                  __defHidden(window, '${prefix}_sse_buf', combined);
                }
              } catch(e) {}
            });

            // readystatechange readyState=4: authoritative complete responseText
            xhr.addEventListener('readystatechange', function() {
              if (xhr.readyState === 4) __finish();
            });

            // load event: secondary fallback; no-op when already finalized
            xhr.addEventListener('load', function() {
              if (settled) return; // already handled by readystatechange
              __finish();
            });

            xhr.addEventListener('error', function() { __fail('XHR error'); });
            xhr.addEventListener('abort', function() { __fail('XHR aborted'); });
            xhr.addEventListener('timeout', function() { __fail('XHR timeout'); });
          }
          return __origXhrSend.apply(this, arguments);
        }, 'send');

        __defHidden(window, '${guard}', true);
      }
    }
  `;
}
/**
 * Build the source text of a page-side reader for the streaming interceptor's state.
 *
 * The returned string is an arrow-function expression. When evaluated and called
 * in the page it snapshots `window.<prefix>_text`, `_events`, `_done` and `_errors`
 * (substituting `''`, `[]`, `false` and `[]` for missing/falsy values) and returns
 * them as `{ text, events, done, errors }`.
 *
 * @param prefix - Global variable prefix the state is stored under (default: '__opencli_stream')
 * @param clear - When true (the default), the generated function also resets the four
 *   state globals and the `<prefix>_sse_buf` buffer after the snapshot has been taken;
 *   when false it only reads, leaving the state untouched.
 * @returns JavaScript source for the reader function.
 */
export function generateReadStreamJs(
  prefix: string = '__opencli_stream',
  clear: boolean = true,
): string {
  // Statements spliced in after the snapshot; stays empty for a non-destructive peek.
  let resetSource = '';
  if (clear) {
    resetSource = `
      window.${prefix}_text = '';
      window.${prefix}_events = [];
      window.${prefix}_done = false;
      window.${prefix}_errors = [];
      window.${prefix}_sse_buf = '';
    `;
  }

  // The snapshot object is built before the reset statements run, so the caller
  // receives the pre-reset values.
  const readerSource = `
    () => {
      const result = {
        text: window.${prefix}_text || '',
        events: window.${prefix}_events || [],
        done: window.${prefix}_done || false,
        errors: window.${prefix}_errors || [],
      };
      ${resetSource}
      return result;
    }
  `;
  return readerSource;
}
'./steps/stream-intercept.js'; /** * Step handler: all pipeline steps conform to this generic interface. @@ -60,3 +61,4 @@ registerStep('limit', stepLimit); registerStep('intercept', stepIntercept); registerStep('tap', stepTap); registerStep('download', stepDownload); +registerStep('stream-intercept', stepStreamIntercept); diff --git a/src/pipeline/steps/download.test.ts b/src/pipeline/steps/download.test.ts index bf177eb5..9e17100a 100644 --- a/src/pipeline/steps/download.test.ts +++ b/src/pipeline/steps/download.test.ts @@ -45,7 +45,9 @@ function createMockPage(getCookies: IPage['getCookies']): IPage { getInterceptedRequests: vi.fn().mockResolvedValue([]), screenshot: vi.fn().mockResolvedValue(''), waitForCapture: vi.fn().mockResolvedValue(undefined), - }; + installStreamingInterceptor: vi.fn().mockResolvedValue(undefined), + getStreamedResponses: vi.fn().mockResolvedValue({ text: '', events: [], done: false, errors: [] }), + waitForStreamCapture: vi.fn().mockResolvedValue(undefined), }; } describe('stepDownload', () => { diff --git a/src/pipeline/steps/stream-intercept.ts b/src/pipeline/steps/stream-intercept.ts new file mode 100644 index 00000000..9ac917f8 --- /dev/null +++ b/src/pipeline/steps/stream-intercept.ts @@ -0,0 +1,60 @@ +/** + * Pipeline step: stream-intercept — declarative streaming response capture. + * + * Unlike `intercept` (which awaits full JSON), this reads the response body + * as a ReadableStream and accumulates chunks incrementally. Ideal for SSE / + * streaming endpoints where the response never terminates during use, and + * works in background tabs where rAF is throttled but fetch streams are not. 
/**
 * Pipeline step `stream-intercept`: install the streaming interceptor, run an
 * optional trigger action, wait for stream data, then return what was captured.
 *
 * Recognised `params` keys (all optional except `capture`):
 *  - `capture`      pattern handed to `page.installStreamingInterceptor`
 *  - `trigger`      `navigate:<url>`, `evaluate:<js>`, `click:<ref>` or `scroll`;
 *                   the part after the colon is passed through `render` with `{ args, data }`
 *  - `timeout`      forwarded to `page.waitForStreamCapture` (default 60)
 *  - `waitForDone`  forwarded to `page.waitForStreamCapture` (default true)
 *  - `returnEvents` return the captured events array instead of the text (default false)
 *
 * @param page - Browser page; when null the step is a no-op.
 * @param params - Step configuration; anything that is not a non-null object is treated as `{}`.
 * @param data - Upstream pipeline data, returned unchanged when the step is a no-op.
 * @param args - CLI arguments made available to template rendering.
 * @returns `data` when there is no page or no `capture` pattern; otherwise the
 *   captured events (only if `returnEvents` is set and at least one event exists)
 *   or the captured text.
 */
export async function stepStreamIntercept(
  page: IPage | null, params: any, data: any, args: Record<string, any>,
): Promise<any> {
  // `typeof null` is 'object', so null must be excluded explicitly; otherwise a
  // step with null params (for instance one declared without a body) would throw
  // on the first property read instead of falling through to the no-op path below.
  const cfg = params !== null && typeof params === 'object' ? params : {};
  const trigger = cfg.trigger ?? '';
  const capturePattern = cfg.capture ?? '';
  const timeout = cfg.timeout ?? 60;
  const waitForDone = cfg.waitForDone ?? true;
  const returnEvents = cfg.returnEvents ?? false;

  // Nothing to capture, or nowhere to capture it: pass the data through untouched.
  if (!capturePattern || !page) return data;

  // Step 1: the interceptor is installed BEFORE the trigger runs.
  await page.installStreamingInterceptor(capturePattern);

  // Step 2: run the trigger action; an empty or unrecognised trigger does nothing.
  if (trigger.startsWith('navigate:')) {
    const url = render(trigger.slice('navigate:'.length), { args, data });
    await page.goto(String(url));
  } else if (trigger.startsWith('evaluate:')) {
    const js = trigger.slice('evaluate:'.length);
    await page.evaluate(normalizeEvaluateSource(render(js, { args, data }) as string));
  } else if (trigger.startsWith('click:')) {
    const ref = render(trigger.slice('click:'.length), { args, data });
    // A leading '@' on the rendered ref is stripped before clicking.
    await page.click(String(ref).replace(/^@/, ''));
  } else if (trigger === 'scroll') {
    await page.scroll('down');
  }

  // Step 3: wait for streaming data.
  await page.waitForStreamCapture(timeout, { waitForDone });

  // Step 4: read the accumulated data (called with defaults).
  const result = await page.getStreamedResponses();

  // Events are returned only when requested AND present; otherwise fall back to the text.
  if (returnEvents && result.events.length > 0) {
    return result.events;
  }
  return result.text;
}
waitForCapture(timeout?: number): Promise; + installStreamingInterceptor(pattern: string): Promise; + getStreamedResponses(opts?: { clear?: boolean }): Promise<{ text: string; events: any[]; done: boolean; errors: any[] }>; + waitForStreamCapture(timeout?: number, opts?: { minChars?: number; waitForDone?: boolean }): Promise; screenshot(options?: ScreenshotOptions): Promise; /** * Set local file paths on a file input element via CDP DOM.setFileInputFiles.