Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/lib/workers/parquetWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { ChunkMessage, ClientMessage, CompleteMessage, PageMessage, Parquet
import { fromToAsyncBuffer } from './utils.js'

const cache = new Map<string, Promise<AsyncBuffer>>()
const aborted = new Set<number>()

function postCompleteMessage ({ queryId, rows }: Omit<CompleteMessage, 'kind'>) {
self.postMessage({ kind: 'onComplete', queryId, rows })
Expand All @@ -29,30 +30,41 @@ function postParquetQueryResultMessage ({ queryId, rows }: Omit<ParquetQueryReso
}

self.onmessage = async ({ data }: { data: ClientMessage }) => {
if (data.kind === 'abort') {
aborted.add(data.queryId)
return
}
const { queryId, from, kind, options } = data
const file = await fromToAsyncBuffer(from, cache)
try {
if (kind === 'parquetReadObjects') {
const rows = (await parquetReadObjects({ ...options, rowFormat: 'object', file, compressors, onChunk, onPage })) as Rows
if (aborted.delete(queryId)) return
postParquetReadObjectsResultMessage({ queryId, rows })
} else if (kind === 'parquetQuery') {
const rows = await parquetQuery({ ...options, file, compressors, onChunk, onPage })
if (aborted.delete(queryId)) return
postParquetQueryResultMessage({ queryId, rows })
} else {
await parquetRead({ ...options, rowFormat: 'object', file, compressors, onComplete, onChunk, onPage })
if (aborted.delete(queryId)) return
postParquetReadResultMessage({ queryId })
}
} catch (error) {
if (aborted.delete(queryId)) return
postErrorMessage({ error: error as Error, queryId })
}

function onComplete(rows: Rows) {
if (aborted.has(queryId)) return
postCompleteMessage({ queryId, rows })
}
function onChunk(chunk: ColumnData) {
if (aborted.has(queryId)) return
postChunkMessage({ chunk, queryId })
}
function onPage(page: SubColumnData) {
if (aborted.has(queryId)) return
postPageMessage({ page, queryId })
}
}
28 changes: 25 additions & 3 deletions src/lib/workers/parquetWorkerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ function getWorker() {
return worker
}

/** Wires an AbortSignal to the queryId: posts an abort message to the worker
* and rejects the promise. The worker will suppress the result for this id. */
function wireAbort(worker: Worker, queryId: number, signal: AbortSignal | undefined, reject: (e: Error) => void): boolean {
if (!signal) return false
if (signal.aborted) {
pendingAgents.delete(queryId)
worker.postMessage({ queryId, kind: 'abort' } satisfies ClientMessage)
reject(new DOMException('Aborted', 'AbortError'))
return true
}
signal.addEventListener('abort', () => {
if (!pendingAgents.has(queryId)) return
pendingAgents.delete(queryId)
worker.postMessage({ queryId, kind: 'abort' } satisfies ClientMessage)
reject(new DOMException('Aborted', 'AbortError'))
}, { once: true })
return false
}

/**
* Presents almost the same interface as parquetRead, but runs in a worker.
* This is useful for reading large parquet files without blocking the main thread.
Expand All @@ -78,11 +97,12 @@ function getWorker() {
* Note that it only supports 'rowFormat: object' (the default).
*/
export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<void> {
const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
const { onComplete, onChunk, onPage, from, signal, ...serializableOptions } = options
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pendingAgents.set(queryId, { parquetReadResolve: resolve, reject, onComplete, onChunk, onPage })
const worker = getWorker()
if (wireAbort(worker, queryId, signal, reject)) return
const message: ClientMessage = { queryId, from, kind: 'parquetRead', options: serializableOptions }
worker.postMessage(message)
})
Expand All @@ -98,11 +118,12 @@ export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise<vo
* Note that it only supports 'rowFormat: object' (the default).
*/
export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOptions): Promise<Rows> {
const { onChunk, onPage, from, ...serializableOptions } = options
const { onChunk, onPage, from, signal, ...serializableOptions } = options
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pendingAgents.set(queryId, { parquetReadObjectsResolve: resolve, reject, onChunk, onPage })
const worker = getWorker()
if (wireAbort(worker, queryId, signal, reject)) return
const message: ClientMessage = { queryId, from, kind: 'parquetReadObjects', options: serializableOptions }
worker.postMessage(message)
})
Expand All @@ -118,11 +139,12 @@ export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOption
* Note that it only supports 'rowFormat: object' (the default).
*/
export function parquetQueryWorker(options: ParquetQueryWorkerOptions): Promise<Rows> {
const { onComplete, onChunk, onPage, from, ...serializableOptions } = options
const { onComplete, onChunk, onPage, from, signal, ...serializableOptions } = options
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pendingAgents.set(queryId, { parquetQueryResolve: resolve, reject, onComplete, onChunk, onPage })
const worker = getWorker()
if (wireAbort(worker, queryId, signal, reject)) return
const message: ClientMessage = { queryId, from, kind: 'parquetQuery', options: serializableOptions }
worker.postMessage(message)
})
Expand Down
17 changes: 13 additions & 4 deletions src/lib/workers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ export interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'comp
// rowFormat 'array' is not supported in the worker.
rowFormat?: 'object'
onComplete?: (rows: Rows) => void
/**
* Aborting the signal posts an abort message to the worker. The in-flight
* read continues to completion (hyparquet has no AbortSignal support), but
* the result is suppressed and the returned promise rejects with AbortError.
*/
signal?: AbortSignal
}
/**
* Options for the worker version of parquetReadObjects
Expand Down Expand Up @@ -64,17 +70,20 @@ export interface From {
}
export interface ParquetReadClientMessage extends QueryId, From {
kind: 'parquetRead'
options: Omit<ParquetReadWorkerOptions, 'onComplete' | 'onChunk' | 'onPage' | 'from'>
options: Omit<ParquetReadWorkerOptions, 'onComplete' | 'onChunk' | 'onPage' | 'from' | 'signal'>
}
export interface ParquetReadObjectsClientMessage extends QueryId, From {
kind: 'parquetReadObjects'
options: Omit<ParquetReadObjectsWorkerOptions, 'onChunk' | 'onPage'| 'from'>
options: Omit<ParquetReadObjectsWorkerOptions, 'onChunk' | 'onPage'| 'from' | 'signal'>
}
export interface ParquetQueryClientMessage extends QueryId, From {
kind: 'parquetQuery'
options: Omit<ParquetQueryWorkerOptions, 'onComplete' | 'onChunk' | 'onPage'| 'from'>
options: Omit<ParquetQueryWorkerOptions, 'onComplete' | 'onChunk' | 'onPage'| 'from' | 'signal'>
}
export type ClientMessage = ParquetQueryClientMessage | ParquetReadObjectsClientMessage | ParquetReadClientMessage
export interface AbortClientMessage extends QueryId {
kind: 'abort'
}
export type ClientMessage = ParquetQueryClientMessage | ParquetReadObjectsClientMessage | ParquetReadClientMessage | AbortClientMessage

/**
* Messages sent by the worker to the client
Expand Down