Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@athenna/queue",
"version": "5.28.0",
"version": "5.29.0",
"description": "The Athenna queue handler.",
"license": "MIT",
"author": "João Lenon <lenon@athenna.io>",
Expand Down
52 changes: 28 additions & 24 deletions src/drivers/AwsSqsDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,32 +413,36 @@ export class AwsSqsDriver extends Driver<SQSClient> {
heartbeatTimeout = undefined
}

try {
startHeartbeat()

await processor({
id: job.id,
attempts: job.attempts,
data: job.data
})

stopHeartbeat()
const workerJob = {
id: job.id,
attempts: job.attempts,
data: job.data
}

if (!AwsSqsDriver.ackedIds.has(job.id)) {
await this.changeJobVisibility(
job.id,
this.msToS(this.noAckDelayMs + requeueJitterMs)
)
await this.runScopedQueueProcessor(processor, workerJob, async () => {
try {
startHeartbeat()

await processor(workerJob)

stopHeartbeat()

if (!AwsSqsDriver.ackedIds.has(job.id)) {
await this.changeJobVisibility(
job.id,
this.msToS(this.noAckDelayMs + requeueJitterMs)
)
}
} catch (error) {
await new AwsSqsDriverExceptionHandler().handle({
job,
error,
driver: this,
stopHeartbeat,
requeueJitterMs
})
}
} catch (error) {
await new AwsSqsDriverExceptionHandler().handle({
job,
error,
driver: this,
stopHeartbeat,
requeueJitterMs
})
}
})
}

/**
Expand Down
68 changes: 36 additions & 32 deletions src/drivers/DatabaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,38 +326,42 @@ export class DatabaseDriver extends Driver<DatabaseImpl> {

DatabaseDriver.ackedIds.delete(job.id)

try {
await processor({
id: job.id,
attempts: job.attempts,
data: job.data
})

/**
* If the job still exists after processing, it means that the job was
* not processed for some reason, so we need to make it available again
* after a delay.
*/
if (!DatabaseDriver.ackedIds.has(job.id)) {
job.reservedUntil = null
job.availableAt = Date.now() + this.noAckDelayMs + requeueJitterMs

await this.client
.table(this.table)
.where('queue', this.queueName)
.where('id', job.id)
.update({
availableAt: job.availableAt,
reservedUntil: job.reservedUntil
})
}
} catch (error) {
await new DatabaseDriverExceptionHandler().handle({
job,
error,
driver: this,
requeueJitterMs
})
const workerJob = {
id: job.id,
attempts: job.attempts,
data: job.data
}

await this.runScopedQueueProcessor(processor, workerJob, async () => {
try {
await processor(workerJob)

/**
* If the job still exists after processing, it means that the job was
* not processed for some reason, so we need to make it available again
* after a delay.
*/
if (!DatabaseDriver.ackedIds.has(job.id)) {
job.reservedUntil = null
job.availableAt = Date.now() + this.noAckDelayMs + requeueJitterMs

await this.client
.table(this.table)
.where('queue', this.queueName)
.where('id', job.id)
.update({
availableAt: job.availableAt,
reservedUntil: job.reservedUntil
})
}
} catch (error) {
await new DatabaseDriverExceptionHandler().handle({
job,
error,
driver: this,
requeueJitterMs
})
}
})
}
}
25 changes: 25 additions & 0 deletions src/drivers/Driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ import { Utils } from '#src/utils'
import { Config } from '@athenna/config'
import type { ConnectionOptions } from '#src/types'

export const RUN_WITH_WORKER_CONTEXT = Symbol.for(
'@athenna/queue.runWithWorkerContext'
)

export type ScopedQueueProcessor<T = unknown> = ((data: T) => any | Promise<any>) & {
[RUN_WITH_WORKER_CONTEXT]?: (
data: T,
callback: () => any | Promise<any>
) => any | Promise<any>
}

export abstract class Driver<Client = any> {
/**
* Set if this instance is connected.
Expand Down Expand Up @@ -164,6 +175,20 @@ export abstract class Driver<Client = any> {
return random
}

protected runScopedQueueProcessor<T>(
processor: ScopedQueueProcessor<T>,
data: T,
callback: () => any | Promise<any>
) {
const runner = processor[RUN_WITH_WORKER_CONTEXT]

if (runner) {
return runner(data, callback)
}

return callback()
}

/**
* Connect to client.
*/
Expand Down
15 changes: 15 additions & 0 deletions src/drivers/FakeDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { Log } from '@athenna/logger'
import { Json, Options } from '@athenna/common'
import type { ConnectionOptions } from '#src/types'
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
import { RUN_WITH_WORKER_CONTEXT, type ScopedQueueProcessor } from '#src/drivers/Driver'

export class FakeDriver {
public constructor(connection?: string, client?: any) {
Expand Down Expand Up @@ -224,6 +225,20 @@ export class FakeDriver {
return 0
}

protected runScopedQueueProcessor<T>(
processor: ScopedQueueProcessor<T>,
data: T,
callback: () => any | Promise<any>
) {
const runner = processor[RUN_WITH_WORKER_CONTEXT]

if (runner) {
return runner(data, callback)
}

return callback()
}

/**
* Process the next job of the queue with a handler.
*
Expand Down
50 changes: 27 additions & 23 deletions src/drivers/MemoryDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,29 +264,33 @@ export class MemoryDriver extends Driver {
job.attempts--
job.reservedUntil = Date.now() + this.visibilityTimeout

try {
await processor({
id: job.id,
attempts: job.attempts,
data: job.data
})

/**
* If the job still exists after processing, it means that the job was
* not processed for some reason, so we need to make it available again
* after a delay.
*/
if (!MemoryDriver.ackedIds.has(job.id)) {
job.reservedUntil = null
job.availableAt = Date.now() + this.noAckDelayMs + requeueJitterMs
}
} catch (error) {
await new MemoryDriverExceptionHandler().handle({
job,
error,
driver: this,
requeueJitterMs
})
const workerJob = {
id: job.id,
attempts: job.attempts,
data: job.data
}

await this.runScopedQueueProcessor(processor, workerJob, async () => {
try {
await processor(workerJob)

/**
* If the job still exists after processing, it means that the job was
* not processed for some reason, so we need to make it available again
* after a delay.
*/
if (!MemoryDriver.ackedIds.has(job.id)) {
job.reservedUntil = null
job.availableAt = Date.now() + this.noAckDelayMs + requeueJitterMs
}
} catch (error) {
await new MemoryDriverExceptionHandler().handle({
job,
error,
driver: this,
requeueJitterMs
})
}
})
}
}
3 changes: 1 addition & 2 deletions src/queue/QueueImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import type { Job, ConnectionOptions } from '#src/types'
import type { FakeDriver } from '#src/drivers/FakeDriver'
import type { AwsSqsDriver } from '#src/drivers/AwsSqsDriver'
import type { MemoryDriver } from '#src/drivers/MemoryDriver'
import type { Driver as DriverImpl } from '#src/drivers/Driver'
import type { DatabaseDriver } from '#src/drivers/DatabaseDriver'
import { ConnectionFactory } from '#src/factories/ConnectionFactory'

export class QueueImpl<Driver extends DriverImpl = any> extends Macroable {
export class QueueImpl<Driver = any> extends Macroable {
/**
* The connection name used for this instance.
*/
Expand Down
Loading
Loading