diff --git a/app/jobs/runtime/delayed_jobs_recover.rb b/app/jobs/runtime/delayed_jobs_recover.rb
new file mode 100644
index 00000000000..3d38ddbe5be
--- /dev/null
+++ b/app/jobs/runtime/delayed_jobs_recover.rb
@@ -0,0 +1,92 @@
+module VCAP::CloudController
+  module Jobs
+    module Runtime
+      class DelayedJobsRecover < VCAP::CloudController::Jobs::CCJob
+        def perform
+          logger.info('Recover halted delayed jobs')
+          recover
+        end
+
+        def max_attempts
+          1
+        end
+
+        private
+
+        def recover
+          # Find stuck service instance create operations where the broker is still working
+          # but CC's polling job has permanently failed due to a transient error (e.g. a brief db connection flip).
+          # Join path: service_instance_operations → service_instances → jobs → delayed_jobs.
+          #
+          # Filters:
+          # - service_instance_operations.state='in progress': the broker has not yet reported a final state
+          #   (succeeded or failed) that CC could successfully persist; if CC had received and saved a final
+          #   state from the broker, this column would already be 'succeeded' or 'failed' — not 'in progress'
+          # - service_instance_operations.type='create': scope to create operations only
+          # - service_instance_operations.created_at > cutoff: operations beyond the max async polling window
+          #   are intentionally excluded — the broker has given up on them too, so re-enqueuing is pointless
+          # - jobs.state IN (POLLING, FAILED): the pollable job has not reached a terminal success state;
+          #   POLLING covers the case where the failure hook itself couldn't write FAILED due to the DB flip
+          # - jobs.operation='service_instance.create': prevents matching update/delete jobs for the same
+          #   service instance that happen to share the same resource_guid
+          # - delayed_jobs.failed_at IS NOT NULL: the delayed job permanently failed (exhausted max_attempts);
+          #   jobs still alive or locked have failed_at=NULL and must not be touched
+          cutoff_time = Time.now - maximum_polling_duration
+          stuck = ServiceInstanceOperation.
+                  join(:service_instances, id: Sequel[:service_instance_operations][:service_instance_id]).
+                  join(:jobs, resource_guid: Sequel[:service_instances][:guid]).
+                  join(:delayed_jobs, guid: Sequel[:jobs][:delayed_job_guid]).
+                  where(Sequel[:service_instance_operations][:state] => 'in progress').
+                  where(Sequel[:service_instance_operations][:type] => 'create').
+                  where { Sequel[:service_instance_operations][:created_at] > cutoff_time }.
+                  where(Sequel[:jobs][:state] => [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]).
+                  where(Sequel[:jobs][:operation] => 'service_instance.create').
+                  exclude(Sequel[:delayed_jobs][:failed_at] => nil).
+                  select(Sequel[:jobs][:guid].as(:pollable_guid), Sequel[:delayed_jobs][:guid].as(:dj_guid)).
+                  order(Sequel[:service_instance_operations][:created_at]).
+                  limit(batch_size)
+
+          stuck.each do |row|
+            delayed = Delayed::Job.first(guid: row[:dj_guid])
+            next unless delayed
+
+            reenqueue(row[:pollable_guid], delayed)
+          end
+        end
+
+        def reenqueue(pollable_guid, delayed)
+          # Re-verify atomically that the pollable job still points to this dead delayed_job.
+          # If another process already re-enqueued a new job, pollable.delayed_job_guid was
+          # updated to the new delayed_job's guid, so the where clause returns nil and we skip safely.
+          PollableJobModel.db.transaction do
+            pjob = PollableJobModel.where(guid: pollable_guid,
+                                          delayed_job_guid: delayed.guid,
+                                          state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]).
+                   for_update.first
+            # `next`, not `return`: exiting a Sequel transaction block via `return` rolls the transaction back
+            next unless pjob
+
+            # bring the pollable job into the clean polling state
+            pjob.update(cf_api_error: nil, state: PollableJobModel::POLLING_STATE)
+
+            # unwrap the serialized handler and re-enqueue via the reoccurring job's enqueue_next_job method
+            inner_job = Jobs::Enqueuer.unwrap_job(delayed.payload_object)
+            inner_job.send(:enqueue_next_job, pjob)
+          end
+        end
+
+        def maximum_polling_duration
+          Config.config.get(:broker_client_max_async_poll_duration_minutes).minutes
+        end
+
+        def logger
+          @logger ||= Steno.logger('cc.background')
+        end
+
+        def batch_size
+          10
+        end
+      end
+    end
+  end
+end
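The subtle part of `reenqueue` is that the serialized `payload_object` is a stack of wrapper jobs (logging context, timeout, pollable bookkeeping) around the actual reoccurring broker-polling job, so the handler must be unwrapped before `enqueue_next_job` can be called. A minimal sketch of the unwrapping idea, using stand-in structs rather than CC's real wrapper classes (the actual traversal in `Jobs::Enqueuer.unwrap_job` may differ):

```ruby
# Illustrative stand-ins: CC's real wrappers (timeout, logging-context, ...) each
# expose the job they wrap; here that accessor is assumed to be #job.
Wrapper = Struct.new(:job)
Inner   = Struct.new(:operation)

# Descend through nested wrappers until nothing wraps anything anymore.
def unwrap(job)
  job = job.job while job.respond_to?(:job) && job.job
  job
end

wrapped = Wrapper.new(Wrapper.new(Inner.new('service_instance.create')))
puts unwrap(wrapped).operation # => "service_instance.create"
```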
diff --git a/config/cloud_controller.yml b/config/cloud_controller.yml
index 0983c7f0673..a9fd9394a8d 100644
--- a/config/cloud_controller.yml
+++ b/config/cloud_controller.yml
@@ -369,7 +369,8 @@ diego_sync:
 pending_droplets:
   frequency_in_seconds: 300
   expiration_in_seconds: 42
-
+delayed_jobs_recover:
+  frequency_in_seconds: 600
 pending_builds:
   expiration_in_seconds: 42
   frequency_in_seconds: 300
diff --git a/db/migrations/20260505071445_add_jobs_operation_state_index.rb b/db/migrations/20260505071445_add_jobs_operation_state_index.rb
new file mode 100644
index 00000000000..aeefa7f731e
--- /dev/null
+++ b/db/migrations/20260505071445_add_jobs_operation_state_index.rb
@@ -0,0 +1,38 @@
+Sequel.migration do
+  no_transaction # required for the concurrently option on postgres
+
+  up do
+    if database_type == :postgres
+      VCAP::Migration.with_concurrent_timeout(self) do
+        add_index :jobs, %i[operation state],
+                  name: :jobs_operation_state_index,
+                  where: "state IN ('POLLING', 'FAILED')",
+                  if_not_exists: true,
+                  concurrently: true
+      end
+    elsif database_type == :mysql
+      alter_table(:jobs) do
+        # rubocop:disable Sequel/ConcurrentIndex -- MySQL does not support concurrent index operations
+        add_index %i[operation state], name: :jobs_operation_state_index unless @db.indexes(:jobs).key?(:jobs_operation_state_index)
+        # rubocop:enable Sequel/ConcurrentIndex
+      end
+    end
+  end
+
+  down do
+    if database_type == :postgres
+      VCAP::Migration.with_concurrent_timeout(self) do
+        drop_index :jobs, %i[operation state],
+                   name: :jobs_operation_state_index,
+                   if_exists: true,
+                   concurrently: true
+      end
+    elsif database_type == :mysql
+      alter_table(:jobs) do
+        # rubocop:disable Sequel/ConcurrentIndex
+        drop_index %i[operation state], name: :jobs_operation_state_index if @db.indexes(:jobs).key?(:jobs_operation_state_index)
+        # rubocop:enable Sequel/ConcurrentIndex
+      end
+    end
+  end
+end
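The partial index only covers rows in the two non-terminal states, which keeps it small on a busy `jobs` table while still matching the recovery query's predicate exactly. A self-contained sketch of the same idea (SQLite stands in for Postgres here; in production the index is built `CONCURRENTLY` as above):

```ruby
require 'sequel'

db = Sequel.sqlite # in-memory stand-in; the PR targets postgres/mysql

db.create_table(:jobs) do
  primary_key :id
  String :operation
  String :state
end

# Same shape as jobs_operation_state_index: only POLLING/FAILED rows are indexed.
db.run("CREATE INDEX jobs_operation_state_index ON jobs(operation, state) " \
       "WHERE state IN ('POLLING', 'FAILED')")

db[:jobs].insert(operation: 'service_instance.create', state: 'FAILED')
db[:jobs].insert(operation: 'service_instance.create', state: 'COMPLETE')

# The recovery query filters on exactly the indexed predicate, so the partial
# index is applicable; the COMPLETE row is neither indexed nor returned.
puts db[:jobs].where(operation: 'service_instance.create',
                     state: %w[POLLING FAILED]).count # => 1
```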
diff --git a/lib/cloud_controller/clock/scheduler.rb b/lib/cloud_controller/clock/scheduler.rb
index 388b5db11d5..914dc67e350 100644
--- a/lib/cloud_controller/clock/scheduler.rb
+++ b/lib/cloud_controller/clock/scheduler.rb
@@ -24,7 +24,8 @@ class Scheduler
       { name: 'pending_droplets', class: Jobs::Runtime::PendingDropletCleanup },
       { name: 'pending_builds', class: Jobs::Runtime::PendingBuildCleanup },
       { name: 'failed_jobs', class: Jobs::Runtime::FailedJobsCleanup },
-      { name: 'service_operations_initial_cleanup', class: Jobs::Runtime::ServiceOperationsInitialCleanup }
+      { name: 'service_operations_initial_cleanup', class: Jobs::Runtime::ServiceOperationsInitialCleanup },
+      { name: 'delayed_jobs_recover', class: Jobs::Runtime::DelayedJobsRecover }
     ].freeze

     def initialize(config)
diff --git a/lib/cloud_controller/config_schemas/clock_schema.rb b/lib/cloud_controller/config_schemas/clock_schema.rb
index 81976bbf551..3ad399e50ca 100644
--- a/lib/cloud_controller/config_schemas/clock_schema.rb
+++ b/lib/cloud_controller/config_schemas/clock_schema.rb
@@ -34,6 +34,9 @@ class ClockSchema < VCAP::Config
           completed_tasks: {
             cutoff_age_in_days: Integer
           },
+          delayed_jobs_recover: {
+            frequency_in_seconds: Integer
+          },
           default_health_check_timeout: Integer,

           uaa: {
diff --git a/lib/cloud_controller/jobs.rb b/lib/cloud_controller/jobs.rb
index 9f39f53d152..47d49f4dde9 100644
--- a/lib/cloud_controller/jobs.rb
+++ b/lib/cloud_controller/jobs.rb
@@ -25,6 +25,7 @@
 require 'jobs/runtime/expired_blob_cleanup'
 require 'jobs/runtime/expired_orphaned_blob_cleanup'
 require 'jobs/runtime/expired_resource_cleanup'
+require 'jobs/runtime/delayed_jobs_recover'
 require 'jobs/runtime/failed_jobs_cleanup'
 require 'jobs/runtime/service_operations_initial_cleanup'
 require 'jobs/runtime/legacy_jobs'
diff --git a/lib/tasks/jobs.rake b/lib/tasks/jobs.rake
index 86c33fe9208..74946b32a25 100644
--- a/lib/tasks/jobs.rake
+++ b/lib/tasks/jobs.rake
@@ -49,6 +49,7 @@ namespace :jobs do
         'audit_events',
         'failed_jobs',
         'service_operations_initial_cleanup',
+        'delayed_jobs_recover',
         'service_usage_events',
         'completed_tasks',
         'expired_blob_cleanup',
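These four files wire the new job into the clock: the scheduler registers it as a frequent worker job, the schema validates its config block, and the require/rake entries make it loadable and runnable. Conceptually, each frequent-worker entry ends up registered with Clockwork at the configured interval; a rough sketch of that wiring (illustrative only, since CC's real `Scheduler` adds distributed locking and enqueues onto a background queue instead of running jobs inline):

```ruby
require 'clockwork'

module Clockwork
  # Assumed shape: fire the recovery job every frequency_in_seconds (600).
  every(600, 'delayed_jobs_recover') do
    # In CC this block would enqueue Jobs::Runtime::DelayedJobsRecover onto a
    # worker queue rather than printing.
    puts 'enqueue Jobs::Runtime::DelayedJobsRecover'
  end
end

# Clockwork.run # would start the scheduling loop; left commented so the sketch exits
```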
diff --git a/spec/migrations/20260505071445_add_jobs_operation_state_index_spec.rb b/spec/migrations/20260505071445_add_jobs_operation_state_index_spec.rb
new file mode 100644
index 00000000000..65edad88827
--- /dev/null
+++ b/spec/migrations/20260505071445_add_jobs_operation_state_index_spec.rb
@@ -0,0 +1,56 @@
+# rubocop:disable Migration/TooManyMigrationRuns
+require 'spec_helper'
+require 'migrations/helpers/migration_shared_context'
+
+def operation_state_partial_index_present
+  # Partial indexes are not returned by `db.indexes`, so query pg_indexes directly.
+  db.fetch("SELECT * FROM pg_indexes WHERE tablename = 'jobs' AND indexname = 'jobs_operation_state_index';").any?
+end
+
+RSpec.describe 'migration to add operation_state_index on jobs table', isolation: :truncation, type: :migration do
+  include_context 'migration' do
+    let(:migration_filename) { '20260505071445_add_jobs_operation_state_index.rb' }
+  end
+
+  describe 'jobs table' do
+    it 'adds index and handles idempotency gracefully' do
+      if db.database_type == :postgres
+        # Test up migration
+        expect(operation_state_partial_index_present).to be_falsey
+        expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error
+        expect(operation_state_partial_index_present).to be_truthy
+
+        # Test up migration idempotency
+        expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error
+        expect(operation_state_partial_index_present).to be_truthy
+
+        # Test down migration
+        expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error
+        expect(operation_state_partial_index_present).to be_falsey
+
+        # Test down migration idempotency
+        expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error
+        expect(operation_state_partial_index_present).to be_falsey
+
+      elsif db.database_type == :mysql
+        # Test up migration
+        expect(db.indexes(:jobs)).not_to include(:jobs_operation_state_index)
+        expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error
+        expect(db.indexes(:jobs)).to include(:jobs_operation_state_index)
+
+        # Test up migration idempotency
+        expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error
+        expect(db.indexes(:jobs)).to include(:jobs_operation_state_index)
+
+        # Test down migration
+        expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error
+        expect(db.indexes(:jobs)).not_to include(:jobs_operation_state_index)
+
+        # Test down migration idempotency
+        expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error
+        expect(db.indexes(:jobs)).not_to include(:jobs_operation_state_index)
+      end
+    end
+  end
+end
+# rubocop:enable Migration/TooManyMigrationRuns
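The helper leans on the fact that `db.fetch` returns a lazy `Sequel::Dataset`, which is `Enumerable`: calling `.any?` executes the query and reports whether at least one row came back. A tiny standalone illustration:

```ruby
require 'sequel'

db = Sequel.sqlite # in-memory stand-in
db.create_table(:jobs) { String :state }
db[:jobs].insert(state: 'FAILED')

# fetch builds a dataset; any? executes it and checks for at least one row.
puts db.fetch("SELECT 1 FROM jobs WHERE state = 'FAILED'").any?  # => true
puts db.fetch("SELECT 1 FROM jobs WHERE state = 'POLLING'").any? # => false
```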
diff --git a/spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb b/spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb
new file mode 100644
index 00000000000..485e5df7969
--- /dev/null
+++ b/spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb
@@ -0,0 +1,194 @@
+require 'spec_helper'
+
+module VCAP::CloudController
+  module Jobs::Runtime
+    RSpec.describe DelayedJobsRecover, job_context: :worker do
+      subject(:job) { DelayedJobsRecover.new }
+
+      let(:fake_logger) { instance_double(Steno::Logger, info: nil, warn: nil) }
+      let(:max_poll_duration_minutes) { 60 }
+
+      before do
+        allow(Steno).to receive(:logger).and_return(fake_logger)
+        TestConfig.override(broker_client_max_async_poll_duration_minutes: max_poll_duration_minutes)
+      end
+
+      # Builds a fully stuck scenario that the job should pick up and re-enqueue by default.
+      # All filter conditions are satisfied: sio is in progress/create/within cutoff,
+      # pjob is FAILED with operation=service_instance.create, delayed_job has failed_at set.
+      # Override individual parameters to break a single filter and test exclusion.
+      def make_stuck_scenario(
+        sio_state: 'in progress',
+        sio_type: 'create',
+        sio_created_at: Time.now,
+        pjob_state: PollableJobModel::FAILED_STATE,
+        dj_failed_at: Time.now
+      )
+        service_instance = ManagedServiceInstance.make
+
+        ServiceInstanceOperation.make(
+          service_instance_id: service_instance.id,
+          type: sio_type,
+          state: sio_state,
+          created_at: sio_created_at
+        )
+
+        dj = Delayed::Job.create(
+          guid: SecureRandom.uuid,
+          handler: 'fake',
+          run_at: Time.now,
+          failed_at: dj_failed_at,
+          queue: 'cc-generic'
+        )
+
+        pjob = PollableJobModel.make(
+          state: pjob_state,
+          operation: 'service_instance.create',
+          resource_guid: service_instance.guid,
+          resource_type: 'service_instances',
+          delayed_job_guid: dj.guid
+        )
+
+        { service_instance: service_instance, pjob: pjob, delayed_job: dj }
+      end
+
+      it { is_expected.to be_a_valid_job }
+
+      describe '#perform' do
+        context 'when there are no stuck jobs' do
+          it 'does nothing' do
+            make_stuck_scenario(sio_state: 'succeeded')
+            expect(fake_logger).to receive(:info).with('Recover halted delayed jobs')
+            expect { job.perform }.not_to(change { PollableJobModel.where(state: PollableJobModel::POLLING_STATE).count })
+          end
+        end
+
+        context 'when sio state is not in progress' do
+          it 'does not re-enqueue' do
+            scenario = make_stuck_scenario(sio_state: 'succeeded')
+            job.perform
+            expect(scenario[:pjob].reload.state).to eq(PollableJobModel::FAILED_STATE)
+          end
+        end
+
+        context 'when sio type is not create' do
+          it 'does not re-enqueue' do
+            scenario = make_stuck_scenario(sio_type: 'update')
+            job.perform
+            expect(scenario[:pjob].reload.state).to eq(PollableJobModel::FAILED_STATE)
+          end
+        end
+
+        context 'when sio created_at is beyond the max polling window' do
+          it 'does not re-enqueue' do
+            scenario = make_stuck_scenario(sio_created_at: Time.now - (max_poll_duration_minutes + 1).minutes)
+            job.perform
+            expect(scenario[:pjob].reload.state).to eq(PollableJobModel::FAILED_STATE)
+          end
+        end
+
+        context 'when delayed_job.failed_at is nil (job still running or locked)' do
+          it 'does not re-enqueue' do
+            scenario = make_stuck_scenario(dj_failed_at: nil)
+            job.perform
+            expect(scenario[:pjob].reload.state).to eq(PollableJobModel::FAILED_STATE)
+          end
+        end
+
+        context 'when pollable job state is COMPLETE' do
+          it 'does not re-enqueue' do
+            scenario = make_stuck_scenario(pjob_state: PollableJobModel::COMPLETE_STATE)
+            job.perform
+            expect(scenario[:pjob].reload.state).to eq(PollableJobModel::COMPLETE_STATE)
+          end
+        end
+
+        context 'when pollable job state is PROCESSING' do
+          it 'does not re-enqueue' do
+            scenario = make_stuck_scenario(pjob_state: PollableJobModel::PROCESSING_STATE)
+            job.perform
+            expect(scenario[:pjob].reload.state).to eq(PollableJobModel::PROCESSING_STATE)
+          end
+        end
+
+        context 'when pollable job operation is not service_instance.create' do
+          it 'does not re-enqueue' do
+            scenario = make_stuck_scenario
+            scenario[:pjob].update(operation: 'service_instance.update')
+            job.perform
+            expect(scenario[:pjob].reload.state).to eq(PollableJobModel::FAILED_STATE)
+          end
+        end
+
+        context 'when a job is stuck with state FAILED' do
+          it 'calls reenqueue' do
+            scenario = make_stuck_scenario
+            expect_any_instance_of(described_class).to receive(:reenqueue).with(scenario[:pjob].guid, anything)
+            job.perform
+          end
+        end
+
+        context 'when a job is stuck with state POLLING' do
+          it 'calls reenqueue (covers DB flip before failure hook could write FAILED)' do
+            scenario = make_stuck_scenario(pjob_state: PollableJobModel::POLLING_STATE)
+            expect_any_instance_of(described_class).to receive(:reenqueue).with(scenario[:pjob].guid, anything)
+            job.perform
+          end
+        end
+
+        context 'when there are multiple stuck jobs within the batch size' do
+          it 'calls reenqueue for each' do
+            3.times { make_stuck_scenario }
+            expect_any_instance_of(described_class).to receive(:reenqueue).exactly(3).times
+            job.perform
+          end
+        end
+
+        context 'when there are more stuck jobs than the batch size (10)' do
+          it 'processes only up to 10 jobs per run' do
+            11.times { make_stuck_scenario }
+            expect_any_instance_of(described_class).to receive(:reenqueue).exactly(10).times
+            job.perform
+          end
+        end
+      end
+
+      describe '#reenqueue' do
+        let(:inner_job) { instance_double(Jobs::ReoccurringJob) }
+
+        before do
+          allow(Jobs::Enqueuer).to receive(:unwrap_job).and_return(inner_job)
+          allow(inner_job).to receive(:enqueue_next_job)
+        end
+
+        it 'resets pjob to POLLING state and clears cf_api_error' do
+          scenario = make_stuck_scenario
+          scenario[:pjob].update(cf_api_error: 'some error')
+
+          job.send(:reenqueue, scenario[:pjob].guid, scenario[:delayed_job])
+
+          expect(scenario[:pjob].reload.state).to eq(PollableJobModel::POLLING_STATE)
+          expect(scenario[:pjob].reload.cf_api_error).to be_nil
+        end
+
+        it 'calls enqueue_next_job on the unwrapped inner job' do
+          scenario = make_stuck_scenario
+
+          expect(inner_job).to receive(:enqueue_next_job).with(instance_of(PollableJobModel))
+
+          job.send(:reenqueue, scenario[:pjob].guid, scenario[:delayed_job])
+        end
+
+        context 'when another process already re-enqueued the job (delayed_job_guid changed)' do
+          it 'skips without raising and does not call enqueue_next_job' do
+            scenario = make_stuck_scenario
+            scenario[:pjob].update(delayed_job_guid: 'some-other-guid')
+
+            expect(inner_job).not_to receive(:enqueue_next_job)
+            expect { job.send(:reenqueue, scenario[:pjob].guid, scenario[:delayed_job]) }.not_to raise_error
+          end
+        end
+      end
+    end
+  end
+end
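The last context exercises the guard inside `reenqueue`: re-checking `guid` plus `delayed_job_guid` under a `SELECT ... FOR UPDATE` transaction is what makes concurrent recoverers safe. A standalone sketch of that pattern (SQLite in place of Postgres, so the row lock is only indicated in a comment):

```ruby
require 'sequel'

db = Sequel.sqlite # stand-in; on postgres/mysql the lookup would also use .for_update
db.create_table(:pollable_jobs) do
  String :guid
  String :delayed_job_guid
  String :state
end
db[:pollable_jobs].insert(guid: 'p1', delayed_job_guid: 'dead-dj', state: 'FAILED')

# Simulate another recoverer winning the race before we re-check:
db[:pollable_jobs].where(guid: 'p1').update(delayed_job_guid: 'fresh-dj')

db.transaction do
  # Because delayed_job_guid no longer points at the dead job, the re-check
  # finds nothing and we skip instead of double-enqueueing.
  row = db[:pollable_jobs].where(guid: 'p1', delayed_job_guid: 'dead-dj').first
  puts row ? 'recovering p1' : 'already handled elsewhere' # => already handled elsewhere
end
```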
diff --git a/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb b/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb
index 20d0e49b5bd..beb7a885091 100644
--- a/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb
+++ b/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb
@@ -21,6 +21,7 @@ module VCAP::CloudController
           failed_jobs: { frequency_in_seconds: 400, cutoff_age_in_days: 4, max_number_of_failed_delayed_jobs: 10 },
           pollable_jobs: { cutoff_age_in_days: 2 },
           service_operations_initial_cleanup: { frequency_in_seconds: 600 },
+          delayed_jobs_recover: { frequency_in_seconds: 600 },
           service_usage_events: { cutoff_age_in_days: 5 },
           completed_tasks: { cutoff_age_in_days: 6 },
           pending_droplets: { frequency_in_seconds: 300, expiration_in_seconds: 600 },
@@ -161,6 +162,12 @@ module VCAP::CloudController
           expect(block.call).to be_instance_of(Jobs::Runtime::ServiceOperationsInitialCleanup)
         end

+        expect(clock).to receive(:schedule_frequent_worker_job) do |args, &block|
+          expect(args).to eql(name: 'delayed_jobs_recover', interval: 600)
+          expect(Jobs::Runtime::DelayedJobsRecover).to receive(:new).and_call_original
+          expect(block.call).to be_instance_of(Jobs::Runtime::DelayedJobsRecover)
+        end
+
         schedule.start
       end
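For completeness: the `ClockSchema` addition means a clock process refuses to boot on a malformed `delayed_jobs_recover` block. `VCAP::Config` schemas are Membrane schemas under the hood, so the validation behaves roughly like this standalone sketch (illustrative, not CC's actual boot path):

```ruby
require 'membrane'

schema = Membrane::SchemaParser.parse do
  { delayed_jobs_recover: { frequency_in_seconds: Integer } }
end

schema.validate(delayed_jobs_recover: { frequency_in_seconds: 600 }) # passes

begin
  schema.validate(delayed_jobs_recover: { frequency_in_seconds: '600' })
rescue Membrane::SchemaValidationError => e
  puts e.message # String given where Integer was expected
end
```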