diff --git a/.github/workflows/cron.yml b/.github/workflows/cron.yml index 04d03e7..da360e6 100644 --- a/.github/workflows/cron.yml +++ b/.github/workflows/cron.yml @@ -25,9 +25,9 @@ jobs: experimental: true services: mysql: - image: mysql:5.7 + image: mysql:8.0 env: - MYSQL_RANDOM_ROOT_PASSWORD: yes + MYSQL_ROOT_PASSWORD: root MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq @@ -56,6 +56,10 @@ jobs: box install ${{ matrix.coldbox }} --noSave box package list + - name: Configure MySQL user auth plugin + run: | + mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;" + - name: Start server env: DB_HOST: localhost @@ -78,4 +82,4 @@ jobs: DB_PASSWORD: cbq DB_DATABASE: cbq DB_SCHEMA: cbq - run: box testbox run \ No newline at end of file + run: box testbox run diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index b66507b..af546f5 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -22,9 +22,9 @@ jobs: cfengine: ["lucee@5", "lucee@6", "adobe@2021", "adobe@2023", "adobe@2025"] services: mysql: - image: mysql:5.7 + image: mysql:8.0 env: - MYSQL_RANDOM_ROOT_PASSWORD: yes + MYSQL_ROOT_PASSWORD: root MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq @@ -52,6 +52,10 @@ jobs: box config set modules.commandbox-dotenv.checkEnvPreServerStart=false box package list + - name: Configure MySQL user auth plugin + run: | + mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;" + - name: Start server env: DB_HOST: localhost @@ -101,4 +105,4 @@ jobs: - name: Commit Format Changes uses: stefanzweifel/git-auto-commit-action@v5.2.0 with: - commit_message: Apply cfformat changes \ No newline at end of file + commit_message: Apply cfformat changes diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 15b250f..c2cc1e6 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -17,9 +17,9 @@ jobs: cfengine: ["lucee@5", "lucee@6", "adobe@2021", "adobe@2023", "adobe@2025"] services: mysql: - image: mysql:5.7 + image: mysql:8.0 env: - MYSQL_RANDOM_ROOT_PASSWORD: yes + MYSQL_ROOT_PASSWORD: root MYSQL_USER: cbq MYSQL_PASSWORD: cbq MYSQL_DATABASE: cbq @@ -47,6 +47,10 @@ jobs: box config set modules.commandbox-dotenv.checkEnvPreServerStart=false box package list + - name: Configure MySQL user auth plugin + run: | + mysql -h 127.0.0.1 -P ${{ job.services.mysql.ports[3306] }} -uroot -proot -e "ALTER USER 'cbq'@'%' IDENTIFIED WITH mysql_native_password BY 'cbq'; FLUSH PRIVILEGES;" + - name: Start server env: DB_HOST: localhost diff --git a/box.json b/box.json index 6aeff2f..2af081d 100644 --- a/box.json +++ b/box.json @@ -1,6 +1,6 @@ { "name":"cbq", - "version":"5.0.7", + "version":"6.0.0-beta.3", "author":"Eric Peterson ", "location":"forgeboxStorage", "homepage":"https://github.com/coldbox-modules/cbq", @@ -31,9 +31,9 @@ "cfmigrations":"modules/cfmigrations/" }, "scripts":{ - "format":"cfformat run ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --overwrite", - "format:check":"cfformat check ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc --verbose", - "format:watch":"cfformat watch ModuleConfig.cfc,models/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc", + "format":"cfformat run ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc,tests/resources/app/models/**/*.cfc --overwrite", + "format:check":"cfformat check ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc,tests/resources/app/models/**/*.cfc --verbose", + "format:watch":"cfformat watch ModuleConfig.cfc,models/**/*.cfc,interceptors/**/*.cfc,tests/specs/**/*.cfc,tests/resources/app/handlers/**/*.cfc,tests/resources/app/config/**/*.cfc,tests/resources/app/models/**/*.cfc", "install:2021":"cfpm install document,feed,zip" }, "ignore":[ diff --git a/interceptors/LogFailedJobsInterceptor.cfc b/interceptors/LogFailedJobsInterceptor.cfc index 3cbcb9c..02470ef 100644 --- a/interceptors/LogFailedJobsInterceptor.cfc +++ b/interceptors/LogFailedJobsInterceptor.cfc @@ -36,15 +36,26 @@ component { "connection" : connectionName, "queue" : queueName, "mapping" : arguments.data.job.getMapping(), - "memento" : serializeJSON( arguments.data.job.getMemento() ), - "properties" : serializeJSON( arguments.data.job.getProperties() ), + "memento" : { + "value" : serializeJSON( arguments.data.job.getMemento() ), + "cfsqltype" : "CF_SQL_LONGVARCHAR" + }, + "properties" : { + "value" : serializeJSON( arguments.data.job.getProperties() ), + "cfsqltype" : "CF_SQL_LONGVARCHAR" + }, "exceptionType" : { "value" : arguments.data.exception.type ?: "", "cfsqltype" : "CF_SQL_VARCHAR", "null" : ( arguments.data.exception.type ?: "" ) == "", "nulls" : ( arguments.data.exception.type ?: "" ) == "" }, - "exceptionMessage" : arguments.data.exception.message, + "exceptionMessage" : { + "value" : arguments.data.exception.message ?: "", + "cfsqltype" : "CF_SQL_VARCHAR", + "null" : ( arguments.data.exception.message ?: "" ) == "", + "nulls" : ( arguments.data.exception.message ?: "" ) == "" + }, "exceptionDetail" : { "value" : arguments.data.exception.detail ?: "", "cfsqltype" : "CF_SQL_VARCHAR", @@ -53,27 +64,47 @@ component { }, "exceptionExtendedInfo" : { "value" : arguments.data.exception.extendedInfo ?: "", - "cfsqltype" : "CF_SQL_VARCHAR", + "cfsqltype" : "CF_SQL_LONGVARCHAR", "null" : ( arguments.data.exception.extendedInfo ?: "" ) == "", "nulls" : ( arguments.data.exception.extendedInfo ?: "" ) == "" }, - "exceptionStackTrace" : arguments.data.exception.stackTrace, - "exception" : serializeJSON( arguments.data.exception ), - "failedDate" : { "value": getCurrentUnixTimestamp(), "cfsqltype": "CF_SQL_BIGINT" }, - "originalId" : { "value": arguments.data.job.getId(), "cfsqltype": "CF_SQL_VARCHAR" } + "exceptionStackTrace" : { + "value" : isNull( arguments.data.exception.stackTrace ) ? "" : ( + isSimpleValue( arguments.data.exception.stackTrace ) ? arguments.data.exception.stackTrace : serializeJSON( + arguments.data.exception.stackTrace + ) + ), + "cfsqltype" : "CF_SQL_LONGVARCHAR", + "null" : isNull( arguments.data.exception.stackTrace ) || ( + isSimpleValue( arguments.data.exception.stackTrace ) && arguments.data.exception.stackTrace == "" + ), + "nulls" : isNull( arguments.data.exception.stackTrace ) || ( + isSimpleValue( arguments.data.exception.stackTrace ) && arguments.data.exception.stackTrace == "" + ) + }, + "exception" : { + "value" : isNull( arguments.data.exception ) ? javacast( "null", "" ) : serializeJSON( + arguments.data.exception + ), + "cfsqltype" : "CF_SQL_LONGVARCHAR", + "null" : isNull( arguments.data.exception ), + "nulls" : isNull( arguments.data.exception ) + }, + "failedDate" : { + "value" : getCurrentUnixTimestamp(), + "cfsqltype" : "CF_SQL_BIGINT" + }, + "originalId" : { + "value" : arguments.data.job.getId(), + "cfsqltype" : "CF_SQL_VARCHAR" + } }; try { qb.table( variables.settings.logFailedJobsProperties.tableName ) - .insert( - values = logData, - options = options - ); + .insert( values = logData, options = options ); } catch ( any e ) { - log.error( "Failed to log failed job: #e.message#", { - "log": logData, - "exception": e - } ); + log.error( "Failed to log failed job: #e.message#", { "log" : logData, "exception" : e } ); rethrow; } } diff --git a/models/Jobs/Batch.cfc b/models/Jobs/Batch.cfc index e12d82f..52db039 100644 --- a/models/Jobs/Batch.cfc +++ b/models/Jobs/Batch.cfc @@ -3,6 +3,7 @@ component accessors="true" { property name="dispatcher" inject="provider:Dispatcher@cbq"; property name="cbq" inject="provider:@cbq"; property name="wirebox" inject="wirebox"; + property name="log" inject="logbox:logger:{this}"; property name="repository"; property name="id" type="string"; @@ -10,6 +11,7 @@ component accessors="true" { property name="totalJobs" type="numeric"; property name="pendingJobs" type="numeric"; property name="failedJobs" type="numeric"; + property name="successfulJobs" type="numeric"; property name="failedJobIds" type="array"; property name="options" type="struct"; property name="createdDate" type="numeric"; @@ -49,7 +51,12 @@ component accessors="true" { } getRepository().markAsFinished( variables.id ); - dispatchThenJobIfNeeded(); + + try { + dispatchThenJobIfNeeded(); + } catch ( any e ) { + log.error( "Failed to dispatch then job for batch [#variables.id#]: #e.message#", e ); + } if ( counts.allJobsHaveRanExactlyOnce ) { dispatchFinallyJobIfNeeded(); @@ -64,7 +71,11 @@ component accessors="true" { cancel(); } - dispatchCatchJobIfNeeded( arguments.error ); + try { + dispatchCatchJobIfNeeded( arguments.error ); + } catch ( any e ) { + log.error( "Failed to dispatch catch job for batch [#variables.id#]: #e.message#", e ); + } } if ( counts.allJobsHaveRanExactlyOnce ) { diff --git a/models/Jobs/DBBatchRepository.cfc b/models/Jobs/DBBatchRepository.cfc index 98f5282..b7ffee1 100644 --- a/models/Jobs/DBBatchRepository.cfc +++ b/models/Jobs/DBBatchRepository.cfc @@ -52,14 +52,19 @@ component singleton accessors="true" { public Batch function store( required PendingBatch batch ) { var id = variables.timeBasedUUIDGenerator.generate().toString(); + var batchName = arguments.batch.getName(); + if ( isNull( batchName ) || !isSimpleValue( batchName ) ) { + batchName = ""; + } qb.table( variables.batchTableName ) .insert( values = { "id" : id, - "name" : arguments.batch.getName(), + "name" : batchName, "totalJobs" : 0, "pendingJobs" : 0, + "successfulJobs" : 0, "failedJobs" : 0, "failedJobIds" : "[]", "options" : serializeJSON( arguments.batch.getOptions() ), @@ -102,23 +107,20 @@ component singleton accessors="true" { throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" ); } + var updatedValues = { + "pendingJobs" : data.pendingJobs - 1, + "successfulJobs" : data.successfulJobs + 1, + "failedJobs" : data.failedJobs + }; + qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) - .update( - values = { - "pendingJobs" : data.pendingJobs - 1, - "failedJobs" : data.failedJobs, - "failedJobIds" : serializeJSON( - deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId ) - ) - }, - options = variables.defaultQueryOptions - ); + .update( values = updatedValues, options = variables.defaultQueryOptions ); return { "pendingJobs" : data.pendingJobs - 1, "failedJobs" : data.failedJobs, - "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) - data.failedJobs == 0 + "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0 }; } } @@ -135,21 +137,20 @@ component singleton accessors="true" { throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" ); } + var updatedValues = { + "pendingJobs" : data.pendingJobs - 1, + "failedJobs" : data.failedJobs + 1, + "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) ) + }; + qb.table( variables.batchTableName ) .where( "id", arguments.batchId ) - .update( - values = { - "pendingJobs" : data.pendingJobs, - "failedJobs" : data.failedJobs + 1, - "failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) ) - }, - options = variables.defaultQueryOptions - ); + .update( values = updatedValues, options = variables.defaultQueryOptions ); return { - "pendingJobs" : data.pendingJobs, + "pendingJobs" : data.pendingJobs - 1, "failedJobs" : data.failedJobs + 1, - "allJobsHaveRanExactlyOnce" : data.pendingJobs - ( data.failedJobs + 1 ) == 0 + "allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0 }; } } @@ -191,6 +192,7 @@ component singleton accessors="true" { batch.setTotalJobs( data.totalJobs ); batch.setPendingJobs( data.pendingJobs ); batch.setFailedJobs( data.failedJobs ); + batch.setSuccessfulJobs( data.successfulJobs ); batch.setFailedJobIds( deserializeJSON( data.failedJobIds ) ); batch.setOptions( deserializeJSON( data.options ) ); batch.setCreatedDate( data.createdDate ); diff --git a/models/Providers/AbstractQueueProvider.cfc b/models/Providers/AbstractQueueProvider.cfc index 46cd7ab..5de4dd5 100644 --- a/models/Providers/AbstractQueueProvider.cfc +++ b/models/Providers/AbstractQueueProvider.cfc @@ -166,120 +166,145 @@ component accessors="true" { dispatchNextJobInChain( job, pool ); if ( !isNull( afterJobHook ) && ( isCustomFunction( afterJobHook ) || isClosure( afterJobHook ) ) ) { - afterJobHook( job, pool ); + try { + afterJobHook( job, pool ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "afterJobHook", + job, + sideEffectException + ); + } } } ) .onException( function( e ) { - // log failed job - if ( !isNull( e ) && "java.util.concurrent.CompletionException" == e.getClass().getName() ) { - var rootCause = e.getCause(); - if ( !isNull( rootCause ) ) { - e = rootCause; - } - } - - if ( log.canError() ) { - log.error( - "Exception when running job: #e.message#", - { - "job" : job.getMemento(), - "exception" : isNull( e ) ? javacast( "null", "" ) : e + try { + // log failed job + if ( !isNull( e ) && "java.util.concurrent.CompletionException" == e.getClass().getName() ) { + var rootCause = e.getCause(); + if ( !isNull( rootCause ) ) { + e = rootCause; } - ); - } - - afterJobException( - job.getId(), - job, - pool, - isNull( e ) ? javacast( "null", "" ) : e - ); - - variables.interceptorService.announce( - "onCBQJobException", - { - "job" : job, - "pool" : pool, - "exception" : isNull( e ) ? javacast( "null", "" ) : e } - ); - var jobMaxAttempts = getMaxAttemptsForJob( job, pool ); - if ( job.getIsCancelled() ) { - if ( variables.log.canDebug() ) { - variables.log.debug( "Job [#job.getId()#] was cancelled. Reason: #job.getCancelledReason()#. Deleting job [#job.getId()#]." ); + if ( log.canError() ) { + log.error( + "Exception when running job: #e.message#", + { + "job" : job.getMemento(), + "exception" : isNull( e ) ? javacast( "null", "" ) : e + } + ); } - if ( structKeyExists( job, "onFailure" ) ) { - invoke( + try { + afterJobException( + job.getId(), job, - "onFailure", - { "exception" : isNull( e ) ? javacast( "null", "" ) : e } + pool, + isNull( e ) ? javacast( "null", "" ) : e + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "afterJobException", + job, + sideEffectException ); } - variables.interceptorService.announce( - "onCBQJobFailed", - { - "job" : job, - "exception" : isNull( e ) ? javacast( "null", "" ) : e - } - ); - - afterJobFailed( - job.getId(), - job, - pool, - isNull( e ) ? javacast( "null", "" ) : e - ); - ensureFailedBatchJobIsRecorded( job, isNull( e ) ? javacast( "null", "" ) : e ); - - variables.log.debug( "Marked job ###job.getId()# as failed after being cancelled." ); - } else if ( jobMaxAttempts == 0 || job.getCurrentAttempt() < jobMaxAttempts ) { - if ( jobMaxAttempts == 0 && variables.log.canDebug() ) { - variables.log.debug( "Job ###job.getId()# has a maxAttempts of 0 and will always be released." ); - } - if ( variables.log.canDebug() ) { - variables.log.debug( "Releasing job ###job.getId()#" ); - } - releaseJob( job, pool ); - if ( variables.log.canDebug() ) { - variables.log.debug( "Released job ###job.getId()#" ); - } - } else { - if ( variables.log.canDebug() ) { - variables.log.debug( "Maximum attempts reached. Deleting job ###job.getId()#" ); - } - - if ( structKeyExists( job, "onFailure" ) ) { - invoke( + try { + variables.interceptorService.announce( + "onCBQJobException", + { + "job" : job, + "pool" : pool, + "exception" : isNull( e ) ? javacast( "null", "" ) : e + } + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "onCBQJobException", job, - "onFailure", - { "exception" : isNull( e ) ? javacast( "null", "" ) : e } + sideEffectException ); } - variables.interceptorService.announce( - "onCBQJobFailed", - { - "job" : job, - "exception" : isNull( e ) ? javacast( "null", "" ) : e + var jobMaxAttempts = getMaxAttemptsForJob( job, pool ); + if ( job.getIsCancelled() ) { + if ( variables.log.canDebug() ) { + variables.log.debug( "Job [#job.getId()#] was cancelled. Reason: #job.getCancelledReason()#. Deleting job [#job.getId()#]." ); } - ); - - afterJobFailed( - job.getId(), - job, - pool, - isNull( e ) ? javacast( "null", "" ) : e - ); - ensureFailedBatchJobIsRecorded( job, isNull( e ) ? javacast( "null", "" ) : e ); - - variables.log.debug( "Marked job ###job.getId()# as failed after maximum failed attempts." ); + markJobFailed( + job, + pool, + isNull( e ) ? javacast( "null", "" ) : e + ); + variables.log.debug( "Marked job ###job.getId()# as failed after being cancelled." ); + } else if ( jobMaxAttempts == 0 || job.getCurrentAttempt() < jobMaxAttempts ) { + if ( jobMaxAttempts == 0 && variables.log.canDebug() ) { + variables.log.debug( "Job ###job.getId()# has a maxAttempts of 0 and will always be released." ); + } + if ( variables.log.canDebug() ) { + variables.log.debug( "Releasing job ###job.getId()#" ); + } + try { + releaseJob( job, pool ); + if ( variables.log.canDebug() ) { + variables.log.debug( "Released job ###job.getId()#" ); + } + } catch ( any releaseException ) { + try { + log.error( + "releaseJob failed for job ###job.getId()#. Falling back to terminal failure to prevent timeout-based re-pickup.", + { + "job" : job.getMemento(), + "originalException" : isNull( e ) ? javacast( "null", "" ) : e, + "releaseException" : releaseException + } + ); + } catch ( any ignored ) { + } + markJobFailed( + job, + pool, + isNull( e ) ? javacast( "null", "" ) : e + ); + } + } else { + if ( variables.log.canDebug() ) { + variables.log.debug( "Maximum attempts reached. Deleting job ###job.getId()#" ); + } + markJobFailed( + job, + pool, + isNull( e ) ? javacast( "null", "" ) : e + ); + variables.log.debug( "Marked job ###job.getId()# as failed after maximum failed attempts." ); + } + } catch ( any outerException ) { + try { + log.error( + "Critical: .onException handler failed for job ###job.getId()#. Forcing terminal failure.", + { + "job" : job.getMemento(), + "originalException" : isNull( e ) ? javacast( "null", "" ) : e, + "handlerException" : outerException + } + ); + } catch ( any ignored ) { + } + try { + forceFailJob( job.getId(), pool ); + } catch ( any ignored ) { + } } if ( !isNull( afterJobHook ) && ( isCustomFunction( afterJobHook ) || isClosure( afterJobHook ) ) ) { - afterJobHook( job, pool ); + try { + afterJobHook( job, pool ); + } catch ( any hookException ) { + logSideEffectFailure( "afterJobHook", job, hookException ); + } } } ); } @@ -310,6 +335,104 @@ component accessors="true" { return; } + /** + * Last-resort terminal failure for a job. Providers should override this with + * a minimal, bulletproof write that takes the job out of any retry loop, even + * when the normal afterJobFailed path has already failed. Skips bookkeeping + * (onFailure hook, interceptors, batch recording) by design — it only runs + * when those paths themselves are broken. + */ + public void function forceFailJob( required any id, WorkerPool pool ) { + return; + } + + private void function markJobFailed( + required AbstractJob job, + required WorkerPool pool, + any exception + ) { + if ( structKeyExists( arguments.job, "onFailure" ) ) { + try { + invoke( + arguments.job, + "onFailure", + { "exception" : isNull( arguments.exception ) ? javacast( "null", "" ) : arguments.exception } + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "onFailure", + arguments.job, + sideEffectException + ); + } + } + + try { + variables.interceptorService.announce( + "onCBQJobFailed", + { + "job" : arguments.job, + "exception" : isNull( arguments.exception ) ? javacast( "null", "" ) : arguments.exception + } + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "onCBQJobFailed", + arguments.job, + sideEffectException + ); + } + + try { + afterJobFailed( + arguments.job.getId(), + arguments.job, + arguments.pool, + isNull( arguments.exception ) ? javacast( "null", "" ) : arguments.exception + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "afterJobFailed", + arguments.job, + sideEffectException + ); + // afterJobFailed is what writes failedDate in DBProvider; if it threw, fall back. + try { + forceFailJob( arguments.job.getId(), arguments.pool ); + } catch ( any ignored ) { + } + } + + try { + ensureFailedBatchJobIsRecorded( + arguments.job, + isNull( arguments.exception ) ? javacast( "null", "" ) : arguments.exception + ); + } catch ( any sideEffectException ) { + logSideEffectFailure( + "ensureFailedBatchJobIsRecorded", + arguments.job, + sideEffectException + ); + } + } + + private void function logSideEffectFailure( + required string stage, + required AbstractJob job, + required any exception + ) { + try { + if ( log.canError() ) { + log.error( + "Side effect [#arguments.stage#] threw for job ###arguments.job.getId()#: #arguments.exception.message#", + { "exception" : arguments.exception } + ); + } + } catch ( any ignored ) { + } + } + public void function releaseJob( required AbstractJob job, required WorkerPool pool ) { arguments.job.setCurrentAttempt( arguments.job.getCurrentAttempt() + 1 ); push( diff --git a/models/Providers/ColdBoxAsyncProvider.cfc b/models/Providers/ColdBoxAsyncProvider.cfc index 2e82802..f8ea080 100644 --- a/models/Providers/ColdBoxAsyncProvider.cfc +++ b/models/Providers/ColdBoxAsyncProvider.cfc @@ -23,9 +23,9 @@ component accessors="true" extends="AbstractQueueProvider" { sleep( delay * 1000 ); return true; }, workerPool.getExecutor() ) - .then( function() { + .thenCompose( function() { job.setId( createUUID() ); - if ( !isNull( arguments.currentAttempt ) ) { + if ( attempts > 0 ) { job.setCurrentAttempt( attempts ); } return marshalJob( job, workerPool ); diff --git a/models/Providers/DBProvider.cfc b/models/Providers/DBProvider.cfc index 0b47a03..654f111 100644 --- a/models/Providers/DBProvider.cfc +++ b/models/Providers/DBProvider.cfc @@ -58,18 +58,7 @@ component accessors="true" extends="AbstractQueueProvider" { var lockedRecords = fetchLockedRecords( capacity, pool ); for ( var job in lockedRecords ) { - var jobCFC = variables.deserializeJob( job.payload, job.id, job.attempts ); - incrementJobAttempts( jobCFC, pool ); - application.cbController.getModuleService().loadMappings(); - variables.marshalJob( - job = jobCFC, - pool = pool, - afterJobHook = () => { - // variables.log.debug( "Job finished. Immediately running the scheduled task again." ); - // forceRun = true; - // task.run(); - } - ); + processLockedRecord( job, pool ); } return lockedRecords.len(); @@ -275,6 +264,39 @@ component accessors="true" extends="AbstractQueueProvider" { return true; } + private void function processLockedRecord( required any record, required WorkerPool pool ) { + var jobCFC = variables.deserializeJob( + arguments.record.payload, + arguments.record.id, + arguments.record.attempts + ); + + var jobMaxAttempts = getMaxAttemptsForJob( jobCFC, arguments.pool ); + if ( jobMaxAttempts != 0 && arguments.record.attempts >= jobMaxAttempts ) { + if ( log.canWarn() ) { + log.warn( + "Job ###jobCFC.getId()# already has #arguments.record.attempts# attempts (max #jobMaxAttempts#). Marking as failed without further retry.", + { "job" : jobCFC.getMemento() } + ); + } + markJobAsFailedById( arguments.record.id, arguments.pool ); + ensureFailedBatchJobIsRecorded( + jobCFC, + "Job exceeded maximum attempts (#jobMaxAttempts#) before execution." + ); + return; + } + + incrementJobAttempts( jobCFC, arguments.pool ); + application.cbController.getModuleService().loadMappings(); + variables.marshalJob( + job = jobCFC, + pool = arguments.pool, + afterJobHook = () => { + } + ); + } + private void function incrementJobAttempts( required AbstractJob job, required WorkerPool pool ) { if ( log.canDebug() ) { log.debug( "Reserving job ###arguments.job.getId()#" ); @@ -323,6 +345,28 @@ component accessors="true" extends="AbstractQueueProvider" { .update( values = { "completedDate" : getCurrentUnixTimestamp() }, options = variables.defaultQueryOptions ); } + public void function forceFailJob( required any id, WorkerPool pool ) { + newQuery() + .table( variables.tableName ) + .where( "id", arguments.id ) + .update( + values = { + "failedDate" : getCurrentUnixTimestamp(), + "reservedBy" : { + "value" : "", + "null" : true, + "nulls" : true + }, + "reservedDate" : { + "value" : "", + "null" : true, + "nulls" : true + } + }, + options = variables.defaultQueryOptions + ); + } + private void function markJobAsFailedById( required numeric id, WorkerPool pool ) { newQuery() .table( variables.tableName ) @@ -394,12 +438,15 @@ component accessors="true" extends="AbstractQueueProvider" { variables.getCurrentUnixTimestamp() ); } ); - // past the reserved date - q1.orWhere( - "reservedDate", - "<=", - variables.getCurrentUnixTimestamp() - pool.getTimeout() - ); + // past the job's own timeout (availableDate was set to now + jobTimeout at reservation time) + q1.orWhere( ( q2 ) => { + q2.whereNotNull( "reservedDate" ) + .where( + "availableDate", + "<=", + variables.getCurrentUnixTimestamp() + ); + } ); // reserved by a worker but never released q1.orWhere( ( q3 ) => { q3.whereNull( "reservedDate" ) @@ -448,11 +495,14 @@ component accessors="true" extends="AbstractQueueProvider" { q2.whereNull( "reservedBy" ); q2.whereNull( "reservedDate" ); } ) - .orWhere( - "reservedDate", - "<=", - variables.getCurrentUnixTimestamp() - pool.getTimeout() - ); + .orWhere( ( q2 ) => { + q2.whereNotNull( "reservedDate" ) + .where( + "availableDate", + "<=", + variables.getCurrentUnixTimestamp() + ); + } ); } ) .update( values = { diff --git a/models/Providers/SyncProvider.cfc b/models/Providers/SyncProvider.cfc index 9e58fd5..c7adaf9 100644 --- a/models/Providers/SyncProvider.cfc +++ b/models/Providers/SyncProvider.cfc @@ -139,7 +139,7 @@ component accessors="true" extends="AbstractQueueProvider" { invoke( job, "onFailure", - { "excpetion" : e } + { "exception" : e } ); } diff --git a/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc new file mode 100644 index 0000000..17d0e9f --- /dev/null +++ b/resources/database/migrations/2000_01_01_000008_add_successfulJobs_to_cbq_batches_table.cfc @@ -0,0 +1,15 @@ +component { + + function up( schema ) { + schema.alter( "cbq_batches", ( t ) => { + t.addColumn( t.unsignedInteger( "successfulJobs" ).default( 0 ) ); + } ); + } + + function down( schema ) { + schema.alter( "cbq_batches", ( t ) => { + t.dropColumn( "successfulJobs" ); + } ); + } + +} diff --git a/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc b/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc new file mode 100644 index 0000000..7a2101f --- /dev/null +++ b/resources/database/migrations/2000_01_01_000009_make_cbq_batches_name_nullable.cfc @@ -0,0 +1,15 @@ +component { + + function up( schema, qb ) { + schema.alter( "cbq_batches", ( t ) => { + t.modifyColumn( "name", t.string( "name" ).nullable() ); + } ); + } + + function down( schema, qb ) { + schema.alter( "cbq_batches", ( t ) => { + t.modifyColumn( "name", t.string( "name" ) ); + } ); + } + +} diff --git a/tests/Application.cfc b/tests/Application.cfc index 63187e5..c8b5042 100644 --- a/tests/Application.cfc +++ b/tests/Application.cfc @@ -21,6 +21,10 @@ component { this.mappings[ "/testbox" ] = rootPath & "/testbox"; this.datasource = "cbq"; + this.javaSettings = { + "loadPaths" : [ rootPath & "/lib" ], + "reloadOnChange" : false + }; function onRequestStart() { createObject( "java", "java.lang.System" ).setProperty( "ENVIRONMENT", "testing" ); diff --git a/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc new file mode 100644 index 0000000..7018cf2 --- /dev/null +++ b/tests/resources/app/models/Jobs/AlwaysErrorJob.cfc @@ -0,0 +1,7 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + throw( type = "cbq.tests.AlwaysErrorJob", message = "This job always errors for testing." ); + } + +} diff --git a/tests/resources/app/models/Jobs/BeforeAndAfterJob.cfc b/tests/resources/app/models/Jobs/BeforeAndAfterJob.cfc index 9b9202e..175a247 100644 --- a/tests/resources/app/models/Jobs/BeforeAndAfterJob.cfc +++ b/tests/resources/app/models/Jobs/BeforeAndAfterJob.cfc @@ -1,15 +1,15 @@ component extends="cbq.models.Jobs.AbstractJob" { - function handle() { - // do nothing - } + function handle() { + // do nothing + } - function before() { - application.jobBeforeCalled = true; - } + function before() { + application.jobBeforeCalled = true; + } - function after() { - application.jobAfterCalled = true; - } + function after() { + application.jobAfterCalled = true; + } -} \ No newline at end of file +} diff --git a/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc b/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc new file mode 100644 index 0000000..4741692 --- /dev/null +++ b/tests/resources/app/models/Jobs/OnFailureCapturingJob.cfc @@ -0,0 +1,15 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + throw( + type = "cbq.tests.OnFailureCapturingJob", + message = "This job always errors to test the onFailure exception argument." + ); + } + + function onFailure() { + application.onFailureExceptionReceived = !isNull( arguments.exception ); + application.onFailureExceptionHasExcpetionKey = structKeyExists( arguments, "excpetion" ); + } + +} diff --git a/tests/resources/app/models/Jobs/ReleaseTestJob.cfc b/tests/resources/app/models/Jobs/ReleaseTestJob.cfc index 5f3b7e9..56dd6f4 100644 --- a/tests/resources/app/models/Jobs/ReleaseTestJob.cfc +++ b/tests/resources/app/models/Jobs/ReleaseTestJob.cfc @@ -1,11 +1,11 @@ component extends="cbq.models.Jobs.AbstractJob" { - function handle() { - this.release( 2 ); - } + function handle() { + this.release( 2 ); + } - function onFailure() { - this.onFailureCalled = true; - } + function onFailure() { + this.onFailureCalled = true; + } -} \ No newline at end of file +} diff --git a/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc b/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc new file mode 100644 index 0000000..b2b714d --- /dev/null +++ b/tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc @@ -0,0 +1,15 @@ +component extends="cbq.models.Jobs.AbstractJob" { + + function handle() { + // do nothing + } + + function before() { + request.jobBeforeCalled = true; + } + + function after() { + request.jobAfterCalled = true; + } + +} diff --git a/tests/resources/app/models/Jobs/SendWelcomeEmailJob.cfc b/tests/resources/app/models/Jobs/SendWelcomeEmailJob.cfc index e454c33..119368e 100644 --- a/tests/resources/app/models/Jobs/SendWelcomeEmailJob.cfc +++ b/tests/resources/app/models/Jobs/SendWelcomeEmailJob.cfc @@ -1,9 +1,9 @@ component extends="cbq.models.Jobs.AbstractJob" { - property name="log" inject="logbox:logger:{this}"; + property name="log" inject="logbox:logger:{this}"; - function handle() { - log.info( "sending email" ); - } + function handle() { + log.info( "sending email" ); + } -} \ No newline at end of file +} diff --git a/tests/resources/app/models/Providers/FailingReleaseDBProvider.cfc b/tests/resources/app/models/Providers/FailingReleaseDBProvider.cfc new file mode 100644 index 0000000..b91d78a --- /dev/null +++ b/tests/resources/app/models/Providers/FailingReleaseDBProvider.cfc @@ -0,0 +1,17 @@ +/** + * Test fixture: a DBProvider whose releaseJob always throws. + * Used by DBProviderMaxAttemptsSpec to verify that the catch block + * in marshalJob's .onException handler correctly falls back to + * markJobFailed when releaseJob fails — without needing MockBox + * (which interferes with WireBox provider methods in async threads). + */ +component extends="cbq.models.Providers.DBProvider" { + + public void function releaseJob( required any job, required any pool ) { + throw( + type = "FailingReleaseDBProvider.SimulatedFailure", + message = "Simulated releaseJob failure for testing" + ); + } + +} diff --git a/tests/specs/integration/BatchFinallySpec.cfc b/tests/specs/integration/BatchFinallySpec.cfc new file mode 100644 index 0000000..9463d1a --- /dev/null +++ b/tests/specs/integration/BatchFinallySpec.cfc @@ -0,0 +1,72 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "batch finally dispatching", function() { + beforeEach( function() { + structDelete( request, "jobBeforeCalled" ); + structDelete( request, "jobAfterCalled" ); + + param request.jobBeforeCalled = false; + param request.jobAfterCalled = false; + } ); + + it( "dispatches the finally job when the last job fails", function() { + var cbq = getWireBox().getInstance( "@cbq" ); + registerSyncConnectionAndWorkerPool(); + + var successJob = cbq.job( "SendWelcomeEmailJob" ); + var failingJob = cbq.job( job = "ReleaseTestJob", maxAttempts = 1 ); + + var pendingBatch = cbq + .batch( [ successJob, failingJob ] ) + .onConnection( "syncBatch" ) + .onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); + + try { + pendingBatch.dispatch(); + } catch ( cbq.MaxAttemptsReached e ) { + // The sync provider rethrows the terminal failure. + } + + expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch even when the last job fails." ); + } ); + + it( "dispatches the finally job when all jobs succeed", function() { + var cbq = getWireBox().getInstance( "@cbq" ); + registerSyncConnectionAndWorkerPool(); + + var pendingBatch = cbq + .batch( [ + cbq.job( "SendWelcomeEmailJob" ), + cbq.job( "SendWelcomeEmailJob" ) + ] ) + .onConnection( "syncBatch" ) + .onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" ); + + pendingBatch.dispatch(); + + expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." ); + } ); + } ); + } + + private void function registerSyncConnectionAndWorkerPool() { + var config = getWireBox().getInstance( "Config@cbq" ); + + if ( !config.getConnections().keyExists( "syncBatch" ) ) { + config.registerConnection( + name = "syncBatch", + provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ) + ); + } + + if ( !config.getWorkerPools().keyExists( "syncBatch" ) ) { + config.registerWorkerPool( + name = "syncBatch", + connectionName = "syncBatch", + maxAttempts = 1 + ); + } + } + +} diff --git a/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc b/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc new file mode 100644 index 0000000..7505483 --- /dev/null +++ b/tests/specs/integration/DBBatchRepositoryCountsSpec.cfc @@ -0,0 +1,124 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "DBBatchRepository counts", function() { + it( "initializes successfulJobs for newly stored batches", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var batch = repository.store( + getWireBox() + .getInstance( "@cbq" ) + .batch( [] ) + .allowFailures() + ); + + expect( batch.getSuccessfulJobs() ).toBe( 0 ); + } ); + + it( "successful jobs increment successfulJobs and decrement pendingJobs", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "SendWelcomeEmailJob" ) + .setId( createUUID() ) + .withBatchId( batch.getId() ); + + provider.marshalJob( job, pool ); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 0 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 1 ); + } ); + + it( "retryable errors do not change pending, successful, or failed counts", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "AlwaysErrorJob" ) + .setId( createUUID() ) + .withBatchId( batch.getId() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 2 ); + + expect( () => provider.marshalJob( job, pool ) ).toThrow( "cbq.SyncProviderJobFailed" ); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 1 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobIds() ).toBeEmpty(); + } ); + + it( "failed jobs increment failedJobs, append failedJobIds, and decrement pendingJobs", function() { + var repository = getWireBox().getInstance( "DBBatchRepository@cbq" ); + var config = registerSyncConnectionAndWorkerPool(); + var batch = createTrackedBatch( repository, 1 ); + var provider = config.getConnection( "syncBatchCounts" ).getProvider(); + var pool = config.getWorkerPool( "syncBatchCounts" ); + var failedJobId = createUUID(); + + var job = getWireBox() + .getInstance( "@cbq" ) + .job( "AlwaysErrorJob" ) + .setId( failedJobId ) + .withBatchId( batch.getId() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 1 ); + + expect( () => provider.marshalJob( job, pool ) ).toThrow(); + + var updatedBatch = repository.find( batch.getId() ); + + expect( updatedBatch.getPendingJobs() ).toBe( 0 ); + expect( updatedBatch.getSuccessfulJobs() ).toBe( 0 ); + expect( updatedBatch.getFailedJobs() ).toBe( 1 ); + expect( updatedBatch.getFailedJobIds() ).toHaveLength( 1 ); + expect( updatedBatch.getFailedJobIds()[ 1 ] ).toBe( failedJobId ); + } ); + } ); + } + + private any function registerSyncConnectionAndWorkerPool() { + var config = getWireBox().getInstance( "Config@cbq" ); + + if ( !config.getConnections().keyExists( "syncBatchCounts" ) ) { + config.registerConnection( + name = "syncBatchCounts", + provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ) + ); + } + + if ( !config.getWorkerPools().keyExists( "syncBatchCounts" ) ) { + config.registerWorkerPool( + name = "syncBatchCounts", + connectionName = "syncBatchCounts", + maxAttempts = 2 + ); + } + + return config; + } + + private any function createTrackedBatch( required any repository, required numeric totalJobs ) { + var pendingBatch = getWireBox() + .getInstance( "@cbq" ) + .batch( [] ) + .allowFailures(); + var batch = arguments.repository.store( pendingBatch ); + arguments.repository.incrementTotalJobs( batch.getId(), arguments.totalJobs ); + return arguments.repository.find( batch.getId() ); + } + +} diff --git a/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc b/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc new file mode 100644 index 0000000..3729861 --- /dev/null +++ b/tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc @@ -0,0 +1,268 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "DBProvider maxAttempts safeguards", function() { + beforeEach( function() { + variables.provider = getWireBox().getInstance( "DBProvider@cbq" ).setProperties( {} ); + makePublic( variables.provider, "processLockedRecord" ); + variables.pool = makeWorkerPool( variables.provider ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + } ); + + afterEach( function() { + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + } ); + + it( "forceFailJob sets failedDate and clears the reservation", function() { + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ).setMaxAttempts( 3 ); + variables.provider.push( "default", job ); + + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : variables.pool.getUniqueId(), + "reservedDate" : now, + "availableDate" : now + 60, + "attempts" : 5 + } ); + + var jobId = variables.provider + .newQuery() + .from( "cbq_jobs" ) + .value( "id" ); + + variables.provider.forceFailJob( jobId, variables.pool ); + + var row = variables.provider + .newQuery() + .from( "cbq_jobs" ) + .where( "id", jobId ) + .first(); + + expect( row.failedDate ).notToBeNull( "failedDate should be set" ); + expect( row.failedDate ).toBeGT( 0, "failedDate should be a unix timestamp" ); + expect( row.reservedBy ?: "" ).toBe( "", "reservedBy should be cleared" ); + expect( row.reservedDate ?: "" ).toBe( "", "reservedDate should be cleared" ); + } ); + + it( "skips dispatch and marks the job failed when attempts already meets maxAttempts", function() { + var job = getWireBox().getInstance( "AlwaysErrorJob" ).setMaxAttempts( 3 ); + variables.provider.push( "default", job ); + + var now = javacast( "long", getTickCount() / 1000 ); + // Simulate the runaway state: 29 attempts in DB, payload still says maxAttempts=3, + // reserved by this pool but reservedDate was never set (the symptom we observed). + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : variables.pool.getUniqueId(), + "reservedDate" : { + "value" : "", + "null" : true, + "nulls" : true + }, + "availableDate" : now - 1, + "attempts" : 29 + } ); + + var record = variables.provider + .newQuery() + .from( "cbq_jobs" ) + .first(); + + prepareMock( variables.provider ); + variables.provider.$( "incrementJobAttempts" ); + variables.provider.$( "marshalJob" ); + + variables.provider.processLockedRecord( record, variables.pool ); + + expect( variables.provider.$never( "incrementJobAttempts" ) ).toBeTrue( + "incrementJobAttempts must not run once attempts >= maxAttempts" + ); + expect( variables.provider.$never( "marshalJob" ) ).toBeTrue( + "marshalJob must not run once attempts >= maxAttempts" + ); + + var row = variables.provider + .newQuery() + .from( "cbq_jobs" ) + .where( "id", record.id ) + .first(); + expect( row.failedDate ).notToBeNull( "the runaway job should be marked failed" ); + } ); + + it( "still proceeds normally when attempts is below maxAttempts", function() { + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ).setMaxAttempts( 3 ); + variables.provider.push( "default", job ); + + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : variables.pool.getUniqueId(), + "reservedDate" : { + "value" : "", + "null" : true, + "nulls" : true + }, + "availableDate" : now - 1, + "attempts" : 1 + } ); + + var record = variables.provider + .newQuery() + .from( "cbq_jobs" ) + .first(); + + prepareMock( variables.provider ); + variables.provider.$( "incrementJobAttempts" ); + variables.provider.$( "marshalJob" ); + + variables.provider.processLockedRecord( record, variables.pool ); + + expect( variables.provider.$once( "incrementJobAttempts" ) ).toBeTrue( + "incrementJobAttempts should run when attempts < maxAttempts" + ); + expect( variables.provider.$once( "marshalJob" ) ).toBeTrue( + "marshalJob should run when attempts < maxAttempts" + ); + } ); + + it( "still marks the row failed when releaseJob throws inside the exception handler", function() { + // Regression: previously, if releaseJob threw inside .onException, the + // future swallowed the secondary exception and the row stayed reserved, + // causing unbounded timeout-based re-pickups. + // We use a real subclass (FailingReleaseDBProvider) instead of MockBox so that + // WireBox provider methods (newQuery) continue to work inside the async thread. + var failingProvider = getWireBox().getInstance( "FailingReleaseDBProvider" ).setProperties( {} ); + var failingPool = makeWorkerPool( failingProvider ); + + failingProvider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + + var job = getWireBox() + .getInstance( "AlwaysErrorJob" ) + .setMaxAttempts( 5 ) + .setCurrentAttempt( 0 ); + + failingProvider.push( "default", job ); + + var now = javacast( "long", getTickCount() / 1000 ); + failingProvider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : failingPool.getUniqueId(), + "reservedDate" : now, + "availableDate" : now + 60 + } ); + + var jobId = failingProvider + .newQuery() + .from( "cbq_jobs" ) + .value( "id" ); + + job.setId( jobId ); + + try { + var jobFuture = failingProvider.marshalJob( job, failingPool ); + if ( !isNull( jobFuture ) ) { + jobFuture.get(); + } + } catch ( any e ) { + } + + var row = failingProvider + .newQuery() + .from( "cbq_jobs" ) + .where( "id", jobId ) + .first(); + + expect( row.failedDate ?: "" ).notToBe( + "", + "the row must be marked failed even when releaseJob throws, otherwise the timeout watcher will retry it forever" + ); + + failingProvider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + } ); + + it( "falls back to forceFailJob when even afterJobFailed throws", function() { + // Floor of the defense: if the proper failure-recording path is broken, + // markJobFailed should escalate to forceFailJob to guarantee the row exits + // the retry loop. + var job = getWireBox() + .getInstance( "AlwaysErrorJob" ) + .setMaxAttempts( 1 ) + .setCurrentAttempt( 0 ) + .setId( randRange( 1, 1000 ) ); + + variables.provider.push( "default", job ); + var jobId = reserveJobForPool(); + job.setId( jobId ); + + prepareMock( variables.provider ); + makePublic( variables.provider, "afterJobFailed" ); + variables.provider + .$( "afterJobFailed" ) + .$throws( type = "TestSimulatedFailure", message = "simulated afterJobFailed failure" ); + variables.provider.$( "forceFailJob" ); + + try { + var jobFuture = variables.provider.marshalJob( job, variables.pool ); + if ( !isNull( jobFuture ) ) { + jobFuture.get(); + } + } catch ( any e ) { + } + + expect( variables.provider.$atLeast( 1, "forceFailJob" ) ).toBeTrue( + "forceFailJob must run when afterJobFailed throws" + ); + } ); + } ); + } + + private numeric function reserveJobForPool() { + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : variables.pool.getUniqueId(), + "reservedDate" : now, + "availableDate" : now + 60 + } ); + return variables.provider + .newQuery() + .from( "cbq_jobs" ) + .value( "id" ); + } + + private any function makeWorkerPool( required any provider ) { + var connection = getInstance( "QueueConnection@cbq" ) + .setName( "TestMaxAttemptsConnection" ) + .setProvider( arguments.provider ); + + return getInstance( "WorkerPool@cbq" ) + .setName( "TestMaxAttemptsPool" ) + .setConnection( connection ) + .setConnectionName( connection.getName() ) + .startWorkers(); + } + +} diff --git a/tests/specs/integration/Providers/DBProviderTimeoutWatcherSpec.cfc b/tests/specs/integration/Providers/DBProviderTimeoutWatcherSpec.cfc new file mode 100644 index 0000000..84a7d8a --- /dev/null +++ b/tests/specs/integration/Providers/DBProviderTimeoutWatcherSpec.cfc @@ -0,0 +1,107 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "DBProvider timeout watcher", function() { + beforeEach( function() { + variables.provider = getWireBox().getInstance( "DBProvider@cbq" ).setProperties( {} ); + makePublic( variables.provider, "fetchPotentiallyOpenRecords" ); + variables.pool = makeWorkerPool( variables.provider ); + // clean up any leftover test records + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + } ); + + afterEach( function() { + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .delete(); + } ); + + it( "does not re-grab a reserved job that is still within its job-specific timeout", function() { + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ); + variables.provider.push( "default", job ); + + // Simulate the job being reserved by another worker with a 300s job timeout + var otherWorkerUUID = createUUID(); + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : otherWorkerUUID, + "reservedDate" : now, + "availableDate" : now + 300 + } ); + + var ids = variables.provider.fetchPotentiallyOpenRecords( capacity = 10, pool = variables.pool ); + + expect( ids ).toBeEmpty( "A reserved job still within its job-specific timeout should not be re-grabbed" ); + } ); + + it( "re-grabs a reserved job whose job-specific timeout has expired", function() { + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ); + variables.provider.push( "default", job ); + + // Simulate the job being reserved 310s ago with a 300s job timeout (availableDate now in the past) + var otherWorkerUUID = createUUID(); + var now = javacast( "long", getTickCount() / 1000 ); + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : otherWorkerUUID, + "reservedDate" : now - 310, + "availableDate" : now - 10 + } ); + + var ids = variables.provider.fetchPotentiallyOpenRecords( capacity = 10, pool = variables.pool ); + + expect( ids ).toHaveLength( + 1, + "A reserved job whose job-specific timeout has expired should be re-grabbed" + ); + } ); + + it( "does not re-grab a job using the pool timeout when the job-specific timeout is longer", function() { + // Core bug fix: pool timeout is 60s, job timeout is 300s. + // After 65s (past pool timeout, within job timeout), job should NOT be re-grabbed. + var job = getWireBox().getInstance( "SendWelcomeEmailJob" ); + variables.provider.push( "default", job ); + + var otherWorkerUUID = createUUID(); + var now = javacast( "long", getTickCount() / 1000 ); + // Reserved 65s ago, job timeout is 300s, so availableDate is still 235s in the future + variables.provider + .newQuery() + .table( "cbq_jobs" ) + .update( { + "reservedBy" : otherWorkerUUID, + "reservedDate" : now - 65, + "availableDate" : now + 235 + } ); + + var ids = variables.provider.fetchPotentiallyOpenRecords( capacity = 10, pool = variables.pool ); + + expect( ids ).toBeEmpty( + "A job past the pool timeout but still within its job-specific timeout should not be re-grabbed" + ); + } ); + } ); + } + + private any function makeWorkerPool( required any provider ) { + var connection = getInstance( "QueueConnection@cbq" ) + .setName( "TestTimeoutWatcherConnection" ) + .setProvider( arguments.provider ); + + return getInstance( "WorkerPool@cbq" ) + .setName( "TestTimeoutWatcherPool" ) + .setConnection( connection ) + .setConnectionName( connection.getName() ) + .startWorkers(); + } + +} diff --git a/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc b/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc new file mode 100644 index 0000000..15609fc --- /dev/null +++ b/tests/specs/integration/Providers/SyncProviderOnFailureSpec.cfc @@ -0,0 +1,45 @@ +component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" { + + function run() { + describe( "SyncProvider onFailure exception argument", function() { + beforeEach( function() { + structDelete( application, "onFailureExceptionReceived" ); + structDelete( application, "onFailureExceptionHasExcpetionKey" ); + } ); + + it( "passes the exception as 'exception' (not 'excpetion') to the onFailure handler", function() { + var provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} ); + var pool = makeWorkerPool( provider ); + var job = getInstance( "OnFailureCapturingJob" ) + .setId( createUUID() ) + .setCurrentAttempt( 1 ) + .setMaxAttempts( 1 ); + + param application.onFailureExceptionReceived = false; + param application.onFailureExceptionHasExcpetionKey = true; + + expect( () => provider.marshalJob( job, pool ) ).toThrow(); + + expect( application.onFailureExceptionReceived ).toBeTrue( + "onFailure should receive the exception under the key 'exception'" + ); + expect( application.onFailureExceptionHasExcpetionKey ).toBeFalse( + "onFailure should NOT receive the exception under the misspelled key 'excpetion'" + ); + } ); + } ); + } + + private any function makeWorkerPool( required any provider ) { + var connection = getInstance( "QueueConnection@cbq" ) + .setName( "TestOnFailureConnection" ) + .setProvider( arguments.provider ); + + return getInstance( "WorkerPool@cbq" ) + .setName( "TestOnFailurePool" ) + .setConnection( connection ) + .setConnectionName( connection.getName() ) + .startWorkers(); + } + +}