diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb index 9e9c133..8882f94 100644 --- a/ruby/lib/ci/queue/configuration.rb +++ b/ruby/lib/ci/queue/configuration.rb @@ -15,6 +15,7 @@ class Configuration attr_accessor :timing_redis_url attr_accessor :write_duration_averages attr_accessor :heartbeat_grace_period, :heartbeat_interval + attr_accessor :master_lock_ttl, :max_election_attempts attr_reader :circuit_breakers attr_writer :seed, :build_id attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout @@ -66,7 +67,9 @@ def initialize( branch: nil, timing_redis_url: nil, heartbeat_grace_period: 30, - heartbeat_interval: 10 + heartbeat_interval: 10, + master_lock_ttl: 30, + max_election_attempts: 3 ) @build_id = build_id @circuit_breakers = [CircuitBreaker::Disabled] @@ -105,6 +108,8 @@ def initialize( @write_duration_averages = false @heartbeat_grace_period = heartbeat_grace_period @heartbeat_interval = heartbeat_interval + @master_lock_ttl = master_lock_ttl + @max_election_attempts = max_election_attempts end def queue_init_timeout diff --git a/ruby/lib/ci/queue/redis.rb b/ruby/lib/ci/queue/redis.rb index c3876ef..96f7606 100644 --- a/ruby/lib/ci/queue/redis.rb +++ b/ruby/lib/ci/queue/redis.rb @@ -18,6 +18,7 @@ module Queue module Redis Error = Class.new(StandardError) LostMaster = Class.new(Error) + MasterDied = Class.new(Error) class << self diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 9ebdd58..221ac94 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -37,15 +37,15 @@ def created_at=(timestamp) def size redis.multi do |transaction| - transaction.llen(key('queue')) - transaction.zcard(key('running')) + transaction.llen(generation_key('queue')) + transaction.zcard(generation_key('running')) end.inject(:+) end def to_a redis.multi do |transaction| - transaction.lrange(key('queue'), 0, -1) - transaction.zrange(key('running'), 0, -1) + transaction.lrange(generation_key('queue'), 0, -1) + transaction.zrange(generation_key('running'), 0, -1) end.flatten.reverse.map { |k| index.fetch(k) } end @@ -56,11 +56,28 @@ def progress def wait_for_master(timeout: 120) return true if master? - (timeout * 10 + 1).to_i.times do - return true if queue_initialized? + deadline = CI::Queue.time_now + timeout + last_status = nil + while CI::Queue.time_now < deadline + status = master_status + + # Success - queue is ready + if status == 'ready' || status == 'finished' + learn_generation unless master? + return true + end + + # Master lock expired during setup (died mid-population) + # Status will be nil if lock expired + if status.nil? && last_status == 'setup' + raise MasterDied, "Master lock expired during setup - master may have died" + end + + last_status = status sleep 0.1 end + raise LostMaster, "The master worker (worker #{master_worker_id}) is still `#{master_status}` after #{timeout} seconds waiting." end @@ -110,7 +127,26 @@ def build_id end def master_status - redis.get(key('master-status')) + status = redis.get(key('master-status')) + # Handle new format "setup:#{generation}" - return just "setup" for compatibility + return 'setup' if status&.start_with?('setup:') + status + end + + def generation_key(*args) + gen = @generation || @current_generation + return key(*args) unless gen # Fallback for backwards compatibility + key('gen', gen, *args) + end + + def learn_generation + @current_generation = redis.get(key('current-generation')) + raise MasterDied, "No generation available - master may have died" unless @current_generation + @current_generation + end + + def current_generation + @generation || @current_generation end def eval_script(script, *args) diff --git a/ruby/lib/ci/queue/redis/build_record.rb b/ruby/lib/ci/queue/redis/build_record.rb index 0423b05..c138d8a 100644 --- a/ruby/lib/ci/queue/redis/build_record.rb +++ b/ruby/lib/ci/queue/redis/build_record.rb @@ -27,7 +27,7 @@ def passing_tests TOTAL_KEY = "___total___" def requeued_tests - requeues = redis.hgetall(key('requeues-count')) + requeues = redis.hgetall(generation_key('requeues-count')) requeues.delete(TOTAL_KEY) requeues end @@ -126,6 +126,12 @@ def record_stats(stats, pipeline: redis) def key(*args) ['build', config.build_id, *args].join(':') end + + def generation_key(*args) + gen = @queue.respond_to?(:current_generation) ? @queue.current_generation : nil + return key(*args) unless gen # Fallback for backwards compatibility + key('gen', gen, *args) + end end end end diff --git a/ruby/lib/ci/queue/redis/supervisor.rb b/ruby/lib/ci/queue/redis/supervisor.rb index c33edcf..9cdcdd6 100644 --- a/ruby/lib/ci/queue/redis/supervisor.rb +++ b/ruby/lib/ci/queue/redis/supervisor.rb @@ -9,7 +9,7 @@ def master? def total wait_for_master(timeout: config.queue_init_timeout) - redis.get(key('total')).to_i + redis.get(generation_key('total')).to_i end def build @@ -53,7 +53,7 @@ def wait_for_workers def active_workers? # if there are running jobs we assume there are still agents active - redis.zrangebyscore(key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0 + redis.zrangebyscore(generation_key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0 end end end diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 7de1f17..a0852f4 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -2,6 +2,7 @@ require 'ci/queue/static' require 'concurrent/set' +require 'securerandom' module CI module Queue @@ -38,19 +39,37 @@ def populate(tests, random: Random.new) @index = tests.map { |t| [t.id, t] }.to_h @total = tests.size - if acquire_master_role? - executables = reorder_tests(tests, random: random) + election_attempts = 0 + max_attempts = config.max_election_attempts - chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } - individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } + loop do + if acquire_master_role? + executables = reorder_tests(tests, random: random) - store_chunk_metadata(chunks) if chunks.any? + chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } + individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } - all_ids = chunks.map(&:id) + individual_tests.map(&:id) - push(all_ids) - end + store_chunk_metadata(chunks) if chunks.any? + + all_ids = chunks.map(&:id) + individual_tests.map(&:id) + push(all_ids) + end - register_worker_presence + register_worker_presence + + begin + wait_for_master(timeout: config.queue_init_timeout) + break # Queue is ready + rescue MasterDied => e + election_attempts += 1 + if election_attempts >= max_attempts + raise LostMaster, "Failed to elect master after #{max_attempts} attempts: #{e.message}" + end + @master = nil + @generation = nil + warn "Previous master died (attempt #{election_attempts}/#{max_attempts}), retrying election..." + end + end self end @@ -87,6 +106,12 @@ def poll idle_state_printed = false attempt = 0 until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed? + # Check for generation staleness - another master may have taken over + if generation_stale? + warn "Generation changed - queue was repopulated by new master. Exiting poll loop." + break + end + if id = reserve attempt = 0 idle_since = nil @@ -104,15 +129,15 @@ def poll idle_since ||= CI::Queue.time_now if CI::Queue.time_now - idle_since > 120 && !idle_state_printed puts "Worker #{worker_id} has been idle for 120 seconds. Printing global state..." - running_tests = redis.zrange(key('running'), 0, -1, withscores: true) - puts " Processed tests: #{redis.scard(key('processed'))}" - puts " Pending tests: #{redis.llen(key('queue'))}. #{redis.lrange(key('queue'), 0, -1)}" + running_tests = redis.zrange(generation_key('running'), 0, -1, withscores: true) + puts " Processed tests: #{redis.scard(generation_key('processed'))}" + puts " Pending tests: #{redis.llen(generation_key('queue'))}. #{redis.lrange(generation_key('queue'), 0, -1)}" puts " Running tests: #{running_tests.size}. #{running_tests}" - puts " Owners: #{redis.hgetall(key('owners'))}" + puts " Owners: #{redis.hgetall(generation_key('owners'))}" unless running_tests.empty? puts ' Checking if running tests are in processed set:' running_tests.each do |test, _score| - puts " #{test}: #{redis.sismember(key('processed'), test)}" + puts " #{test}: #{redis.sismember(generation_key('processed'), test)}" end end idle_state_printed = true @@ -126,7 +151,7 @@ def poll end redis.pipelined do |pipeline| pipeline.expire(key('worker', worker_id, 'queue'), config.redis_ttl) - pipeline.expire(key('processed'), config.redis_ttl) + pipeline.expire(generation_key('processed'), config.redis_ttl) end rescue *CONNECTION_ERRORS end @@ -173,7 +198,7 @@ def acknowledge(test_or_id) begin eval_script( :acknowledge, - keys: [key('running'), key('processed'), key('owners')], + keys: [generation_key('running'), generation_key('processed'), generation_key('owners')], argv: [test_key] ) == 1 rescue StandardError => e @@ -198,12 +223,12 @@ def requeue(test, offset: Redis.requeue_offset, skip_reservation_check: false) requeued = config.max_requeues > 0 && global_max_requeues > 0 && !config.known_flaky?(test_key) && eval_script( :requeue, keys: [ - key('processed'), - key('requeues-count'), - key('queue'), - key('running'), - key('worker', worker_id, 'queue'), - key('owners') + generation_key('processed'), + generation_key('requeues-count'), + generation_key('queue'), + generation_key('running'), + key('worker', worker_id, 'queue'), # Worker-specific, not generation-scoped + generation_key('owners') ], argv: [config.max_requeues, global_max_requeues, test_key, offset] ) == 1 @@ -215,7 +240,7 @@ def requeue(test, offset: Redis.requeue_offset, skip_reservation_check: false) def release! eval_script( :release, - keys: [key('running'), key('worker', worker_id, 'queue'), key('owners')], + keys: [generation_key('running'), key('worker', worker_id, 'queue'), generation_key('owners')], argv: [] ) nil @@ -237,12 +262,12 @@ def heartbeat(test_or_id = nil) result = eval_script( :heartbeat, keys: [ - key('running'), - key('processed'), - key('owners'), - key('worker', worker_id, 'queue'), - key('heartbeats'), - key('test-group-timeout') + generation_key('running'), + generation_key('processed'), + generation_key('owners'), + key('worker', worker_id, 'queue'), # Worker-specific, not generation-scoped + generation_key('heartbeats'), + generation_key('test-group-timeout') ], argv: [current_time, test_key, config.timeout] ) @@ -264,6 +289,25 @@ def heartbeat(test_or_id = nil) attr_reader :index + def generation_key(*args) + gen = @generation || @current_generation + raise "Generation not set - call learn_generation first" unless gen + key('gen', gen, *args) + end + + def learn_generation + @current_generation = redis.get(key('current-generation')) + raise MasterDied, "No generation available - master may have died" unless @current_generation + @current_generation + end + + # Check if our cached generation is stale + def generation_stale? + return false unless @current_generation + current = redis.get(key('current-generation')) + current && current != @current_generation + end + # Runs a block while sending periodic heartbeats in a background thread. # This prevents other workers from stealing the test while it's being executed. def with_heartbeat(test_id) @@ -338,19 +382,19 @@ def try_to_reserve_test test_id = eval_script( :reserve, keys: [ - key('queue'), - key('running'), - key('processed'), - key('worker', worker_id, 'queue'), - key('owners'), - key('test-group-timeout') + generation_key('queue'), + generation_key('running'), + generation_key('processed'), + key('worker', worker_id, 'queue'), # Worker-specific, not generation-scoped + generation_key('owners'), + generation_key('test-group-timeout') ], argv: [current_time, 'true', config.timeout] ) if test_id # Check what timeout was used (dynamic or default) - dynamic_timeout = redis.hget(key('test-group-timeout'), test_id) + dynamic_timeout = redis.hget(generation_key('test-group-timeout'), test_id) timeout_used = dynamic_timeout ? dynamic_timeout.to_f : config.timeout deadline = current_time + timeout_used gap_seconds = timeout_used @@ -386,19 +430,19 @@ def try_to_reserve_lost_test lost_test = eval_script( :reserve_lost, keys: [ - key('running'), - key('completed'), - key('worker', worker_id, 'queue'), - key('owners'), - key('test-group-timeout'), - key('heartbeats') + generation_key('running'), + generation_key('completed'), + key('worker', worker_id, 'queue'), # Worker-specific, not generation-scoped + generation_key('owners'), + generation_key('test-group-timeout'), + generation_key('heartbeats') ], argv: [current_time, timeout, 'true', config.timeout, config.heartbeat_grace_period] ) if lost_test # Check what timeout was used (dynamic or default) - dynamic_timeout = redis.hget(key('test-group-timeout'), lost_test) + dynamic_timeout = redis.hget(generation_key('test-group-timeout'), lost_test) timeout_used = dynamic_timeout ? dynamic_timeout.to_f : config.timeout deadline = current_time + timeout_used gap_seconds = timeout_used @@ -429,7 +473,7 @@ def try_to_reserve_lost_test if lost_test.nil? && idle? puts "Worker #{worker_id} could not reserve a lost test while idle" puts 'Printing running tests:' - puts "#{redis.zrange(key('running'), 0, -1, withscores: true)}" + puts "#{redis.zrange(generation_key('running'), 0, -1, withscores: true)}" end build.record_warning(Warnings::RESERVED_LOST_TEST, test: lost_test, timeout: timeout) if lost_test @@ -442,13 +486,15 @@ def push(tests) if @master redis.multi do |transaction| - transaction.lpush(key('queue'), tests) unless tests.empty? - transaction.set(key('total'), @total) + transaction.lpush(generation_key('queue'), tests) unless tests.empty? + transaction.set(generation_key('total'), @total) transaction.set(key('master-status'), 'ready') + transaction.set(key('current-generation'), @generation) - transaction.expire(key('queue'), config.redis_ttl) - transaction.expire(key('total'), config.redis_ttl) + transaction.expire(generation_key('queue'), config.redis_ttl) + transaction.expire(generation_key('total'), config.redis_ttl) transaction.expire(key('master-status'), config.redis_ttl) + transaction.expire(key('current-generation'), config.redis_ttl) end end rescue *CONNECTION_ERRORS @@ -462,21 +508,38 @@ def register def acquire_master_role? return true if @master - @master = redis.setnx(key('master-status'), 'setup') + # Generate unique ID for this population attempt + @generation = SecureRandom.uuid + + # Use SET with NX and EX - embeds generation in lock value + # Lock value format: "setup:#{generation}" + @master = redis.set( + key('master-status'), + "setup:#{@generation}", + nx: true, + ex: config.master_lock_ttl + ) + if @master begin - redis.set(key('master-worker-id'), worker_id) - redis.expire(key('master-worker-id'), config.redis_ttl) - warn "Worker #{worker_id} elected as master" + redis.multi do |tx| + tx.set(key('master-worker-id'), worker_id) + tx.expire(key('master-worker-id'), config.redis_ttl) + end + warn "Worker #{worker_id} elected as master (generation #{@generation})" rescue *CONNECTION_ERRORS # If setting master-worker-id fails, we still have master status # Log but don't lose master role warn("Failed to set master-worker-id: #{$!.message}") end + else + @generation = nil # Clear generation if we didn't win end + @master rescue *CONNECTION_ERRORS @master = nil + @generation = nil false end @@ -495,17 +558,17 @@ def store_chunk_metadata(chunks) chunks.each_slice(batch_size) do |chunk_batch| redis.multi do |transaction| chunk_batch.each do |chunk| - # Store chunk metadata with TTL + # Store chunk metadata with TTL (generation-namespaced) transaction.set( - key('chunk', chunk.id), + generation_key('chunk', chunk.id), chunk.to_json ) - transaction.expire(key('chunk', chunk.id), config.redis_ttl) + transaction.expire(generation_key('chunk', chunk.id), config.redis_ttl) - # Track all chunks for cleanup - transaction.sadd(key('chunks'), chunk.id) + # Track all chunks for cleanup (generation-namespaced) + transaction.sadd(generation_key('chunks'), chunk.id) - # Store dynamic timeout for this chunk + # Store dynamic timeout for this chunk (generation-namespaced) # Timeout = estimated_duration (in ms) converted to seconds + buffer # estimated_duration is in milliseconds, convert to seconds and add 10% buffer buffer_percent = 10 @@ -513,10 +576,10 @@ def store_chunk_metadata(chunks) chunk_timeout = (estimated_duration_seconds * (1 + buffer_percent / 100.0)).round(2) # Format to string to avoid floating point precision issues in Redis # Use %g to remove trailing zeros - transaction.hset(key('test-group-timeout'), chunk.id, format('%g', chunk_timeout)) + transaction.hset(generation_key('test-group-timeout'), chunk.id, format('%g', chunk_timeout)) end - transaction.expire(key('chunks'), config.redis_ttl) - transaction.expire(key('test-group-timeout'), config.redis_ttl) + transaction.expire(generation_key('chunks'), config.redis_ttl) + transaction.expire(generation_key('test-group-timeout'), config.redis_ttl) end end end @@ -536,8 +599,8 @@ def resolve_executable(id) end def resolve_chunk(chunk_id) - # Fetch chunk metadata from Redis - chunk_json = redis.get(key('chunk', chunk_id)) + # Fetch chunk metadata from Redis (generation-namespaced) + chunk_json = redis.get(generation_key('chunk', chunk_id)) unless chunk_json warn "Warning: Chunk metadata not found for #{chunk_id}" return nil