Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Configuration
attr_accessor :timing_redis_url
attr_accessor :write_duration_averages
attr_accessor :heartbeat_grace_period, :heartbeat_interval
attr_accessor :master_lock_ttl, :max_election_attempts
attr_reader :circuit_breakers
attr_writer :seed, :build_id
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
Expand Down Expand Up @@ -66,7 +67,9 @@ def initialize(
branch: nil,
timing_redis_url: nil,
heartbeat_grace_period: 30,
heartbeat_interval: 10
heartbeat_interval: 10,
master_lock_ttl: 30,
max_election_attempts: 3
)
@build_id = build_id
@circuit_breakers = [CircuitBreaker::Disabled]
Expand Down Expand Up @@ -105,6 +108,8 @@ def initialize(
@write_duration_averages = false
@heartbeat_grace_period = heartbeat_grace_period
@heartbeat_interval = heartbeat_interval
@master_lock_ttl = master_lock_ttl
@max_election_attempts = max_election_attempts
end

def queue_init_timeout
Expand Down
1 change: 1 addition & 0 deletions ruby/lib/ci/queue/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module Queue
module Redis
Error = Class.new(StandardError)
LostMaster = Class.new(Error)
MasterDied = Class.new(Error)

class << self

Expand Down
50 changes: 43 additions & 7 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ def created_at=(timestamp)

def size
redis.multi do |transaction|
transaction.llen(key('queue'))
transaction.zcard(key('running'))
transaction.llen(generation_key('queue'))
transaction.zcard(generation_key('running'))
end.inject(:+)
end

def to_a
redis.multi do |transaction|
transaction.lrange(key('queue'), 0, -1)
transaction.zrange(key('running'), 0, -1)
transaction.lrange(generation_key('queue'), 0, -1)
transaction.zrange(generation_key('running'), 0, -1)
end.flatten.reverse.map { |k| index.fetch(k) }
end

Expand All @@ -56,11 +56,28 @@ def progress
def wait_for_master(timeout: 120)
return true if master?

(timeout * 10 + 1).to_i.times do
return true if queue_initialized?
deadline = CI::Queue.time_now + timeout
last_status = nil

while CI::Queue.time_now < deadline
status = master_status

# Success - queue is ready
if status == 'ready' || status == 'finished'
learn_generation unless master?
return true
end

# Master lock expired during setup (died mid-population)
# Status will be nil if lock expired
if status.nil? && last_status == 'setup'
raise MasterDied, "Master lock expired during setup - master may have died"
end

last_status = status
sleep 0.1
end

raise LostMaster, "The master worker (worker #{master_worker_id}) is still `#{master_status}` after #{timeout} seconds waiting."
end

Expand Down Expand Up @@ -110,7 +127,26 @@ def build_id
end

def master_status
redis.get(key('master-status'))
status = redis.get(key('master-status'))
# Handle new format "setup:#{generation}" - return just "setup" for compatibility
return 'setup' if status&.start_with?('setup:')
status
end

def generation_key(*args)
gen = @generation || @current_generation
return key(*args) unless gen # Fallback for backwards compatibility
key('gen', gen, *args)
end

def learn_generation
@current_generation = redis.get(key('current-generation'))
raise MasterDied, "No generation available - master may have died" unless @current_generation
@current_generation
end

def current_generation
@generation || @current_generation
end

def eval_script(script, *args)
Expand Down
8 changes: 7 additions & 1 deletion ruby/lib/ci/queue/redis/build_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def passing_tests

TOTAL_KEY = "___total___"
def requeued_tests
requeues = redis.hgetall(key('requeues-count'))
requeues = redis.hgetall(generation_key('requeues-count'))
requeues.delete(TOTAL_KEY)
requeues
end
Expand Down Expand Up @@ -126,6 +126,12 @@ def record_stats(stats, pipeline: redis)
def key(*args)
['build', config.build_id, *args].join(':')
end

def generation_key(*args)
gen = @queue.respond_to?(:current_generation) ? @queue.current_generation : nil
return key(*args) unless gen # Fallback for backwards compatibility
key('gen', gen, *args)
end
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions ruby/lib/ci/queue/redis/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def master?

def total
wait_for_master(timeout: config.queue_init_timeout)
redis.get(key('total')).to_i
redis.get(generation_key('total')).to_i
end

def build
Expand Down Expand Up @@ -53,7 +53,7 @@ def wait_for_workers

def active_workers?
# if there are running jobs we assume there are still agents active
redis.zrangebyscore(key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0
redis.zrangebyscore(generation_key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0
end
end
end
Expand Down
Loading
Loading