Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions configurer/worker
Original file line number Diff line number Diff line change
@@ -1,20 +1,4 @@
export default {
/*
|--------------------------------------------------------------------------
| Configurations for cls-rtracer plugin.
|--------------------------------------------------------------------------
|
| This values defines all the configurations for cls-rtracer plugins. Check
| the documentation for more information:
|
| https://github.com/puzpuzpuz/cls-rtracer
|
*/

rTracer: {
enabled: true
},

/*
|--------------------------------------------------------------------------
| Log worker tasks
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@athenna/queue",
"version": "5.30.0",
"version": "5.31.0",
"description": "The Athenna queue handler.",
"license": "MIT",
"author": "João Lenon <lenon@athenna.io>",
Expand Down
5 changes: 4 additions & 1 deletion src/drivers/AwsSqsDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { Is, Options, Uuid } from '@athenna/common'
import type { ConnectionOptions } from '#src/types'
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
import { QueueExecutionScope } from '#src/worker/QueueExecutionScope'
import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper'
import { AwsSqsDriverExceptionHandler } from '#src/handlers/AwsSqsDriverExceptionHandler'
import { NotFifoSqsQueueTypeException } from '#src/exceptions/NotFifoSqsQueueTypeException'

Expand Down Expand Up @@ -433,14 +434,16 @@ export class AwsSqsDriver extends Driver<SQSClient> {
data: job.data
}

const executionJob = QueueJobPropagationHelper.getJob(workerJob)

await this.runScopedQueueProcessor(
processor,
workerJob,
async () => {
try {
startHeartbeat()

await processor(workerJob)
await processor(executionJob)

stopHeartbeat()

Expand Down
4 changes: 3 additions & 1 deletion src/drivers/DatabaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { Is, Options } from '@athenna/common'
import type { ConnectionOptions } from '#src/types'
import type { DatabaseImpl } from '@athenna/database'
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper'
import { DatabaseDriverExceptionHandler } from '#src/handlers/DatabaseDriverExceptionHandler'

export class DatabaseDriver extends Driver<DatabaseImpl> {
Expand Down Expand Up @@ -331,10 +332,11 @@ export class DatabaseDriver extends Driver<DatabaseImpl> {
attempts: job.attempts,
data: job.data
}
const executionJob = QueueJobPropagationHelper.getJob(workerJob)

await this.runScopedQueueProcessor(processor, workerJob, async () => {
try {
await processor(workerJob)
await processor(executionJob)

/**
* If the job still exists after processing, it means that the job was
Expand Down
37 changes: 30 additions & 7 deletions src/drivers/Driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Is } from '@athenna/common'
import { Config } from '@athenna/config'
import type { Job, ConnectionOptions } from '#src/types'
import { QueueExecutionScope } from '#src/worker/QueueExecutionScope'
import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper'

export const RUN_WITH_WORKER_CONTEXT = Symbol.for(
'@athenna/queue.runWithWorkerContext'
Expand Down Expand Up @@ -199,13 +200,19 @@ export abstract class Driver<Client = any> {
return runner(data, callback, captureScope)
}

const scope = new QueueExecutionScope<T>({
name: this.queueName,
connection: this.connection,
options: this.options,
traceId: null,
job: this.createContextJob(data)
})
const scope = new QueueExecutionScope<T>(
{
name: this.queueName,
connection: this.connection,
options: this.options,
traceId: null,
job: QueueJobPropagationHelper.getJob(this.createContextJob(data))
},
{
carrier: this.getJobCarrier(data),
currentContextValues: this.getJobCurrentContextValues(data)
}
)

captureScope?.(scope)

Expand All @@ -224,6 +231,22 @@ export abstract class Driver<Client = any> {
} as Job
}

private getJobCarrier<T>(data: T) {
if (!this.isJob(data)) {
return {}
}

return QueueJobPropagationHelper.getCarrier(data.data)
}

private getJobCurrentContextValues<T>(data: T) {
if (!this.isJob(data)) {
return {}
}

return QueueJobPropagationHelper.getCurrentContextValues(data.data)
}

private isJob(data: unknown): data is Job {
if (!data || !Is.Object(data)) {
return false
Expand Down
50 changes: 38 additions & 12 deletions src/drivers/FakeDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Is, Json, Options } from '@athenna/common'
import type { Job, ConnectionOptions } from '#src/types'
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
import { QueueExecutionScope } from '#src/worker/QueueExecutionScope'
import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper'
import {
RUN_WITH_WORKER_CONTEXT,
type ScopedQueueProcessor
Expand Down Expand Up @@ -171,8 +172,8 @@ export class FakeDriver {
* const user = await Queue.pop()
* ```
*/
public static async pop(): Promise<any> {
return {}
public static async pop<T = any>(): Promise<T> {
return null as T
}

/**
Expand All @@ -185,8 +186,8 @@ export class FakeDriver {
* const user = await Queue.pop()
* ```
*/
public static async peek(): Promise<any> {
return {}
public static async peek<T = any>(): Promise<T> {
return null as T
}

/**
Expand Down Expand Up @@ -282,13 +283,19 @@ export class FakeDriver {
return runner(data, callback, captureScope)
}

const scope = new QueueExecutionScope<T>({
name: this.queueName,
connection: this.connection,
options: this.options,
traceId: null,
job: this.createContextJob(data)
})
const scope = new QueueExecutionScope<T>(
{
name: this.queueName,
connection: this.connection,
options: this.options,
traceId: null,
job: QueueJobPropagationHelper.getJob(this.createContextJob(data))
},
{
carrier: this.getJobCarrier(data),
currentContextValues: this.getJobCurrentContextValues(data)
}
)

captureScope?.(scope)

Expand All @@ -311,10 +318,13 @@ export class FakeDriver {
processor: (data: unknown) => any | Promise<any>
) {
const data = await this.pop()
const executionData = this.isJob(data)
? QueueJobPropagationHelper.getJob(data)
: data

await this.runScopedQueueProcessor(processor, data, async () => {
try {
await processor(data)
await processor(executionData)
} catch (err) {
Log.channelOrVanilla('exception').error({
msg: `failed to process job: ${err.message}`,
Expand All @@ -331,4 +341,20 @@ export class FakeDriver {
}
})
}

private static getJobCarrier<T>(data: T) {
if (!this.isJob(data)) {
return {}
}

return QueueJobPropagationHelper.getCarrier(data.data)
}

private static getJobCurrentContextValues<T>(data: T) {
if (!this.isJob(data)) {
return {}
}

return QueueJobPropagationHelper.getCurrentContextValues(data.data)
}
}
4 changes: 3 additions & 1 deletion src/drivers/MemoryDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { Driver } from '#src/drivers/Driver'
import { Options, Uuid } from '@athenna/common'
import type { ConnectionOptions } from '#src/types'
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
import { QueueJobPropagationHelper } from '#src/helpers/QueueJobPropagationHelper'
import { MemoryDriverExceptionHandler } from '#src/handlers/MemoryDriverExceptionHandler'

export class MemoryDriver extends Driver {
Expand Down Expand Up @@ -269,10 +270,11 @@ export class MemoryDriver extends Driver {
attempts: job.attempts,
data: job.data
}
const executionJob = QueueJobPropagationHelper.getJob(workerJob)

await this.runScopedQueueProcessor(processor, workerJob, async () => {
try {
await processor(workerJob)
await processor(executionJob)

/**
* If the job still exists after processing, it means that the job was
Expand Down
Loading
Loading