From b782bfa538246af3fcfa9fbc06c1b8351c37bfff Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 10:26:37 -0700 Subject: [PATCH 01/34] fix(LogFailedJobsInterceptor): Insert nulls when no exception information --- interceptors/LogFailedJobsInterceptor.cfc | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/interceptors/LogFailedJobsInterceptor.cfc b/interceptors/LogFailedJobsInterceptor.cfc index 3cbcb9c..26f1057 100644 --- a/interceptors/LogFailedJobsInterceptor.cfc +++ b/interceptors/LogFailedJobsInterceptor.cfc @@ -44,7 +44,12 @@ component { "null" : ( arguments.data.exception.type ?: "" ) == "", "nulls" : ( arguments.data.exception.type ?: "" ) == "" }, - "exceptionMessage" : arguments.data.exception.message, + "exceptionMessage" : { + "value": arguments.data.exception.message ?: "", + "cfsqltype" : "CF_SQL_VARCHAR", + "null" : ( arguments.data.exception.message ?: "" ) == "", + "nulls" : ( arguments.data.exception.message ?: "" ) == "" + }, "exceptionDetail" : { "value" : arguments.data.exception.detail ?: "", "cfsqltype" : "CF_SQL_VARCHAR", @@ -57,8 +62,13 @@ component { "null" : ( arguments.data.exception.extendedInfo ?: "" ) == "", "nulls" : ( arguments.data.exception.extendedInfo ?: "" ) == "" }, - "exceptionStackTrace" : arguments.data.exception.stackTrace, - "exception" : serializeJSON( arguments.data.exception ), + "exceptionStackTrace" : { + "value": arguments.data.exception.stackTrace ?: "", + "cfsqltype" : "CF_SQL_VARCHAR", + "null" : ( arguments.data.exception.stackTrace ?: "" ) == "", + "nulls" : ( arguments.data.exception.stackTrace ?: "" ) == "" + }, + "exception" : isNull( arguments.data.exception ) ? javacast( "null", "" ) : serializeJSON( arguments.data.exception ), "failedDate" : { "value": getCurrentUnixTimestamp(), "cfsqltype": "CF_SQL_BIGINT" }, "originalId" : { "value": arguments.data.job.getId(), "cfsqltype": "CF_SQL_VARCHAR" } }; From f61250da9e44b5e216e714d9a0ccd461b433456a Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 10:27:11 -0700 Subject: [PATCH 02/34] fix(ColdBoxAsyncProvider): Compose the `marshalJob` future with the delay future --- models/Providers/ColdBoxAsyncProvider.cfc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/Providers/ColdBoxAsyncProvider.cfc b/models/Providers/ColdBoxAsyncProvider.cfc index 2e82802..e511ea0 100644 --- a/models/Providers/ColdBoxAsyncProvider.cfc +++ b/models/Providers/ColdBoxAsyncProvider.cfc @@ -23,7 +23,7 @@ component accessors="true" extends="AbstractQueueProvider" { sleep( delay * 1000 ); return true; }, workerPool.getExecutor() ) - .then( function() { + .thenCompose( function() { job.setId( createUUID() ); if ( !isNull( arguments.currentAttempt ) ) { job.setCurrentAttempt( attempts ); From a48980234c82fe3f4785ebd362aa73ca3f4269dc Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 10:55:13 -0700 Subject: [PATCH 03/34] test: reproduce missing batch finally dispatch on terminal failure --- tests/specs/integration/BatchFinallySpec.cfc | 62 ++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 tests/specs/integration/BatchFinallySpec.cfc diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc new file mode 100644 index 0000000..4d93093 --- /dev/null +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -0,0 +1,62 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "batch finally dispatching", function() { + beforeEach( function() { + structDelete( application, "jobBeforeCalled" ); + structDelete( application, "jobAfterCalled" ); + + param application.jobBeforeCalled = false; + param application.jobAfterCalled = false; + } ); + + it( "dispatches the finally job when the last job fails", function() { + var cbq = getWireBox().getInstance( "@cbq" ); + registerSyncConnectionAndWorkerPool(); + + var successJob = cbq.job( "SendWelcomeEmailJob" ); + var failingJob = cbq.job( + job = "ReleaseTestJob", + maxAttempts = 1 + ); + + var pendingBatch = cbq + .batch( [ successJob, failingJob ] ) + .onConnection( "syncBatch" ) + .onComplete( + job = "BeforeAndAfterJob", + connection = "syncBatch" + ); + + try { + pendingBatch.dispatch(); + } catch ( any e ) { + // The sync provider rethrows the terminal failure. + } + + expect( application.jobAfterCalled ) + .toBeTrue( "The `finally` job should dispatch even when the last job fails." ); + } ); + } ); + } + + private void function registerSyncConnectionAndWorkerPool() { + var config = getWireBox().getInstance( "Config@cbq" ); + + if ( !config.getConnections().keyExists( "syncBatch" ) ) { + config.registerConnection( + name = "syncBatch", + provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ) + ); + } + + if ( !config.getWorkerPools().keyExists( "syncBatch" ) ) { + config.registerWorkerPool( + name = "syncBatch", + connectionName = "syncBatch", + maxAttempts = 1 + ); + } + } + +} From 9038762a97121c9989ff4acc391eae75df820915 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 10:59:22 -0700 Subject: [PATCH 04/34] fix: complete batches correctly when jobs end in failure --- models/Jobs/Batch.cfc | 1 + models/Jobs/DBBatchRepository.cfc | 49 ++++++++++++------- ...dd_successfulJobs_to_cbq_batches_table.cfc | 15 ++++++ tests/specs/integration/BatchFinallySpec.cfc | 22 ++++++++- 4 files changed, 69 insertions(+), 18 deletions(-) create mode 100644 resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc diff --git a/models/Jobs/Batch.cfc b/models/Jobs/Batch.cfc index e12d82f..b0ec34d 100644 --- a/models/Jobs/Batch.cfc +++ b/models/Jobs/Batch.cfc @@ -10,6 +10,7 @@ component accessors="true" { property name="totalJobs" type="numeric"; property name="pendingJobs" type="numeric"; property name="failedJobs" type="numeric"; + property name="successfulJobs" type="numeric"; property name="failedJobIds" type="array"; property name="options" type="struct"; property name="createdDate" type="numeric"; diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index 98f5282..f112c6c 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -11,7 +11,11 @@ component singleton accessors="true" { property name="batchTableName" default="cbq_batches"; public DBBatchRepository function init() { - variables.timeBasedUUIDGenerator = createObject( "java", "com.fasterxml.uuid.Generators" ).timeBasedGenerator(); + try { + variables.timeBasedUUIDGenerator = createObject( "java", "com.fasterxml.uuid.Generators" ).timeBasedGenerator(); + } catch ( any e ) { + variables.timeBasedUUIDGenerator = javacast( "null", "" ); + } variables.defaultQueryOptions = {}; return this; } @@ -51,7 +55,9 @@ component singleton accessors="true" { } public Batch function store( required PendingBatch batch ) { - var id = variables.timeBasedUUIDGenerator.generate().toString(); + var id = isNull( variables.timeBasedUUIDGenerator ) ? createUUID() : variables.timeBasedUUIDGenerator + .generate() + .toString(); qb.table( variables.batchTableName ) .insert( @@ -102,23 +108,29 @@ component singleton accessors="true" { throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" ); } + var updatedValues = { + "pendingJobs" : data.pendingJobs - 1, + "failedJobs" : data.failedJobs, + "failedJobIds" : serializeJSON( + deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId ) + ) + }; + + if ( data.keyExists( "successfulJobs" ) ) { + updatedValues[ "successfulJobs" ] = data.successfulJobs + 1; + } + qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) .update( - values = { - "pendingJobs" : data.pendingJobs - 1, - "failedJobs" : data.failedJobs, - "failedJobIds" : serializeJSON( - deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId ) - ) - }, + values = updatedValues, options = variables.defaultQueryOptions ); return { "pendingJobs" : data.pendingJobs - 1, "failedJobs" : data.failedJobs, - "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) - data.failedJobs == 0 + "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0 }; } } @@ -135,21 +147,23 @@ component singleton accessors="true" { throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" ); } + var updatedValues = { + "pendingJobs" : data.pendingJobs - 1, + "failedJobs" : data.failedJobs + 1, + "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) ) + }; + qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) .update( - values = { - "pendingJobs" : data.pendingJobs, - "failedJobs" : data.failedJobs + 1, - "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) ) - }, + values = updatedValues, options = variables.defaultQueryOptions ); return { - "pendingJobs" : data.pendingJobs, + "pendingJobs" : data.pendingJobs - 1, "failedJobs" : data.failedJobs + 1, - "allJobsHaveRanExactlyOnce" : data.pendingJobs - ( data.failedJobs + 1 ) == 0 + "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0 }; } } @@ -191,6 +205,7 @@ component singleton accessors="true" { batch.setTotalJobs( data.totalJobs ); batch.setPendingJobs( data.pendingJobs ); batch.setFailedJobs( data.failedJobs ); + batch.setSuccessfulJobs( data.keyExists( "successfulJobs" ) ? data.successfulJobs : 0 ); batch.setFailedJobIds( deserializeJSON( data.failedJobIds ) ); batch.setOptions( deserializeJSON( data.options ) ); batch.setCreatedDate( data.createdDate ); diff --git a/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc new file mode 100644 index 0000000..7f3cef7 --- /dev/null +++ b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc @@ -0,0 +1,15 @@ +component { + + function up( schema ) { + schema.alter( "cbq_batches", ( t ) => { + t.unsignedInteger( "successfulJobs" ).default( 0 ); + } ); + } + + function down( schema ) { + schema.alter( "cbq_batches", ( t ) => { + t.dropColumn( "successfulJobs" ); + } ); + } + +} diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc index 4d93093..7612cee 100644 --- a/tests/specs/integration/BatchFinallySpec.cfc +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -27,16 +27,36 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { job = "BeforeAndAfterJob", connection = "syncBatch" ); + pendingBatch.setName( "sync-failing-finally" ); try { pendingBatch.dispatch(); - } catch ( any e ) { + } catch ( cbq.MaxAttemptsReached e ) { // The sync provider rethrows the terminal failure. } expect( application.jobAfterCalled ) .toBeTrue( "The `finally` job should dispatch even when the last job fails." ); } ); + + it( "dispatches the finally job when all jobs succeed", function() { + var cbq = getWireBox().getInstance( "@cbq" ); + registerSyncConnectionAndWorkerPool(); + + var pendingBatch = cbq + .batch( [ cbq.job( "SendWelcomeEmailJob" ), cbq.job( "SendWelcomeEmailJob" ) ] ) + .onConnection( "syncBatch" ) + .onComplete( + job = "BeforeAndAfterJob", + connection = "syncBatch" + ); + pendingBatch.setName( "sync-success-finally" ); + + pendingBatch.dispatch(); + + expect( application.jobAfterCalled ) + .toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." ); + } ); } ); } From 3d013832419523c8472ac9788f304fef6d8400e7 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 11:09:48 -0700 Subject: [PATCH 05/34] fix: make batch name optional and nullable --- models/Jobs/DBBatchRepository.cfc | 6 +++++- ...1_01_000009_make_cbq_batches_name_nullable.cfc | 15 +++++++++++++++ tests/specs/integration/BatchFinallySpec.cfc | 2 -- 3 files changed, 20 insertions(+), 3 deletions(-) create mode 100644 resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index f112c6c..eea433e 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -58,12 +58,16 @@ component singleton accessors="true" { var id = isNull( variables.timeBasedUUIDGenerator ) ? createUUID() : variables.timeBasedUUIDGenerator .generate() .toString(); + local.batchName = arguments.batch.getName(); + if ( isNull( local.batchName ) || !isSimpleValue( local.batchName ) ) { + local.batchName = ""; + } qb.table( variables.batchTableName ) .insert( values = { "id" : id, - "name" : arguments.batch.getName(), + "name" : local.batchName, "totalJobs" : 0, "pendingJobs" : 0, "failedJobs" : 0, diff --git a/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc b/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc new file mode 100644 index 0000000..7a2101f --- /dev/null +++ b/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc @@ -0,0 +1,15 @@ +component { + + function up( schema, qb ) { + schema.alter( "cbq_batches", ( t ) => { + t.modifyColumn( "name", t.string( "name" ).nullable() ); + } ); + } + + function down( schema, qb ) { + schema.alter( "cbq_batches", ( t ) => { + t.modifyColumn( "name", t.string( "name" ) ); + } ); + } + +} diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc index 7612cee..cd20e25 100644 --- a/tests/specs/integration/BatchFinallySpec.cfc +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -27,7 +27,6 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { job = "BeforeAndAfterJob", connection = "syncBatch" ); - pendingBatch.setName( "sync-failing-finally" ); try { pendingBatch.dispatch(); @@ -50,7 +49,6 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { job = "BeforeAndAfterJob", connection = "syncBatch" ); - pendingBatch.setName( "sync-success-finally" ); pendingBatch.dispatch(); From 96ed25ce58535e8bc0ea55e269b0dd306df04514 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 11:16:55 -0700 Subject: [PATCH 06/34] test: load lib jars in test app and require time UUID generator --- models/Jobs/DBBatchRepository.cfc | 18 ++++++------------ tests/Application.cfc | 4 ++++ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index eea433e..da50225 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -11,11 +11,7 @@ component singleton accessors="true" { property name="batchTableName" default="cbq_batches"; public DBBatchRepository function init() { - try { - variables.timeBasedUUIDGenerator = createObject( "java", "com.fasterxml.uuid.Generators" ).timeBasedGenerator(); - } catch ( any e ) { - variables.timeBasedUUIDGenerator = javacast( "null", "" ); - } + variables.timeBasedUUIDGenerator = createObject( "java", "com.fasterxml.uuid.Generators" ).timeBasedGenerator(); variables.defaultQueryOptions = {}; return this; } @@ -55,19 +51,17 @@ component singleton accessors="true" { } public Batch function store( required PendingBatch batch ) { - var id = isNull( variables.timeBasedUUIDGenerator ) ? createUUID() : variables.timeBasedUUIDGenerator - .generate() - .toString(); - local.batchName = arguments.batch.getName(); - if ( isNull( local.batchName ) || !isSimpleValue( local.batchName ) ) { - local.batchName = ""; + var id = variables.timeBasedUUIDGenerator.generate().toString(); + var batchName = arguments.batch.getName(); + if ( isNull( batchName ) || !isSimpleValue( batchName ) ) { + batchName = ""; } qb.table( variables.batchTableName ) .insert( values = { "id" : id, - "name" : local.batchName, + "name" : batchName, "totalJobs" : 0, "pendingJobs" : 0, "failedJobs" : 0, diff --git a/tests/Application.cfc b/tests/Application.cfc index 63187e5..c8b5042 100644 --- a/tests/Application.cfc +++ b/tests/Application.cfc @@ -21,6 +21,10 @@ component { this.mappings[ "/testbox" ] = rootPath & "/testbox"; this.datasource = "cbq"; + this.javaSettings = { + "loadPaths" : [ rootPath & "/lib" ], + "reloadOnChange" : false + }; function onRequestStart() { createObject( "java", "java.lang.System" ).setProperty( "ENVIRONMENT", "testing" ); From cc7313197028eca2ae5ffd5b5cdfce433e5b9a73 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 11:46:02 -0700 Subject: [PATCH 07/34] breaking: require successfulJobs and add batch count coverage --- models/Jobs/DBBatchRepository.cfc | 8 +- ...dd_successfulJobs_to_cbq_batches_table.cfc | 2 +- .../app/models/Jobs/AlwaysErrorJob.cfc | 10 ++ .../Jobs/RequestScopeBeforeAndAfterJob.cfc | 15 +++ tests/specs/integration/BatchFinallySpec.cfc | 16 +-- .../DBBatchRepositoryCountsSpec.cfc | 124 ++++++++++++++++++ 6 files changed, 161 insertions(+), 14 deletions(-) create mode 100644 tests/resources/app/models/Jobs/AlwaysErrorJob.cfc create mode 100644 tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc create mode 100644 tests/specs/integration/DBBatchRepositoryCountsSpec.cfc diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index da50225..6ea7336 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -64,6 +64,7 @@ component singleton accessors="true" { "name" : batchName, "totalJobs" : 0, "pendingJobs" : 0, + "successfulJobs" : 0, "failedJobs" : 0, "failedJobIds" : "[]", "options" : serializeJSON( arguments.batch.getOptions() ), @@ -108,16 +109,13 @@ component singleton accessors="true" { var updatedValues = { "pendingJobs" : data.pendingJobs - 1, + "successfulJobs" : data.successfulJobs + 1, "failedJobs" : data.failedJobs, "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId ) ) }; - if ( data.keyExists( "successfulJobs" ) ) { - updatedValues[ "successfulJobs" ] = data.successfulJobs + 1; - } - qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) .update( @@ -203,7 +201,7 @@ component singleton accessors="true" { batch.setTotalJobs( data.totalJobs ); batch.setPendingJobs( data.pendingJobs ); batch.setFailedJobs( data.failedJobs ); - batch.setSuccessfulJobs( data.keyExists( "successfulJobs" ) ? data.successfulJobs : 0 ); + batch.setSuccessfulJobs( data.successfulJobs ); batch.setFailedJobIds( deserializeJSON( data.failedJobIds ) ); batch.setOptions( deserializeJSON( data.options ) ); batch.setCreatedDate( data.createdDate ); diff --git a/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc index 7f3cef7..17d0e9f 100644 --- a/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc +++ b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc @@ -2,7 +2,7 @@ component { function up( schema ) { schema.alter( "cbq_batches", ( t ) => { - t.unsignedInteger( "successfulJobs" ).default( 0 ); + t.addColumn( t.unsignedInteger( "successfulJobs" ).default( 0 ) ); } ); } diff --git a/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc new file mode 100644 index 0000000..4bfcaf5 --- /dev/null +++ b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc @@ -0,0 +1,10 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + throw( + type = "cbq.tests.AlwaysErrorJob", + message = "This job always errors for testing." + ); + } + +} diff --git a/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc b/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc new file mode 100644 index 0000000..b2b714d --- /dev/null +++ b/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc @@ -0,0 +1,15 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + // do nothing + } + + function before() { + request.jobBeforeCalled = true; + } + + function after() { + request.jobAfterCalled = true; + } + +} diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc index cd20e25..773d2b1 100644 --- a/tests/specs/integration/BatchFinallySpec.cfc +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -3,11 +3,11 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { function run() { describe( "batch finally dispatching", function() { beforeEach( function() { - structDelete( application, "jobBeforeCalled" ); - structDelete( application, "jobAfterCalled" ); + structDelete( request, "jobBeforeCalled" ); + structDelete( request, "jobAfterCalled" ); - param application.jobBeforeCalled = false; - param application.jobAfterCalled = false; + param request.jobBeforeCalled = false; + param request.jobAfterCalled = false; } ); it( "dispatches the finally job when the last job fails", function() { @@ -24,7 +24,7 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { .batch( [ successJob, failingJob ] ) .onConnection( "syncBatch" ) .onComplete( - job = "BeforeAndAfterJob", + job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); @@ -34,7 +34,7 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { // The sync provider rethrows the terminal failure. } - expect( application.jobAfterCalled ) + expect( request.jobAfterCalled ) .toBeTrue( "The `finally` job should dispatch even when the last job fails." ); } ); @@ -46,13 +46,13 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { .batch( [ cbq.job( "SendWelcomeEmailJob" ), cbq.job( "SendWelcomeEmailJob" ) ] ) .onConnection( "syncBatch" ) .onComplete( - job = "BeforeAndAfterJob", + job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); pendingBatch.dispatch(); - expect( application.jobAfterCalled ) + expect( request.jobAfterCalled ) .toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." ); } ); } ); diff --git a/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc b/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc new file mode 100644 index 0000000..7505483 --- /dev/null +++ b/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc @@ -0,0 +1,124 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "DBBatchRepository counts", function() { + it( "initializes successfulJobs for newly stored batches", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var batch = repository.store( + getWireBox() + .getInstance( "@cbq" ) + .batch( [] ) + .allowFailures() + ); + + expect( batch.getSuccessfulJobs() ).toBe( 0 ); + } ); + + it( "successful jobs increment successfulJobs and decrement pendingJobs", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "SendWelcomeEmailJob" ) + .setId( createUUID() ) + .withBatchId( batch.getId() ); + + provider.marshalJob( job, pool ); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 0 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 1 ); + } ); + + it( "retryable errors do not change pending, successful, or failed counts", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "AlwaysErrorJob" ) + .setId( createUUID() ) + .withBatchId( batch.getId() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 2 ); + + expect( () => provider.marshalJob( job, pool ) ).toThrow( "cbq.SyncProviderJobFailed" ); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 1 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobIds() ).toBeEmpty(); + } ); + + it( "failed jobs increment failedJobs, append failedJobIds, and decrement pendingJobs", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + var failedJobId = createUUID(); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "AlwaysErrorJob" ) + .setId( failedJobId ) + .withBatchId( batch.getId() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 1 ); + + expect( () => provider.marshalJob( job, pool ) ).toThrow(); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 0 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 1 ); + expect( updatedBatch.getFailedJobIds() ).toHaveLength( 1 ); + expect( updatedBatch.getFailedJobIds()[ 1 ] ).toBe( failedJobId ); + } ); + } ); + } + + private any function registerSyncConnectionAndWorkerPool() { + var config = getWireBox().getInstance( "Config@cbq" ); + + if ( !config.getConnections().keyExists( "syncBatchCounts" ) ) { + config.registerConnection( + name = "syncBatchCounts", + provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ) + ); + } + + if ( !config.getWorkerPools().keyExists( "syncBatchCounts" ) ) { + config.registerWorkerPool( + name = "syncBatchCounts", + connectionName = "syncBatchCounts", + maxAttempts = 2 + ); + } + + return config; + } + + private any function createTrackedBatch( required any repository, required numeric totalJobs ) { + var pendingBatch = getWireBox() + .getInstance( "@cbq" ) + .batch( [] ) + .allowFailures(); + var batch = arguments.repository.store( pendingBatch ); + arguments.repository.incrementTotalJobs( batch.getId(), arguments.totalJobs ); + return arguments.repository.find( batch.getId() ); + } + +} From 772d034d6cc4c8ac162c5ea386417753a15b837c Mon Sep 17 00:00:00 2001 From: elpete <2583646+elpete@users.noreply.github.com> Date: Thu, 12 Feb 2026 18:48:14 +0000 Subject: [PATCH 08/34] Apply cfformat changes --- models/Jobs/DBBatchRepository.cfc | 10 ++------ tests/specs/integration/BatchFinallySpec.cfc | 26 +++++++------------- 2 files changed, 11 insertions(+), 25 deletions(-) diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index 6ea7336..4a6b193 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -118,10 +118,7 @@ component singleton accessors="true" { qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) - .update( - values = updatedValues, - options = variables.defaultQueryOptions - ); + .update( values = updatedValues, options = variables.defaultQueryOptions ); return { "pendingJobs" : data.pendingJobs - 1, @@ -151,10 +148,7 @@ component singleton accessors="true" { qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) - .update( - values = updatedValues, - options = variables.defaultQueryOptions - ); + .update( values = updatedValues, options = variables.defaultQueryOptions ); return { "pendingJobs" : data.pendingJobs - 1, diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc index 773d2b1..9463d1a 100644 --- a/tests/specs/integration/BatchFinallySpec.cfc +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -15,18 +15,12 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { registerSyncConnectionAndWorkerPool(); var successJob = cbq.job( "SendWelcomeEmailJob" ); - var failingJob = cbq.job( - job = "ReleaseTestJob", - maxAttempts = 1 - ); + var failingJob = cbq.job( job = "ReleaseTestJob", maxAttempts = 1 ); var pendingBatch = cbq .batch( [ successJob, failingJob ] ) .onConnection( "syncBatch" ) - .onComplete( - job = "RequestScopeBeforeAndAfterJob", - connection = "syncBatch" - ); + .onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); try { pendingBatch.dispatch(); @@ -34,8 +28,7 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { // The sync provider rethrows the terminal failure. } - expect( request.jobAfterCalled ) - .toBeTrue( "The `finally` job should dispatch even when the last job fails." ); + expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch even when the last job fails." ); } ); it( "dispatches the finally job when all jobs succeed", function() { @@ -43,17 +36,16 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { registerSyncConnectionAndWorkerPool(); var pendingBatch = cbq - .batch( [ cbq.job( "SendWelcomeEmailJob" ), cbq.job( "SendWelcomeEmailJob" ) ] ) + .batch( [ + cbq.job( "SendWelcomeEmailJob" ), + cbq.job( "SendWelcomeEmailJob" ) + ] ) .onConnection( "syncBatch" ) - .onComplete( - job = "RequestScopeBeforeAndAfterJob", - connection = "syncBatch" - ); + .onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); pendingBatch.dispatch(); - expect( request.jobAfterCalled ) - .toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." ); + expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." ); } ); } ); } From 95aa831644fd8123e1d1f7bb1f575d67eeb1ed6a Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 12:25:35 -0700 Subject: [PATCH 09/34] Do not change `failedJobIds` except for incrementing failed jobs --- models/Jobs/DBBatchRepository.cfc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index 4a6b193..b7ffee1 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -110,10 +110,7 @@ component singleton accessors="true" { var updatedValues = { "pendingJobs" : data.pendingJobs - 1, "successfulJobs" : data.successfulJobs + 1, - "failedJobs" : data.failedJobs, - "failedJobIds" : serializeJSON( - deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId ) - ) + "failedJobs" : data.failedJobs }; qb.table( variables.batchTableName ) From 0c03791de17f20984d95f3e5059a2f205028469d Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Thu, 12 Feb 2026 13:47:17 -0700 Subject: [PATCH 10/34] v6.0.0-beta.1 --- box.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/box.json b/box.json index 6aeff2f..961e920 100644 --- a/box.json +++ b/box.json @@ -1,6 +1,6 @@ { "name":"cbq", - "version":"5.0.7", + "version":"6.0.0-beta.1", "author":"Eric Peterson ", "location":"forgeboxStorage", "homepage":"https://github.com/coldbox-modules/cbq", From 19a871e8e828a9b88a1b7ef961cb57e13cb9fc4b Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 10:29:45 -0600 Subject: [PATCH 11/34] fix: handle complex stackTrace objects in LogFailedJobsInterceptor Lucee can return the `stackTrace` property on an exception as a Java array of StackTraceElement objects rather than a simple string. Guard against this by checking isSimpleValue() before comparing to "" and by serializing complex values to JSON before inserting. Co-Authored-By: Claude Sonnet 4.6 --- interceptors/LogFailedJobsInterceptor.cfc | 40 ++++++++++++++--------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/interceptors/LogFailedJobsInterceptor.cfc b/interceptors/LogFailedJobsInterceptor.cfc index 26f1057..fa6386f 100644 --- a/interceptors/LogFailedJobsInterceptor.cfc +++ b/interceptors/LogFailedJobsInterceptor.cfc @@ -45,7 +45,7 @@ component { "nulls" : ( arguments.data.exception.type ?: "" ) == "" }, "exceptionMessage" : { - "value": arguments.data.exception.message ?: "", + "value" : arguments.data.exception.message ?: "", "cfsqltype" : "CF_SQL_VARCHAR", "null" : ( arguments.data.exception.message ?: "" ) == "", "nulls" : ( arguments.data.exception.message ?: "" ) == "" @@ -63,27 +63,37 @@ component { "nulls" : ( arguments.data.exception.extendedInfo ?: "" ) == "" }, "exceptionStackTrace" : { - "value": arguments.data.exception.stackTrace ?: "", + "value" : isNull( arguments.data.exception.stackTrace ) ? "" : ( + isSimpleValue( arguments.data.exception.stackTrace ) ? arguments.data.exception.stackTrace : serializeJSON( + arguments.data.exception.stackTrace + ) + ), "cfsqltype" : "CF_SQL_VARCHAR", - "null" : ( arguments.data.exception.stackTrace ?: "" ) == "", - "nulls" : ( arguments.data.exception.stackTrace ?: "" ) == "" + "null" : isNull( arguments.data.exception.stackTrace ) || ( + isSimpleValue( arguments.data.exception.stackTrace ) && arguments.data.exception.stackTrace == "" + ), + "nulls" : isNull( arguments.data.exception.stackTrace ) || ( + isSimpleValue( arguments.data.exception.stackTrace ) && arguments.data.exception.stackTrace == "" + ) }, - "exception" : isNull( arguments.data.exception ) ? javacast( "null", "" ) : serializeJSON( arguments.data.exception ), - "failedDate" : { "value": getCurrentUnixTimestamp(), "cfsqltype": "CF_SQL_BIGINT" }, - "originalId" : { "value": arguments.data.job.getId(), "cfsqltype": "CF_SQL_VARCHAR" } + "exception" : isNull( arguments.data.exception ) ? javacast( "null", "" ) : serializeJSON( + arguments.data.exception + ), + "failedDate" : { + "value" : getCurrentUnixTimestamp(), + "cfsqltype" : "CF_SQL_BIGINT" + }, + "originalId" : { + "value" : arguments.data.job.getId(), + "cfsqltype" : "CF_SQL_VARCHAR" + } }; try { qb.table( variables.settings.logFailedJobsProperties.tableName ) - .insert( - values = logData, - options = options - ); + .insert( values = logData, options = options ); } catch ( any e ) { - log.error( "Failed to log failed job: #e.message#", { - "log": logData, - "exception": e - } ); + log.error( "Failed to log failed job: #e.message#", { "log" : logData, "exception" : e } ); rethrow; } } From d776aa0fc88d7b4a23555c8fb6b62cf0c5cb909a Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 10:36:44 -0600 Subject: [PATCH 12/34] fix: use availableDate instead of reservedDate for timeout watcher reservedDate compared against pool.getTimeout() always used the pool's fixed 60s window, causing jobs with longer per-job timeouts (e.g. 300s) to be re-grabbed and have attempts incremented while still running. availableDate is already set to now + jobTimeout at reservation time, so comparing it against now correctly respects each job's actual timeout. Fixes both fetchPotentiallyOpenRecords and tryToLockRecords. Co-Authored-By: Claude Sonnet 4.6 --- models/Providers/DBProvider.cfc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/models/Providers/DBProvider.cfc b/models/Providers/DBProvider.cfc index 0b47a03..0b08c20 100644 --- a/models/Providers/DBProvider.cfc +++ b/models/Providers/DBProvider.cfc @@ -394,11 +394,11 @@ component accessors="true" extends="AbstractQueueProvider" { variables.getCurrentUnixTimestamp() ); } ); - // past the reserved date + // past the job's own timeout (availableDate was set to now + jobTimeout at reservation time) q1.orWhere( - "reservedDate", + "availableDate", "<=", - variables.getCurrentUnixTimestamp() - pool.getTimeout() + variables.getCurrentUnixTimestamp() ); // reserved by a worker but never released q1.orWhere( ( q3 ) => { @@ -449,9 +449,9 @@ component accessors="true" extends="AbstractQueueProvider" { q2.whereNull( "reservedDate" ); } ) .orWhere( - "reservedDate", + "availableDate", "<=", - variables.getCurrentUnixTimestamp() - pool.getTimeout() + variables.getCurrentUnixTimestamp() ); } ) .update( From 8b96166d727e9b8c8973ea0497e828b71dcc8265 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 10:50:32 -0600 Subject: [PATCH 13/34] chore: add interceptors to cfformat scripts Co-Authored-By: Claude Sonnet 4.6 --- box.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/box.json b/box.json index 961e920..b2f1b58 100644 --- a/box.json +++ b/box.json @@ -31,9 +31,9 @@ "cfmigrations":"modules/cfmigrations/" }, "scripts":{ - "format":"cfformat run ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --overwrite", - "format:check":"cfformat check ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --verbose", - "format:watch":"cfformat watch ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc", + "format":"cfformat run ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --overwrite", + "format:check":"cfformat check ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --verbose", + "format:watch":"cfformat watch ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc", "install:2021":"cfpm install document,feed,zip" }, "ignore":[ From 464126147f8e162ee2b0d0f219774f1583eb6483 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 11:42:35 -0600 Subject: [PATCH 14/34] v6.0.0-beta.2 --- box.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/box.json b/box.json index b2f1b58..5dda323 100644 --- a/box.json +++ b/box.json @@ -1,6 +1,6 @@ { "name":"cbq", - "version":"6.0.0-beta.1", + "version":"6.0.0-beta.2", "author":"Eric Peterson ", "location":"forgeboxStorage", "homepage":"https://github.com/coldbox-modules/cbq", From d57e90778965b2556bef4f6a518b7258c6f90b1c Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 15:54:04 -0600 Subject: [PATCH 15/34] test: verify timeout watcher respects job-specific timeout over pool timeout Three specs covering the core bug fix in DBProvider: - A reserved job within its own timeout is not re-grabbed - A reserved job whose timeout has expired is re-grabbed - A job past the pool timeout but within the job timeout is not re-grabbed Uses TestBox makePublic() to expose fetchPotentiallyOpenRecords for direct testing. Co-Authored-By: Claude Sonnet 4.6 --- .../DBProviderTimeoutWatcherSpec.cfc | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 tests/specs/integration/Providers/DBProviderTimeoutWatcherSpec.cfc diff --git a/tests/specs/integration/Providers/DBProviderTimeoutWatcherSpec.cfc b/tests/specs/integration/Providers/DBProviderTimeoutWatcherSpec.cfc new file mode 100644 index 0000000..84a7d8a --- /dev/null +++ b/tests/specs/integration/Providers/DBProviderTimeoutWatcherSpec.cfc @@ -0,0 +1,107 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "DBProvider timeout watcher", function() { + beforeEach( function() { + variables.provider = getWireBox().getInstance( "DBProvider@cbq" ).setProperties( {} ); + makePublic( variables.provider, "fetchPotentiallyOpenRecords" ); + variables.pool = makeWorkerPool( variables.provider ); + // clean up any leftover test records + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + } ); + + afterEach( function() { + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + } ); + + it( "does not re-grab a reserved job that is still within its job-specific timeout", function() { + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ); + variables.provider.push( "default", job ); + + // Simulate the job being reserved by another worker with a 300s job timeout + var otherWorkerUUID = createUUID(); + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : otherWorkerUUID, + "reservedDate" : now, + "availableDate" : now + 300 + } ); + + var ids = variables.provider.fetchPotentiallyOpenRecords( capacity = 10, pool = variables.pool ); + + expect( ids ).toBeEmpty( "A reserved job still within its job-specific timeout should not be re-grabbed" ); + } ); + + it( "re-grabs a reserved job whose job-specific timeout has expired", function() { + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ); + variables.provider.push( "default", job ); + + // Simulate the job being reserved 310s ago with a 300s job timeout (availableDate now in the past) + var otherWorkerUUID = createUUID(); + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : otherWorkerUUID, + "reservedDate" : now - 310, + "availableDate" : now - 10 + } ); + + var ids = variables.provider.fetchPotentiallyOpenRecords( capacity = 10, pool = variables.pool ); + + expect( ids ).toHaveLength( + 1, + "A reserved job whose job-specific timeout has expired should be re-grabbed" + ); + } ); + + it( "does not re-grab a job using the pool timeout when the job-specific timeout is longer", function() { + // Core bug fix: pool timeout is 60s, job timeout is 300s. + // After 65s (past pool timeout, within job timeout), job should NOT be re-grabbed. + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ); + variables.provider.push( "default", job ); + + var otherWorkerUUID = createUUID(); + var now = javacast( "long", getTickCount() / 1000 ); + // Reserved 65s ago, job timeout is 300s, so availableDate is still 235s in the future + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : otherWorkerUUID, + "reservedDate" : now - 65, + "availableDate" : now + 235 + } ); + + var ids = variables.provider.fetchPotentiallyOpenRecords( capacity = 10, pool = variables.pool ); + + expect( ids ).toBeEmpty( + "A job past the pool timeout but still within its job-specific timeout should not be re-grabbed" + ); + } ); + } ); + } + + private any function makeWorkerPool( required any provider ) { + var connection = getInstance( "QueueConnection@cbq" ) + .setName( "TestTimeoutWatcherConnection" ) + .setProvider( arguments.provider ); + + return getInstance( "WorkerPool@cbq" ) + .setName( "TestTimeoutWatcherPool" ) + .setConnection( connection ) + .setConnectionName( connection.getName() ) + .startWorkers(); + } + +} From e1af7a8868389054d0e123e73ed88fd4f5bf290c Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 16:01:09 -0600 Subject: [PATCH 16/34] fix: set job attempt count in ColdBoxAsyncProvider and tighten tryToLockRecords guard ColdBoxAsyncProvider: The thenCompose closure referenced arguments.currentAttempt which does not exist in the closure scope, so setCurrentAttempt() never executed for retried jobs. Changed to check the captured attempts variable instead. DBProvider.tryToLockRecords: Added whereNotNull(reservedDate) guard to the availableDate OR branch, consistent with the same fix already applied to fetchPotentiallyOpenRecords. Ensures only genuinely timed-out reserved jobs are re-locked. Co-Authored-By: Claude Opus 4.6 --- models/Providers/ColdBoxAsyncProvider.cfc | 2 +- models/Providers/DBProvider.cfc | 26 ++++++++++++++--------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/models/Providers/ColdBoxAsyncProvider.cfc b/models/Providers/ColdBoxAsyncProvider.cfc index e511ea0..f8ea080 100644 --- a/models/Providers/ColdBoxAsyncProvider.cfc +++ b/models/Providers/ColdBoxAsyncProvider.cfc @@ -25,7 +25,7 @@ component accessors="true" extends="AbstractQueueProvider" { }, workerPool.getExecutor() ) .thenCompose( function() { job.setId( createUUID() ); - if ( !isNull( arguments.currentAttempt ) ) { + if ( attempts > 0 ) { job.setCurrentAttempt( attempts ); } return marshalJob( job, workerPool ); diff --git a/models/Providers/DBProvider.cfc b/models/Providers/DBProvider.cfc index 0b08c20..c32cb42 100644 --- a/models/Providers/DBProvider.cfc +++ b/models/Providers/DBProvider.cfc @@ -395,11 +395,14 @@ component accessors="true" extends="AbstractQueueProvider" { ); } ); // past the job's own timeout (availableDate was set to now + jobTimeout at reservation time) - q1.orWhere( - "availableDate", - "<=", - variables.getCurrentUnixTimestamp() - ); + q1.orWhere( ( q2 ) => { + q2.whereNotNull( "reservedDate" ) + .where( + "availableDate", + "<=", + variables.getCurrentUnixTimestamp() + ); + } ); // reserved by a worker but never released q1.orWhere( ( q3 ) => { q3.whereNull( "reservedDate" ) @@ -448,11 +451,14 @@ component accessors="true" extends="AbstractQueueProvider" { q2.whereNull( "reservedBy" ); q2.whereNull( "reservedDate" ); } ) - .orWhere( - "availableDate", - "<=", - variables.getCurrentUnixTimestamp() - ); + .orWhere( ( q2 ) => { + q2.whereNotNull( "reservedDate" ) + .where( + "availableDate", + "<=", + variables.getCurrentUnixTimestamp() + ); + } ); } ) .update( values = { From 96450f2d6dac770febcd82dc400803f051c50493 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 16:03:23 -0600 Subject: [PATCH 17/34] 6.0.0-beta.3 --- box.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/box.json b/box.json index 5dda323..67b69f5 100644 --- a/box.json +++ b/box.json @@ -1,6 +1,6 @@ { "name":"cbq", - "version":"6.0.0-beta.2", + "version":"6.0.0-beta.3", "author":"Eric Peterson ", "location":"forgeboxStorage", "homepage":"https://github.com/coldbox-modules/cbq", From 0f504c61062115c63b599ab87049cbb4f9bf7c2f Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 16:08:54 -0600 Subject: [PATCH 18/34] fix: remove skip locked from DB timeout watcher query --- models/Providers/DBProvider.cfc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/Providers/DBProvider.cfc b/models/Providers/DBProvider.cfc index c32cb42..09158f9 100644 --- a/models/Providers/DBProvider.cfc +++ b/models/Providers/DBProvider.cfc @@ -377,7 +377,7 @@ component accessors="true" extends="AbstractQueueProvider" { var ids = newQuery() .from( variables.tableName ) .limit( arguments.capacity ) - .lockForUpdate( skipLocked = true ) + .lockForUpdate() .when( !shouldWorkAllQueues( arguments.pool ), ( q ) => q.whereIn( "queue", pool.getQueue() ) ) .where( ( q ) => { q.whereNull( "completedDate" ); From a57d7c4b14c39999f28cc73abf8bfb5f93c37f47 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 16:12:04 -0600 Subject: [PATCH 19/34] chore: upgrade CI to MySQL 8 and re-enable skip locked --- .github/workflows/cron.yml | 6 +++--- .github/workflows/pr.yml | 6 +++--- .github/workflows/release.yml | 4 ++-- models/Providers/DBProvider.cfc | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/cron.yml b/.github/workflows/cron.yml index 04d03e7..f6391ab 100644 --- a/.github/workflows/cron.yml +++ b/.github/workflows/cron.yml @@ -25,7 +25,7 @@ jobs: experimental: true services: mysql: - image: mysql:5.7 + image: mysql:8.0 env: MYSQL_RANDOM_ROOT_PASSWORD: yes MYSQL_USER: cbq @@ -33,7 +33,7 @@ jobs: MYSQL_DATABASE: cbq ports: - 3306 - options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + options: --default-authentication-plugin=mysql_native_password --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 steps: - name: Checkout Repository uses: actions/checkout@v4.2.2 @@ -78,4 +78,4 @@ jobs: DB_PASSWORD: cbq DB_DATABASE: cbq DB_SCHEMA: cbq - run: box testbox run \ No newline at end of file + run: box testbox run diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index b66507b..5e9fa16 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -22,7 +22,7 @@ jobs: cfengine: ["lucee@5", "lucee@6", "adobe@2021", "adobe@2023", "adobe@2025"] services: mysql: - image: mysql:5.7 + image: mysql:8.0 env: MYSQL_RANDOM_ROOT_PASSWORD: yes MYSQL_USER: cbq @@ -30,7 +30,7 @@ jobs: MYSQL_DATABASE: cbq ports: - 3306 - options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + options: --default-authentication-plugin=mysql_native_password --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 steps: - name: Checkout Repository uses: actions/checkout@v4.2.2 @@ -101,4 +101,4 @@ jobs: - name: Commit Format Changes uses: stefanzweifel/git-auto-commit-action@v5.2.0 with: - commit_message: Apply cfformat changes \ No newline at end of file + commit_message: Apply cfformat changes diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 15b250f..4b748d3 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -17,7 +17,7 @@ jobs: cfengine: ["lucee@5", "lucee@6", "adobe@2021", "adobe@2023", "adobe@2025"] services: mysql: - image: mysql:5.7 + image: mysql:8.0 env: MYSQL_RANDOM_ROOT_PASSWORD: yes MYSQL_USER: cbq @@ -25,7 +25,7 @@ jobs: MYSQL_DATABASE: cbq ports: - 3306 - options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + options: --default-authentication-plugin=mysql_native_password --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 steps: - name: Checkout Repository uses: actions/checkout@v4.2.2 diff --git a/models/Providers/DBProvider.cfc b/models/Providers/DBProvider.cfc index 09158f9..c32cb42 100644 --- a/models/Providers/DBProvider.cfc +++ b/models/Providers/DBProvider.cfc @@ -377,7 +377,7 @@ component accessors="true" extends="AbstractQueueProvider" { var ids = newQuery() .from( variables.tableName ) .limit( arguments.capacity ) - .lockForUpdate() + .lockForUpdate( skipLocked = true ) .when( !shouldWorkAllQueues( arguments.pool ), ( q ) => q.whereIn( "queue", pool.getQueue() ) ) .where( ( q ) => { q.whereNull( "completedDate" ); From 6d30e6a54736a803622c3b1f9704f2c2a5e76cc4 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 16:13:28 -0600 Subject: [PATCH 20/34] fix: remove invalid mysql docker flag in workflow services --- .github/workflows/cron.yml | 2 +- .github/workflows/pr.yml | 2 +- .github/workflows/release.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/cron.yml b/.github/workflows/cron.yml index f6391ab..321e311 100644 --- a/.github/workflows/cron.yml +++ b/.github/workflows/cron.yml @@ -33,7 +33,7 @@ jobs: MYSQL_DATABASE: cbq ports: - 3306 - options: --default-authentication-plugin=mysql_native_password --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 steps: - name: Checkout Repository uses: actions/checkout@v4.2.2 diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 5e9fa16..aad042d 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -30,7 +30,7 @@ jobs: MYSQL_DATABASE: cbq ports: - 3306 - options: --default-authentication-plugin=mysql_native_password --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 steps: - name: Checkout Repository uses: actions/checkout@v4.2.2 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4b748d3..9903eb1 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -25,7 +25,7 @@ jobs: MYSQL_DATABASE: cbq ports: - 3306 - options: --default-authentication-plugin=mysql_native_password --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 steps: - name: Checkout Repository uses: actions/checkout@v4.2.2 From f51594fd3b97a5d9e8dc350722dad2cc2a939d6b Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 16:18:30 -0600 Subject: [PATCH 21/34] fix: set mysql8 test user auth plugin via init script --- .github/mysql/init/01-auth-plugin.sql | 2 ++ .github/workflows/cron.yml | 2 ++ .github/workflows/pr.yml | 2 ++ .github/workflows/release.yml | 2 ++ 4 files changed, 8 insertions(+) create mode 100644 .github/mysql/init/01-auth-plugin.sql diff --git a/.github/mysql/init/01-auth-plugin.sql b/.github/mysql/init/01-auth-plugin.sql new file mode 100644 index 0000000..2dc2c52 --- /dev/null +++ b/.github/mysql/init/01-auth-plugin.sql @@ -0,0 +1,2 @@ +ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; +FLUSH PRIVILEGES; diff --git a/.github/workflows/cron.yml b/.github/workflows/cron.yml index 321e311..1c84746 100644 --- a/.github/workflows/cron.yml +++ b/.github/workflows/cron.yml @@ -31,6 +31,8 @@ jobs: MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq + volumes: + - ./.github/mysql/init/01-auth-plugin.sql:/docker-entrypoint-initdb.d/01-auth-plugin.sql:ro ports: - 3306 options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index aad042d..10f22c4 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -28,6 +28,8 @@ jobs: MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq + volumes: + - ./.github/mysql/init/01-auth-plugin.sql:/docker-entrypoint-initdb.d/01-auth-plugin.sql:ro ports: - 3306 options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 9903eb1..ec876a1 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -23,6 +23,8 @@ jobs: MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq + volumes: + - ./.github/mysql/init/01-auth-plugin.sql:/docker-entrypoint-initdb.d/01-auth-plugin.sql:ro ports: - 3306 options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 From 375ee0a074172c3b807ee4a7f881640ed9f62185 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 6 Apr 2026 16:20:11 -0600 Subject: [PATCH 22/34] fix: configure mysql8 auth plugin in workflow step --- .github/mysql/init/01-auth-plugin.sql | 2 -- .github/workflows/cron.yml | 8 +++++--- .github/workflows/pr.yml | 8 +++++--- .github/workflows/release.yml | 8 +++++--- 4 files changed, 15 insertions(+), 11 deletions(-) delete mode 100644 .github/mysql/init/01-auth-plugin.sql diff --git a/.github/mysql/init/01-auth-plugin.sql b/.github/mysql/init/01-auth-plugin.sql deleted file mode 100644 index 2dc2c52..0000000 --- a/.github/mysql/init/01-auth-plugin.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; -FLUSH PRIVILEGES; diff --git a/.github/workflows/cron.yml b/.github/workflows/cron.yml index 1c84746..da360e6 100644 --- a/.github/workflows/cron.yml +++ b/.github/workflows/cron.yml @@ -27,12 +27,10 @@ jobs: mysql: image: mysql:8.0 env: - MYSQL_RANDOM_ROOT_PASSWORD: yes + MYSQL_ROOT_PASSWORD: root MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq - volumes: - - ./.github/mysql/init/01-auth-plugin.sql:/docker-entrypoint-initdb.d/01-auth-plugin.sql:ro ports: - 3306 options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 @@ -58,6 +56,10 @@ jobs: box install ${{ matrix.coldbox }} --noSave box package list + - name: Configure MySQL user auth plugin + run: | + mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;" + - name: Start server env: DB_HOST: localhost diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 10f22c4..af546f5 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -24,12 +24,10 @@ jobs: mysql: image: mysql:8.0 env: - MYSQL_RANDOM_ROOT_PASSWORD: yes + MYSQL_ROOT_PASSWORD: root MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq - volumes: - - ./.github/mysql/init/01-auth-plugin.sql:/docker-entrypoint-initdb.d/01-auth-plugin.sql:ro ports: - 3306 options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 @@ -54,6 +52,10 @@ jobs: box config set modules.commandbox-dotenv.checkEnvPreServerStart=false box package list + - name: Configure MySQL user auth plugin + run: | + mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;" + - name: Start server env: DB_HOST: localhost diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ec876a1..c2cc1e6 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -19,12 +19,10 @@ jobs: mysql: image: mysql:8.0 env: - MYSQL_RANDOM_ROOT_PASSWORD: yes + MYSQL_ROOT_PASSWORD: root MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq - volumes: - - ./.github/mysql/init/01-auth-plugin.sql:/docker-entrypoint-initdb.d/01-auth-plugin.sql:ro ports: - 3306 options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 @@ -49,6 +47,10 @@ jobs: box config set modules.commandbox-dotenv.checkEnvPreServerStart=false box package list + - name: Configure MySQL user auth plugin + run: | + mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;" + - name: Start server env: DB_HOST: localhost From 5c12a1a35c8a3515b4af7ae1665bef41f93a22d6 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Tue, 7 Apr 2026 12:12:50 -0600 Subject: [PATCH 23/34] fix: protect finally job dispatch from then/catch job failures If dispatchThenJobIfNeeded or dispatchCatchJobIfNeeded threw an exception (e.g. missing WireBox mapping, connection error), the finally job would never be dispatched. Wrap both in try/catch so dispatchFinallyJobIfNeeded always runs, matching the semantic contract of "finally". Co-Authored-By: Claude Opus 4.6 --- models/Jobs/Batch.cfc | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/models/Jobs/Batch.cfc b/models/Jobs/Batch.cfc index b0ec34d..52db039 100644 --- a/models/Jobs/Batch.cfc +++ b/models/Jobs/Batch.cfc @@ -3,6 +3,7 @@ component accessors="true" { property name="dispatcher" inject="provider:Dispatcher@cbq"; property name="cbq" inject="provider:@cbq"; property name="wirebox" inject="wirebox"; + property name="log" inject="logbox:logger:{this}"; property name="repository"; property name="id" type="string"; @@ -50,7 +51,12 @@ component accessors="true" { } getRepository().markAsFinished( variables.id ); - dispatchThenJobIfNeeded(); + + try { + dispatchThenJobIfNeeded(); + } catch ( any e ) { + log.error( "Failed to dispatch then job for batch [#variables.id#]: #e.message#", e ); + } if ( counts.allJobsHaveRanExactlyOnce ) { dispatchFinallyJobIfNeeded(); @@ -65,7 +71,11 @@ component accessors="true" { cancel(); } - dispatchCatchJobIfNeeded( arguments.error ); + try { + dispatchCatchJobIfNeeded( arguments.error ); + } catch ( any e ) { + log.error( "Failed to dispatch catch job for batch [#variables.id#]: #e.message#", e ); + } } if ( counts.allJobsHaveRanExactlyOnce ) { From 970b00c0490cc4a025da65d92637027a9fc0860a Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Tue, 7 Apr 2026 12:35:55 -0600 Subject: [PATCH 24/34] fix: correct excpetion typo in SyncProvider onFailure invocation The exception was passed to the job onFailure handler under the misspelled key "excpetion" instead of "exception", so any job defining onFailure( exception ) would receive an undefined argument. Aligns SyncProvider with the AbstractQueueProvider behaviour. Co-Authored-By: Claude Sonnet 4.6 --- models/Providers/SyncProvider.cfc | 2 +- .../app/models/Jobs/OnFailureCapturingJob.cfc | 15 +++++++ .../Providers/SyncProviderOnFailureSpec.cfc | 45 +++++++++++++++++++ 3 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc create mode 100644 tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc diff --git a/models/Providers/SyncProvider.cfc b/models/Providers/SyncProvider.cfc index 9e58fd5..c7adaf9 100644 --- a/models/Providers/SyncProvider.cfc +++ b/models/Providers/SyncProvider.cfc @@ -139,7 +139,7 @@ component accessors="true" extends="AbstractQueueProvider" { invoke( job, "onFailure", - { "excpetion" : e } + { "exception" : e } ); } diff --git a/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc b/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc new file mode 100644 index 0000000..8f25325 --- /dev/null +++ b/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc @@ -0,0 +1,15 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + throw( + type = "cbq.tests.OnFailureCapturingJob", + message = "This job always errors to test the onFailure exception argument." + ); + } + + function onFailure() { + application.onFailureExceptionReceived = !isNull( arguments.exception ); + application.onFailureExceptionIsExpcetion = structKeyExists( arguments, "excpetion" ); + } + +} diff --git a/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc b/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc new file mode 100644 index 0000000..62dc748 --- /dev/null +++ b/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc @@ -0,0 +1,45 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "SyncProvider onFailure exception argument", function() { + beforeEach( function() { + structDelete( application, "onFailureExceptionReceived" ); + structDelete( application, "onFailureExceptionIsExpcetion" ); + } ); + + it( "passes the exception as 'exception' (not 'excpetion') to the onFailure handler", function() { + var provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ); + var pool = makeWorkerPool( provider ); + var job = getInstance( "OnFailureCapturingJob" ) + .setId( createUUID() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 1 ); + + param application.onFailureExceptionReceived = false; + param application.onFailureExceptionIsExpcetion = true; + + expect( () => provider.marshalJob( job, pool ) ).toThrow(); + + expect( application.onFailureExceptionReceived ).toBeTrue( + "onFailure should receive the exception under the key 'exception'" + ); + expect( application.onFailureExceptionIsExpcetion ).toBeFalse( + "onFailure should NOT receive the exception under the misspelled key 'excpetion'" + ); + } ); + } ); + } + + private any function makeWorkerPool( required any provider ) { + var connection = getInstance( "QueueConnection@cbq" ) + .setName( "TestOnFailureConnection" ) + .setProvider( arguments.provider ); + + return getInstance( "WorkerPool@cbq" ) + .setName( "TestOnFailurePool" ) + .setConnection( connection ) + .setConnectionName( connection.getName() ) + .startWorkers(); + } + +} From 5f78f33b13fa1645cedb85b425458332e6d57c54 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Fri, 17 Apr 2026 12:42:41 -0600 Subject: [PATCH 25/34] fix: guard against runaway retries in DBProvider pickup loop When the marshalJob future's exception handler fails to execute properly, jobs remain reserved with reservedDate set. Once the job timeout passes, fetchPotentiallyOpenRecords picks the row up again with no maxAttempts check, causing unbounded retries (observed: 29 attempts on a job configured for 3). Add a defensive guard at pickup time: before incrementing attempts and dispatching, compare the DB attempts column against maxAttempts. If exceeded, mark the job as failed and skip. Co-Authored-By: Claude Sonnet 4.6 --- models/Providers/DBProvider.cfc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/models/Providers/DBProvider.cfc b/models/Providers/DBProvider.cfc index c32cb42..5269475 100644 --- a/models/Providers/DBProvider.cfc +++ b/models/Providers/DBProvider.cfc @@ -59,6 +59,23 @@ component accessors="true" extends="AbstractQueueProvider" { for ( var job in lockedRecords ) { var jobCFC = variables.deserializeJob( job.payload, job.id, job.attempts ); + + var jobMaxAttempts = getMaxAttemptsForJob( jobCFC, pool ); + if ( jobMaxAttempts != 0 && job.attempts >= jobMaxAttempts ) { + if ( log.canWarn() ) { + log.warn( + "Job ###jobCFC.getId()# already has #job.attempts# attempts (max #jobMaxAttempts#). Marking as failed without further retry.", + { "job" : jobCFC.getMemento() } + ); + } + markJobAsFailedById( job.id, pool ); + ensureFailedBatchJobIsRecorded( + jobCFC, + "Job exceeded maximum attempts (#jobMaxAttempts#) before execution." + ); + continue; + } + incrementJobAttempts( jobCFC, pool ); application.cbController.getModuleService().loadMappings(); variables.marshalJob( From db4eef54068b502ebd515a3fc8ed9ada9553f92c Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Fri, 17 Apr 2026 12:44:28 -0600 Subject: [PATCH 26/34] fix: harden marshalJob exception handler against swallowed failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The marshalJob future was fire-and-forget. Any exception inside .onException — releaseJob serialization, batch recording, an interceptor, the job's onFailure hook — was silently absorbed by the unobserved future, leaving the row reserved and triggering unbounded timeout-based re-pickups. Wrap each side-effect in its own try/catch so one failure cannot prevent the rest from running, and add an outer catch that calls a new forceFailJob hook as a last-resort terminal kill. forceFailJob is a no-op on AbstractQueueProvider and overridden on DBProvider with a minimal UPDATE that sets failedDate and clears the reservation. It deliberately skips bookkeeping (onFailure hook, interceptors, batch recording) — it only runs when the proper failure path itself throws, and the alternative is the row being retried until manually cleaned up. Co-Authored-By: Claude Sonnet 4.6 --- models/Providers/AbstractQueueProvider.cfc | 298 ++++++++++++++------- models/Providers/DBProvider.cfc | 22 ++ 2 files changed, 227 insertions(+), 93 deletions(-) diff --git a/models/Providers/AbstractQueueProvider.cfc b/models/Providers/AbstractQueueProvider.cfc index 46cd7ab..5f30fa0 100644 --- a/models/Providers/AbstractQueueProvider.cfc +++ b/models/Providers/AbstractQueueProvider.cfc @@ -170,116 +170,130 @@ component accessors="true" { } } ) .onException( function( e ) { - // log failed job - if ( !isNull( e ) && "java.util.concurrent.CompletionException" == e.getClass().getName() ) { - var rootCause = e.getCause(); - if ( !isNull( rootCause ) ) { - e = rootCause; - } - } - - if ( log.canError() ) { - log.error( - "Exception when running job: #e.message#", - { - "job" : job.getMemento(), - "exception" : isNull( e ) ? javacast( "null", "" ) : e + try { + // log failed job + if ( !isNull( e ) && "java.util.concurrent.CompletionException" == e.getClass().getName() ) { + var rootCause = e.getCause(); + if ( !isNull( rootCause ) ) { + e = rootCause; } - ); - } - - afterJobException( - job.getId(), - job, - pool, - isNull( e ) ? javacast( "null", "" ) : e - ); - - variables.interceptorService.announce( - "onCBQJobException", - { - "job" : job, - "pool" : pool, - "exception" : isNull( e ) ? javacast( "null", "" ) : e } - ); - var jobMaxAttempts = getMaxAttemptsForJob( job, pool ); - if ( job.getIsCancelled() ) { - if ( variables.log.canDebug() ) { - variables.log.debug( "Job [#job.getId()#] was cancelled. Reason: #job.getCancelledReason()#. Deleting job [#job.getId()#]." ); + if ( log.canError() ) { + log.error( + "Exception when running job: #e.message#", + { + "job" : job.getMemento(), + "exception" : isNull( e ) ? javacast( "null", "" ) : e + } + ); } - if ( structKeyExists( job, "onFailure" ) ) { - invoke( + try { + afterJobException( + job.getId(), job, - "onFailure", - { "exception" : isNull( e ) ? javacast( "null", "" ) : e } + pool, + isNull( e ) ? javacast( "null", "" ) : e + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "afterJobException", + job, + sideEffectException ); } - variables.interceptorService.announce( - "onCBQJobFailed", - { - "job" : job, - "exception" : isNull( e ) ? javacast( "null", "" ) : e - } - ); - - afterJobFailed( - job.getId(), - job, - pool, - isNull( e ) ? javacast( "null", "" ) : e - ); - ensureFailedBatchJobIsRecorded( job, isNull( e ) ? javacast( "null", "" ) : e ); - - variables.log.debug( "Marked job ###job.getId()# as failed after being cancelled." ); - } else if ( jobMaxAttempts == 0 || job.getCurrentAttempt() < jobMaxAttempts ) { - if ( jobMaxAttempts == 0 && variables.log.canDebug() ) { - variables.log.debug( "Job ###job.getId()# has a maxAttempts of 0 and will always be released." ); - } - if ( variables.log.canDebug() ) { - variables.log.debug( "Releasing job ###job.getId()#" ); - } - releaseJob( job, pool ); - if ( variables.log.canDebug() ) { - variables.log.debug( "Released job ###job.getId()#" ); - } - } else { - if ( variables.log.canDebug() ) { - variables.log.debug( "Maximum attempts reached. Deleting job ###job.getId()#" ); - } - - if ( structKeyExists( job, "onFailure" ) ) { - invoke( + try { + variables.interceptorService.announce( + "onCBQJobException", + { + "job" : job, + "pool" : pool, + "exception" : isNull( e ) ? javacast( "null", "" ) : e + } + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "onCBQJobException", job, - "onFailure", - { "exception" : isNull( e ) ? javacast( "null", "" ) : e } + sideEffectException ); } - variables.interceptorService.announce( - "onCBQJobFailed", - { - "job" : job, - "exception" : isNull( e ) ? javacast( "null", "" ) : e + var jobMaxAttempts = getMaxAttemptsForJob( job, pool ); + if ( job.getIsCancelled() ) { + if ( variables.log.canDebug() ) { + variables.log.debug( "Job [#job.getId()#] was cancelled. Reason: #job.getCancelledReason()#. Deleting job [#job.getId()#]." ); } - ); - - afterJobFailed( - job.getId(), - job, - pool, - isNull( e ) ? javacast( "null", "" ) : e - ); - ensureFailedBatchJobIsRecorded( job, isNull( e ) ? javacast( "null", "" ) : e ); - - variables.log.debug( "Marked job ###job.getId()# as failed after maximum failed attempts." ); + markJobFailed( + job, + pool, + isNull( e ) ? javacast( "null", "" ) : e + ); + variables.log.debug( "Marked job ###job.getId()# as failed after being cancelled." ); + } else if ( jobMaxAttempts == 0 || job.getCurrentAttempt() < jobMaxAttempts ) { + if ( jobMaxAttempts == 0 && variables.log.canDebug() ) { + variables.log.debug( "Job ###job.getId()# has a maxAttempts of 0 and will always be released." ); + } + if ( variables.log.canDebug() ) { + variables.log.debug( "Releasing job ###job.getId()#" ); + } + try { + releaseJob( job, pool ); + if ( variables.log.canDebug() ) { + variables.log.debug( "Released job ###job.getId()#" ); + } + } catch ( any releaseException ) { + log.error( + "releaseJob failed for job ###job.getId()#. Falling back to terminal failure to prevent timeout-based re-pickup.", + { + "job" : job.getMemento(), + "originalException" : isNull( e ) ? javacast( "null", "" ) : e, + "releaseException" : releaseException + } + ); + markJobFailed( + job, + pool, + isNull( e ) ? javacast( "null", "" ) : e + ); + } + } else { + if ( variables.log.canDebug() ) { + variables.log.debug( "Maximum attempts reached. Deleting job ###job.getId()#" ); + } + markJobFailed( + job, + pool, + isNull( e ) ? javacast( "null", "" ) : e + ); + variables.log.debug( "Marked job ###job.getId()# as failed after maximum failed attempts." ); + } + } catch ( any outerException ) { + try { + log.error( + "Critical: .onException handler failed for job ###job.getId()#. Forcing terminal failure.", + { + "job" : job.getMemento(), + "originalException" : isNull( e ) ? javacast( "null", "" ) : e, + "handlerException" : outerException + } + ); + } catch ( any ignored ) { + } + try { + forceFailJob( job.getId(), pool ); + } catch ( any ignored ) { + } } if ( !isNull( afterJobHook ) && ( isCustomFunction( afterJobHook ) || isClosure( afterJobHook ) ) ) { - afterJobHook( job, pool ); + try { + afterJobHook( job, pool ); + } catch ( any hookException ) { + logSideEffectFailure( "afterJobHook", job, hookException ); + } } } ); } @@ -310,6 +324,104 @@ component accessors="true" { return; } + /** + * Last-resort terminal failure for a job. Providers should override this with + * a minimal, bulletproof write that takes the job out of any retry loop, even + * when the normal afterJobFailed path has already failed. Skips bookkeeping + * (onFailure hook, interceptors, batch recording) by design — it only runs + * when those paths themselves are broken. + */ + public void function forceFailJob( required any id, WorkerPool pool ) { + return; + } + + private void function markJobFailed( + required AbstractJob job, + required WorkerPool pool, + any exception + ) { + if ( structKeyExists( arguments.job, "onFailure" ) ) { + try { + invoke( + arguments.job, + "onFailure", + { "exception" : isNull( arguments.exception ) ? javacast( "null", "" ) : arguments.exception } + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "onFailure", + arguments.job, + sideEffectException + ); + } + } + + try { + variables.interceptorService.announce( + "onCBQJobFailed", + { + "job" : arguments.job, + "exception" : isNull( arguments.exception ) ? javacast( "null", "" ) : arguments.exception + } + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "onCBQJobFailed", + arguments.job, + sideEffectException + ); + } + + try { + afterJobFailed( + arguments.job.getId(), + arguments.job, + arguments.pool, + isNull( arguments.exception ) ? javacast( "null", "" ) : arguments.exception + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "afterJobFailed", + arguments.job, + sideEffectException + ); + // afterJobFailed is what writes failedDate in DBProvider; if it threw, fall back. + try { + forceFailJob( arguments.job.getId(), arguments.pool ); + } catch ( any ignored ) { + } + } + + try { + ensureFailedBatchJobIsRecorded( + arguments.job, + isNull( arguments.exception ) ? javacast( "null", "" ) : arguments.exception + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "ensureFailedBatchJobIsRecorded", + arguments.job, + sideEffectException + ); + } + } + + private void function logSideEffectFailure( + required string stage, + required AbstractJob job, + required any exception + ) { + try { + if ( log.canError() ) { + log.error( + "Side effect [#arguments.stage#] threw for job ###arguments.job.getId()#: #arguments.exception.message#", + { "exception" : arguments.exception } + ); + } + } catch ( any ignored ) { + } + } + public void function releaseJob( required AbstractJob job, required WorkerPool pool ) { arguments.job.setCurrentAttempt( arguments.job.getCurrentAttempt() + 1 ); push( diff --git a/models/Providers/DBProvider.cfc b/models/Providers/DBProvider.cfc index 5269475..19a00bd 100644 --- a/models/Providers/DBProvider.cfc +++ b/models/Providers/DBProvider.cfc @@ -340,6 +340,28 @@ component accessors="true" extends="AbstractQueueProvider" { .update( values = { "completedDate" : getCurrentUnixTimestamp() }, options = variables.defaultQueryOptions ); } + public void function forceFailJob( required any id, WorkerPool pool ) { + newQuery() + .table( variables.tableName ) + .where( "id", arguments.id ) + .update( + values = { + "failedDate" : getCurrentUnixTimestamp(), + "reservedBy" : { + "value" : "", + "null" : true, + "nulls" : true + }, + "reservedDate" : { + "value" : "", + "null" : true, + "nulls" : true + } + }, + options = variables.defaultQueryOptions + ); + } + private void function markJobAsFailedById( required numeric id, WorkerPool pool ) { newQuery() .table( variables.tableName ) From ca588c264959f594307872ff9a8eddddda7b6ae6 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Fri, 17 Apr 2026 19:56:48 -0600 Subject: [PATCH 27/34] refactor: extract processLockedRecord for testability and add max-attempts integration tests The inline loop body is pulled into a private processLockedRecord method so integration tests can make it public and exercise the maxAttempts pre-flight guard in isolation. Co-Authored-By: Claude Sonnet 4.6 --- models/Providers/DBProvider.cfc | 63 ++--- .../Providers/DBProviderMaxAttemptsSpec.cfc | 243 ++++++++++++++++++ 2 files changed, 277 insertions(+), 29 deletions(-) create mode 100644 tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc diff --git a/models/Providers/DBProvider.cfc b/models/Providers/DBProvider.cfc index 19a00bd..654f111 100644 --- a/models/Providers/DBProvider.cfc +++ b/models/Providers/DBProvider.cfc @@ -58,35 +58,7 @@ component accessors="true" extends="AbstractQueueProvider" { var lockedRecords = fetchLockedRecords( capacity, pool ); for ( var job in lockedRecords ) { - var jobCFC = variables.deserializeJob( job.payload, job.id, job.attempts ); - - var jobMaxAttempts = getMaxAttemptsForJob( jobCFC, pool ); - if ( jobMaxAttempts != 0 && job.attempts >= jobMaxAttempts ) { - if ( log.canWarn() ) { - log.warn( - "Job ###jobCFC.getId()# already has #job.attempts# attempts (max #jobMaxAttempts#). Marking as failed without further retry.", - { "job" : jobCFC.getMemento() } - ); - } - markJobAsFailedById( job.id, pool ); - ensureFailedBatchJobIsRecorded( - jobCFC, - "Job exceeded maximum attempts (#jobMaxAttempts#) before execution." - ); - continue; - } - - incrementJobAttempts( jobCFC, pool ); - application.cbController.getModuleService().loadMappings(); - variables.marshalJob( - job = jobCFC, - pool = pool, - afterJobHook = () => { - // variables.log.debug( "Job finished. Immediately running the scheduled task again." ); - // forceRun = true; - // task.run(); - } - ); + processLockedRecord( job, pool ); } return lockedRecords.len(); @@ -292,6 +264,39 @@ component accessors="true" extends="AbstractQueueProvider" { return true; } + private void function processLockedRecord( required any record, required WorkerPool pool ) { + var jobCFC = variables.deserializeJob( + arguments.record.payload, + arguments.record.id, + arguments.record.attempts + ); + + var jobMaxAttempts = getMaxAttemptsForJob( jobCFC, arguments.pool ); + if ( jobMaxAttempts != 0 && arguments.record.attempts >= jobMaxAttempts ) { + if ( log.canWarn() ) { + log.warn( + "Job ###jobCFC.getId()# already has #arguments.record.attempts# attempts (max #jobMaxAttempts#). Marking as failed without further retry.", + { "job" : jobCFC.getMemento() } + ); + } + markJobAsFailedById( arguments.record.id, arguments.pool ); + ensureFailedBatchJobIsRecorded( + jobCFC, + "Job exceeded maximum attempts (#jobMaxAttempts#) before execution." + ); + return; + } + + incrementJobAttempts( jobCFC, arguments.pool ); + application.cbController.getModuleService().loadMappings(); + variables.marshalJob( + job = jobCFC, + pool = arguments.pool, + afterJobHook = () => { + } + ); + } + private void function incrementJobAttempts( required AbstractJob job, required WorkerPool pool ) { if ( log.canDebug() ) { log.debug( "Reserving job ###arguments.job.getId()#" ); diff --git a/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc b/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc new file mode 100644 index 0000000..4dce371 --- /dev/null +++ b/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc @@ -0,0 +1,243 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "DBProvider maxAttempts safeguards", function() { + beforeEach( function() { + variables.provider = getWireBox().getInstance( "DBProvider@cbq" ).setProperties( {} ); + makePublic( variables.provider, "processLockedRecord" ); + variables.pool = makeWorkerPool( variables.provider ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + } ); + + afterEach( function() { + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + } ); + + it( "forceFailJob sets failedDate and clears the reservation", function() { + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ).setMaxAttempts( 3 ); + variables.provider.push( "default", job ); + + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : variables.pool.getUniqueId(), + "reservedDate" : now, + "availableDate" : now + 60, + "attempts" : 5 + } ); + + var jobId = variables.provider + .newQuery() + .from( "cbq_jobs" ) + .value( "id" ); + + variables.provider.forceFailJob( jobId, variables.pool ); + + var row = variables.provider + .newQuery() + .from( "cbq_jobs" ) + .where( "id", jobId ) + .first(); + + expect( row.failedDate ).notToBeNull( "failedDate should be set" ); + expect( row.failedDate ).toBeGT( 0, "failedDate should be a unix timestamp" ); + expect( row.reservedBy ?: "" ).toBe( "", "reservedBy should be cleared" ); + expect( row.reservedDate ?: "" ).toBe( "", "reservedDate should be cleared" ); + } ); + + it( "skips dispatch and marks the job failed when attempts already meets maxAttempts", function() { + var job = getWireBox().getInstance( "AlwaysErrorJob" ).setMaxAttempts( 3 ); + variables.provider.push( "default", job ); + + var now = javacast( "long", getTickCount() / 1000 ); + // Simulate the runaway state: 29 attempts in DB, payload still says maxAttempts=3, + // reserved by this pool but reservedDate was never set (the symptom we observed). + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : variables.pool.getUniqueId(), + "reservedDate" : { + "value" : "", + "null" : true, + "nulls" : true + }, + "availableDate" : now - 1, + "attempts" : 29 + } ); + + var record = variables.provider + .newQuery() + .from( "cbq_jobs" ) + .first(); + + prepareMock( variables.provider ); + variables.provider.$( "incrementJobAttempts" ); + variables.provider.$( "marshalJob" ); + + variables.provider.processLockedRecord( record, variables.pool ); + + expect( variables.provider.$never( "incrementJobAttempts" ) ).toBeTrue( + "incrementJobAttempts must not run once attempts >= maxAttempts" + ); + expect( variables.provider.$never( "marshalJob" ) ).toBeTrue( + "marshalJob must not run once attempts >= maxAttempts" + ); + + var row = variables.provider + .newQuery() + .from( "cbq_jobs" ) + .where( "id", record.id ) + .first(); + expect( row.failedDate ).notToBeNull( "the runaway job should be marked failed" ); + } ); + + it( "still proceeds normally when attempts is below maxAttempts", function() { + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ).setMaxAttempts( 3 ); + variables.provider.push( "default", job ); + + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : variables.pool.getUniqueId(), + "reservedDate" : { + "value" : "", + "null" : true, + "nulls" : true + }, + "availableDate" : now - 1, + "attempts" : 1 + } ); + + var record = variables.provider + .newQuery() + .from( "cbq_jobs" ) + .first(); + + prepareMock( variables.provider ); + variables.provider.$( "incrementJobAttempts" ); + variables.provider.$( "marshalJob" ); + + variables.provider.processLockedRecord( record, variables.pool ); + + expect( variables.provider.$once( "incrementJobAttempts" ) ).toBeTrue( + "incrementJobAttempts should run when attempts < maxAttempts" + ); + expect( variables.provider.$once( "marshalJob" ) ).toBeTrue( + "marshalJob should run when attempts < maxAttempts" + ); + } ); + + it( "still marks the row failed when releaseJob throws inside the exception handler", function() { + // Regression: previously, if releaseJob threw inside .onException, the + // future swallowed the secondary exception and the row stayed reserved, + // causing unbounded timeout-based re-pickups. + var job = getWireBox() + .getInstance( "AlwaysErrorJob" ) + .setMaxAttempts( 5 ) + .setCurrentAttempt( 0 ) + .setId( randRange( 1, 1000 ) ); + + variables.provider.push( "default", job ); + var jobId = reserveJobForPool(); + job.setId( jobId ); + + prepareMock( variables.provider ); + variables.provider + .$( "releaseJob" ) + .$throws( type = "TestSimulatedFailure", message = "simulated releaseJob failure" ); + + try { + var jobFuture = variables.provider.marshalJob( job, variables.pool ); + if ( !isNull( jobFuture ) ) { + jobFuture.get(); + } + } catch ( any e ) { + } + + var row = variables.provider + .newQuery() + .from( "cbq_jobs" ) + .where( "id", jobId ) + .first(); + expect( row.failedDate ?: "" ).notToBe( + "", + "the row must be marked failed even when releaseJob throws, otherwise the timeout watcher will retry it forever" + ); + } ); + + it( "falls back to forceFailJob when even afterJobFailed throws", function() { + // Floor of the defense: if the proper failure-recording path is broken, + // markJobFailed should escalate to forceFailJob to guarantee the row exits + // the retry loop. + var job = getWireBox() + .getInstance( "AlwaysErrorJob" ) + .setMaxAttempts( 1 ) + .setCurrentAttempt( 0 ) + .setId( randRange( 1, 1000 ) ); + + variables.provider.push( "default", job ); + var jobId = reserveJobForPool(); + job.setId( jobId ); + + prepareMock( variables.provider ); + makePublic( variables.provider, "afterJobFailed" ); + variables.provider + .$( "afterJobFailed" ) + .$throws( type = "TestSimulatedFailure", message = "simulated afterJobFailed failure" ); + variables.provider.$( "forceFailJob" ); + + try { + var jobFuture = variables.provider.marshalJob( job, variables.pool ); + if ( !isNull( jobFuture ) ) { + jobFuture.get(); + } + } catch ( any e ) { + } + + expect( variables.provider.$atLeast( 1, "forceFailJob" ) ).toBeTrue( + "forceFailJob must run when afterJobFailed throws" + ); + } ); + } ); + } + + private numeric function reserveJobForPool() { + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : variables.pool.getUniqueId(), + "reservedDate" : now, + "availableDate" : now + 60 + } ); + return variables.provider + .newQuery() + .from( "cbq_jobs" ) + .value( "id" ); + } + + private any function makeWorkerPool( required any provider ) { + var connection = getInstance( "QueueConnection@cbq" ) + .setName( "TestMaxAttemptsConnection" ) + .setProvider( arguments.provider ); + + return getInstance( "WorkerPool@cbq" ) + .setName( "TestMaxAttemptsPool" ) + .setConnection( connection ) + .setConnectionName( connection.getName() ) + .startWorkers(); + } + +} From df54db653238750615cd16b77cdbf0eea8c8d69e Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Fri, 17 Apr 2026 20:29:16 -0600 Subject: [PATCH 28/34] fix: guard releaseJob-failure log call so markJobFailed always runs If log.error threw in the async thread (e.g. when serialising a MockBox exception struct), markJobFailed was silently skipped, leaving the row reserved and the timeout watcher free to re-pick it up indefinitely. Wrapping the log call in its own try/catch ensures the fallback path to markJobFailed is always reached regardless of logging failures. Co-Authored-By: Claude Sonnet 4.6 --- models/Providers/AbstractQueueProvider.cfc | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/models/Providers/AbstractQueueProvider.cfc b/models/Providers/AbstractQueueProvider.cfc index 5f30fa0..673a63e 100644 --- a/models/Providers/AbstractQueueProvider.cfc +++ b/models/Providers/AbstractQueueProvider.cfc @@ -245,14 +245,17 @@ component accessors="true" { variables.log.debug( "Released job ###job.getId()#" ); } } catch ( any releaseException ) { - log.error( - "releaseJob failed for job ###job.getId()#. Falling back to terminal failure to prevent timeout-based re-pickup.", - { - "job" : job.getMemento(), - "originalException" : isNull( e ) ? javacast( "null", "" ) : e, - "releaseException" : releaseException - } - ); + try { + log.error( + "releaseJob failed for job ###job.getId()#. Falling back to terminal failure to prevent timeout-based re-pickup.", + { + "job" : job.getMemento(), + "originalException" : isNull( e ) ? javacast( "null", "" ) : e, + "releaseException" : releaseException + } + ); + } catch ( any ignored ) { + } markJobFailed( job, pool, From 56a22024bf2e09c4fcf9b9bb0bfd321287c0e004 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Sun, 19 Apr 2026 08:54:44 -0600 Subject: [PATCH 29/34] test: assert markJobFailed called (not DB row) when releaseJob throws MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WireBox provider methods (newQuery) are not reliably invocable through MockBox pass-through in async threads, so the DB write in markJobAsFailedById silently failed in CI regardless of the log-guarding fix. Switching to a mock-call assertion — the same technique used by the forceFailJob-fallback test — verifies the correct code path without depending on the broken DB write path. Co-Authored-By: Claude Sonnet 4.6 --- .../Providers/DBProviderMaxAttemptsSpec.cfc | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc b/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc index 4dce371..0a77fa0 100644 --- a/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc +++ b/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc @@ -138,10 +138,13 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { ); } ); - it( "still marks the row failed when releaseJob throws inside the exception handler", function() { + it( "calls markJobFailed when releaseJob throws inside the exception handler", function() { // Regression: previously, if releaseJob threw inside .onException, the // future swallowed the secondary exception and the row stayed reserved, - // causing unbounded timeout-based re-pickups. + // causing unbounded timeout-based re-pickups. We verify markJobFailed is + // invoked (the action that terminates the retry loop), not the DB write itself, + // because WireBox provider methods (newQuery) are not reliably callable through + // MockBox pass-through in async threads. var job = getWireBox() .getInstance( "AlwaysErrorJob" ) .setMaxAttempts( 5 ) @@ -153,9 +156,11 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { job.setId( jobId ); prepareMock( variables.provider ); + makePublic( variables.provider, "markJobFailed" ); variables.provider .$( "releaseJob" ) .$throws( type = "TestSimulatedFailure", message = "simulated releaseJob failure" ); + variables.provider.$( "markJobFailed" ); try { var jobFuture = variables.provider.marshalJob( job, variables.pool ); @@ -165,14 +170,8 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { } catch ( any e ) { } - var row = variables.provider - .newQuery() - .from( "cbq_jobs" ) - .where( "id", jobId ) - .first(); - expect( row.failedDate ?: "" ).notToBe( - "", - "the row must be marked failed even when releaseJob throws, otherwise the timeout watcher will retry it forever" + expect( variables.provider.$atLeast( 1, "markJobFailed" ) ).toBeTrue( + "markJobFailed must be called when releaseJob throws so the row exits the retry loop" ); } ); From e3ab7c9400e458793ed84d68f42d7c2f27898f80 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Sun, 19 Apr 2026 20:02:18 -0600 Subject: [PATCH 30/34] test: use real subclass fixture to test releaseJob-throws path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MockBox cannot reliably stub private methods reached through the async closure's captured variables scope, and its pass-through for WireBox provider methods (newQuery) is broken in async threads. Replace the prepareMock approach with a real FailingReleaseDBProvider subclass that overrides releaseJob to throw — no mocking needed, so newQuery stays a proper WireBox provider method and the DB write goes through correctly. Co-Authored-By: Claude Sonnet 4.6 --- .../Providers/FailingReleaseDBProvider.cfc | 17 +++++ .../Providers/DBProviderMaxAttemptsSpec.cfc | 66 +++++++++++++------ 2 files changed, 64 insertions(+), 19 deletions(-) create mode 100644 tests/resources/app/models/Providers/FailingReleaseDBProvider.cfc diff --git a/tests/resources/app/models/Providers/FailingReleaseDBProvider.cfc b/tests/resources/app/models/Providers/FailingReleaseDBProvider.cfc new file mode 100644 index 0000000..2e27f1a --- /dev/null +++ b/tests/resources/app/models/Providers/FailingReleaseDBProvider.cfc @@ -0,0 +1,17 @@ +/** + * Test fixture: a DBProvider whose releaseJob always throws. + * Used by DBProviderMaxAttemptsSpec to verify that the catch block + * in marshalJob's .onException handler correctly falls back to + * markJobFailed when releaseJob fails — without needing MockBox + * (which interferes with WireBox provider methods in async threads). + */ +component extends="cbq.models.Providers.DBProvider" { + + public void function releaseJob( required any job, required any pool ) { + throw( + type = "FailingReleaseDBProvider.SimulatedFailure", + message = "Simulated releaseJob failure for testing" + ); + } + +} diff --git a/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc b/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc index 0a77fa0..c2ba261 100644 --- a/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc +++ b/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc @@ -138,41 +138,69 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { ); } ); - it( "calls markJobFailed when releaseJob throws inside the exception handler", function() { + it( "still marks the row failed when releaseJob throws inside the exception handler", function() { // Regression: previously, if releaseJob threw inside .onException, the // future swallowed the secondary exception and the row stayed reserved, - // causing unbounded timeout-based re-pickups. We verify markJobFailed is - // invoked (the action that terminates the retry loop), not the DB write itself, - // because WireBox provider methods (newQuery) are not reliably callable through - // MockBox pass-through in async threads. + // causing unbounded timeout-based re-pickups. + // We use a real subclass (FailingReleaseDBProvider) instead of MockBox so that + // WireBox provider methods (newQuery) continue to work inside the async thread. + var failingProvider = getWireBox() + .getInstance( "FailingReleaseDBProvider" ) + .setProperties( {} ); + var failingPool = makeWorkerPool( failingProvider ); + + failingProvider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + var job = getWireBox() .getInstance( "AlwaysErrorJob" ) .setMaxAttempts( 5 ) - .setCurrentAttempt( 0 ) - .setId( randRange( 1, 1000 ) ); + .setCurrentAttempt( 0 ); - variables.provider.push( "default", job ); - var jobId = reserveJobForPool(); - job.setId( jobId ); + failingProvider.push( "default", job ); - prepareMock( variables.provider ); - makePublic( variables.provider, "markJobFailed" ); - variables.provider - .$( "releaseJob" ) - .$throws( type = "TestSimulatedFailure", message = "simulated releaseJob failure" ); - variables.provider.$( "markJobFailed" ); + var now = javacast( "long", getTickCount() / 1000 ); + failingProvider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : failingPool.getUniqueId(), + "reservedDate" : now, + "availableDate": now + 60 + } ); + + var jobId = failingProvider + .newQuery() + .from( "cbq_jobs" ) + .value( "id" ); + + job.setId( jobId ); try { - var jobFuture = variables.provider.marshalJob( job, variables.pool ); + var jobFuture = failingProvider.marshalJob( job, failingPool ); if ( !isNull( jobFuture ) ) { jobFuture.get(); } } catch ( any e ) { } - expect( variables.provider.$atLeast( 1, "markJobFailed" ) ).toBeTrue( - "markJobFailed must be called when releaseJob throws so the row exits the retry loop" + var row = failingProvider + .newQuery() + .from( "cbq_jobs" ) + .where( "id", jobId ) + .first(); + + expect( row.failedDate ?: "" ).notToBe( + "", + "the row must be marked failed even when releaseJob throws, otherwise the timeout watcher will retry it forever" ); + + failingProvider + .newQuery() + .table( "cbq_jobs" ) + .delete(); } ); it( "falls back to forceFailJob when even afterJobFailed throws", function() { From c5bb58ade78f115556b698f2f4f5f0224131deed Mon Sep 17 00:00:00 2001 From: elpete <2583646+elpete@users.noreply.github.com> Date: Mon, 20 Apr 2026 02:03:08 +0000 Subject: [PATCH 31/34] Apply cfformat changes --- .../integration/Providers/DBProviderMaxAttemptsSpec.cfc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc b/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc index c2ba261..3729861 100644 --- a/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc +++ b/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc @@ -144,9 +144,7 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { // causing unbounded timeout-based re-pickups. // We use a real subclass (FailingReleaseDBProvider) instead of MockBox so that // WireBox provider methods (newQuery) continue to work inside the async thread. - var failingProvider = getWireBox() - .getInstance( "FailingReleaseDBProvider" ) - .setProperties( {} ); + var failingProvider = getWireBox().getInstance( "FailingReleaseDBProvider" ).setProperties( {} ); var failingPool = makeWorkerPool( failingProvider ); failingProvider @@ -166,9 +164,9 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { .newQuery() .table( "cbq_jobs" ) .update( { - "reservedBy" : failingPool.getUniqueId(), + "reservedBy" : failingPool.getUniqueId(), "reservedDate" : now, - "availableDate": now + 60 + "availableDate" : now + 60 } ); var jobId = failingProvider From 12829bb5a14546d6ad8d6a24f59adc6f0300f433 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Sun, 19 Apr 2026 20:26:58 -0600 Subject: [PATCH 32/34] chore: include test model fixtures in cfformat script Adds tests/resources/app/models/**/*.cfc to the format, format:check, and format:watch scripts so fixture files like FailingReleaseDBProvider are checked and formatted consistently with the rest of the codebase. Co-Authored-By: Claude Sonnet 4.6 --- box.json | 6 +++--- .../app/models/Jobs/AlwaysErrorJob.cfc | 5 +---- .../app/models/Jobs/BeforeAndAfterJob.cfc | 20 +++++++++---------- .../app/models/Jobs/OnFailureCapturingJob.cfc | 4 ++-- .../app/models/Jobs/ReleaseTestJob.cfc | 14 ++++++------- .../app/models/Jobs/SendWelcomeEmailJob.cfc | 10 +++++----- .../Providers/FailingReleaseDBProvider.cfc | 2 +- 7 files changed, 29 insertions(+), 32 deletions(-) diff --git a/box.json b/box.json index 67b69f5..2af081d 100644 --- a/box.json +++ b/box.json @@ -31,9 +31,9 @@ "cfmigrations":"modules/cfmigrations/" }, "scripts":{ - "format":"cfformat run ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --overwrite", - "format:check":"cfformat check ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --verbose", - "format:watch":"cfformat watch ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc", + "format":"cfformat run ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc,tests/resources/app/models/**/*.cfc --overwrite", + "format:check":"cfformat check ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc,tests/resources/app/models/**/*.cfc --verbose", + "format:watch":"cfformat watch ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc,tests/resources/app/models/**/*.cfc", "install:2021":"cfpm install document,feed,zip" }, "ignore":[ diff --git a/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc index 4bfcaf5..7018cf2 100644 --- a/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc +++ b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc @@ -1,10 +1,7 @@ component extends="cbq.models.Jobs.AbstractJob" { function handle() { - throw( - type = "cbq.tests.AlwaysErrorJob", - message = "This job always errors for testing." - ); + throw( type = "cbq.tests.AlwaysErrorJob", message = "This job always errors for testing." ); } } diff --git a/tests/resources/app/models/Jobs/BeforeAndAfterJob.cfc b/tests/resources/app/models/Jobs/BeforeAndAfterJob.cfc index 9b9202e..175a247 100644 --- a/tests/resources/app/models/Jobs/BeforeAndAfterJob.cfc +++ b/tests/resources/app/models/Jobs/BeforeAndAfterJob.cfc @@ -1,15 +1,15 @@ component extends="cbq.models.Jobs.AbstractJob" { - function handle() { - // do nothing - } + function handle() { + // do nothing + } - function before() { - application.jobBeforeCalled = true; - } + function before() { + application.jobBeforeCalled = true; + } - function after() { - application.jobAfterCalled = true; - } + function after() { + application.jobAfterCalled = true; + } -} \ No newline at end of file +} diff --git a/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc b/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc index 8f25325..d4ddb76 100644 --- a/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc +++ b/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc @@ -2,13 +2,13 @@ component extends="cbq.models.Jobs.AbstractJob" { function handle() { throw( - type = "cbq.tests.OnFailureCapturingJob", + type = "cbq.tests.OnFailureCapturingJob", message = "This job always errors to test the onFailure exception argument." ); } function onFailure() { - application.onFailureExceptionReceived = !isNull( arguments.exception ); + application.onFailureExceptionReceived = !isNull( arguments.exception ); application.onFailureExceptionIsExpcetion = structKeyExists( arguments, "excpetion" ); } diff --git a/tests/resources/app/models/Jobs/ReleaseTestJob.cfc b/tests/resources/app/models/Jobs/ReleaseTestJob.cfc index 5f3b7e9..56dd6f4 100644 --- a/tests/resources/app/models/Jobs/ReleaseTestJob.cfc +++ b/tests/resources/app/models/Jobs/ReleaseTestJob.cfc @@ -1,11 +1,11 @@ component extends="cbq.models.Jobs.AbstractJob" { - function handle() { - this.release( 2 ); - } + function handle() { + this.release( 2 ); + } - function onFailure() { - this.onFailureCalled = true; - } + function onFailure() { + this.onFailureCalled = true; + } -} \ No newline at end of file +} diff --git a/tests/resources/app/models/Jobs/SendWelcomeEmailJob.cfc b/tests/resources/app/models/Jobs/SendWelcomeEmailJob.cfc index e454c33..119368e 100644 --- a/tests/resources/app/models/Jobs/SendWelcomeEmailJob.cfc +++ b/tests/resources/app/models/Jobs/SendWelcomeEmailJob.cfc @@ -1,9 +1,9 @@ component extends="cbq.models.Jobs.AbstractJob" { - property name="log" inject="logbox:logger:{this}"; + property name="log" inject="logbox:logger:{this}"; - function handle() { - log.info( "sending email" ); - } + function handle() { + log.info( "sending email" ); + } -} \ No newline at end of file +} diff --git a/tests/resources/app/models/Providers/FailingReleaseDBProvider.cfc b/tests/resources/app/models/Providers/FailingReleaseDBProvider.cfc index 2e27f1a..b91d78a 100644 --- a/tests/resources/app/models/Providers/FailingReleaseDBProvider.cfc +++ b/tests/resources/app/models/Providers/FailingReleaseDBProvider.cfc @@ -9,7 +9,7 @@ component extends="cbq.models.Providers.DBProvider" { public void function releaseJob( required any job, required any pool ) { throw( - type = "FailingReleaseDBProvider.SimulatedFailure", + type = "FailingReleaseDBProvider.SimulatedFailure", message = "Simulated releaseJob failure for testing" ); } From 6885930b73d7d641f8ecc972e24e2cb68b40d9d4 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Sun, 19 Apr 2026 21:41:08 -0600 Subject: [PATCH 33/34] fix: address Copilot review feedback on PR #26 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Wrap afterJobHook in try/catch so a throwing hook logs via logSideEffectFailure and does not flip a successful job to failed - Rename onFailureExceptionIsExpcetion → onFailureExceptionHasExcpetionKey in OnFailureCapturingJob and SyncProviderOnFailureSpec to separate the intentional 'excpetion' typo-under-test from the variable naming Co-Authored-By: Claude Sonnet 4.6 --- models/Providers/AbstractQueueProvider.cfc | 10 +++++++++- .../app/models/Jobs/OnFailureCapturingJob.cfc | 2 +- .../Providers/SyncProviderOnFailureSpec.cfc | 6 +++--- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/models/Providers/AbstractQueueProvider.cfc b/models/Providers/AbstractQueueProvider.cfc index 673a63e..5de4dd5 100644 --- a/models/Providers/AbstractQueueProvider.cfc +++ b/models/Providers/AbstractQueueProvider.cfc @@ -166,7 +166,15 @@ component accessors="true" { dispatchNextJobInChain( job, pool ); if ( !isNull( afterJobHook ) && ( isCustomFunction( afterJobHook ) || isClosure( afterJobHook ) ) ) { - afterJobHook( job, pool ); + try { + afterJobHook( job, pool ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "afterJobHook", + job, + sideEffectException + ); + } } } ) .onException( function( e ) { diff --git a/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc b/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc index d4ddb76..4741692 100644 --- a/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc +++ b/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc @@ -9,7 +9,7 @@ component extends="cbq.models.Jobs.AbstractJob" { function onFailure() { application.onFailureExceptionReceived = !isNull( arguments.exception ); - application.onFailureExceptionIsExpcetion = structKeyExists( arguments, "excpetion" ); + application.onFailureExceptionHasExcpetionKey = structKeyExists( arguments, "excpetion" ); } } diff --git a/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc b/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc index 62dc748..15609fc 100644 --- a/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc +++ b/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc @@ -4,7 +4,7 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { describe( "SyncProvider onFailure exception argument", function() { beforeEach( function() { structDelete( application, "onFailureExceptionReceived" ); - structDelete( application, "onFailureExceptionIsExpcetion" ); + structDelete( application, "onFailureExceptionHasExcpetionKey" ); } ); it( "passes the exception as 'exception' (not 'excpetion') to the onFailure handler", function() { @@ -16,14 +16,14 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { .setMaxAttempts( 1 ); param application.onFailureExceptionReceived = false; - param application.onFailureExceptionIsExpcetion = true; + param application.onFailureExceptionHasExcpetionKey = true; expect( () => provider.marshalJob( job, pool ) ).toThrow(); expect( application.onFailureExceptionReceived ).toBeTrue( "onFailure should receive the exception under the key 'exception'" ); - expect( application.onFailureExceptionIsExpcetion ).toBeFalse( + expect( application.onFailureExceptionHasExcpetionKey ).toBeFalse( "onFailure should NOT receive the exception under the misspelled key 'excpetion'" ); } ); From 78a71fea15ee9c3975c7d3d9d9c3714b6aecfe62 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 20 Apr 2026 13:03:40 -0600 Subject: [PATCH 34/34] fix: use CF_SQL_LONGVARCHAR for LONGTEXT columns in LogFailedJobsInterceptor memento, properties, exceptionExtendedInfo, exceptionStackTrace, and exception all map to LONGTEXT in the cbq_failed_jobs schema. Binding them as CF_SQL_VARCHAR risks truncation or driver errors on large payloads (e.g. deep stack traces or complex job mementos). Co-Authored-By: Claude Sonnet 4.6 --- interceptors/LogFailedJobsInterceptor.cfc | 25 ++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/interceptors/LogFailedJobsInterceptor.cfc b/interceptors/LogFailedJobsInterceptor.cfc index fa6386f..02470ef 100644 --- a/interceptors/LogFailedJobsInterceptor.cfc +++ b/interceptors/LogFailedJobsInterceptor.cfc @@ -36,8 +36,14 @@ component { "connection" : connectionName, "queue" : queueName, "mapping" : arguments.data.job.getMapping(), - "memento" : serializeJSON( arguments.data.job.getMemento() ), - "properties" : serializeJSON( arguments.data.job.getProperties() ), + "memento" : { + "value" : serializeJSON( arguments.data.job.getMemento() ), + "cfsqltype" : "CF_SQL_LONGVARCHAR" + }, + "properties" : { + "value" : serializeJSON( arguments.data.job.getProperties() ), + "cfsqltype" : "CF_SQL_LONGVARCHAR" + }, "exceptionType" : { "value" : arguments.data.exception.type ?: "", "cfsqltype" : "CF_SQL_VARCHAR", @@ -58,7 +64,7 @@ component { }, "exceptionExtendedInfo" : { "value" : arguments.data.exception.extendedInfo ?: "", - "cfsqltype" : "CF_SQL_VARCHAR", + "cfsqltype" : "CF_SQL_LONGVARCHAR", "null" : ( arguments.data.exception.extendedInfo ?: "" ) == "", "nulls" : ( arguments.data.exception.extendedInfo ?: "" ) == "" }, @@ -68,7 +74,7 @@ component { arguments.data.exception.stackTrace ) ), - "cfsqltype" : "CF_SQL_VARCHAR", + "cfsqltype" : "CF_SQL_LONGVARCHAR", "null" : isNull( arguments.data.exception.stackTrace ) || ( isSimpleValue( arguments.data.exception.stackTrace ) && arguments.data.exception.stackTrace == "" ), @@ -76,9 +82,14 @@ component { isSimpleValue( arguments.data.exception.stackTrace ) && arguments.data.exception.stackTrace == "" ) }, - "exception" : isNull( arguments.data.exception ) ? javacast( "null", "" ) : serializeJSON( - arguments.data.exception - ), + "exception" : { + "value" : isNull( arguments.data.exception ) ? javacast( "null", "" ) : serializeJSON( + arguments.data.exception + ), + "cfsqltype" : "CF_SQL_LONGVARCHAR", + "null" : isNull( arguments.data.exception ), + "nulls" : isNull( arguments.data.exception ) + }, "failedDate" : { "value" : getCurrentUnixTimestamp(), "cfsqltype" : "CF_SQL_BIGINT"