Description
How to reproduce
- Create a job with a concurrency above 1 and a common concurrency_key (see code below)
- In the Rails console, enqueue many instances of the job
- In a separate terminal, run
JOB_PROCESSES=5 bin/jobs
At first, 5 jobs will run in parallel. After the first round, most of the time, fewer than 5 jobs are processed at a time. Often, a single worker ends up processing all of the jobs.
class DummyJob < ApplicationJob
  limits_concurrency to: 5, key: name

  def perform(id)
    puts "Processing job #{id}"
    sleep(1)
  end
end
# In the Rails console
20.times do |i|
  DummyJob.perform_later(i)
end

The cause
The problem is caused by a race condition combined with the behavior of the query used to retrieve the next blocked execution in a non-blocking way. The query below doesn't lock a single row with the specified concurrency_key; because of the index on concurrency_key, it locks a whole range of records sharing that key, even though only one row is returned. When two workers run this query at the same time, the second worker retrieves no records, assumes there are no remaining blocked jobs, and moves on. This leaves the application in a state where the semaphore value is 4 (correct), but no worker picks up the blocked jobs until the orchestrator unblocks them, one at a time, every 10 minutes. The situation can be dislodged by enqueueing more jobs, since those get added to the ready_executions table, but if you have large spikes of jobs being enqueued (like we do), that mechanism isn't reliable.
-- This locks a range because of the index on the concurrency_key column
SELECT `solid_queue_blocked_executions`.*
FROM `solid_queue_blocked_executions`
WHERE `solid_queue_blocked_executions`.`concurrency_key` = 'DummyJob/DummyJob'
ORDER BY `solid_queue_blocked_executions`.`priority` ASC, `solid_queue_blocked_executions`.`job_id` ASC
LIMIT 1 FOR UPDATE SKIP LOCKED;
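For what it's worth, the range lock can be observed from a Rails console by running that query from two concurrent sessions (a rough sketch, assuming a MySQL-backed app with blocked executions present for the 'DummyJob/DummyJob' key; the second session usually comes back empty even though blocked rows remain):

# Rough sketch: run the locking query above from two concurrent sessions.
# The first session holds its lock inside an open transaction; the second,
# despite SKIP LOCKED, finds no row because the whole concurrency_key range
# is locked rather than just the row that was returned.
query = <<~SQL
  SELECT job_id
  FROM solid_queue_blocked_executions
  WHERE concurrency_key = 'DummyJob/DummyJob'
  ORDER BY priority ASC, job_id ASC
  LIMIT 1 FOR UPDATE SKIP LOCKED
SQL

threads = 2.times.map do |i|
  Thread.new do
    sleep(0.2 * i) # stagger the sessions so the first takes its lock first
    ActiveRecord::Base.connection_pool.with_connection do |conn|
      conn.transaction do
        rows = conn.select_all(query).to_a
        puts "session #{i}: #{rows.inspect}"
        sleep(1) # hold the lock (and transaction) while the other session queries
      end
    end
  end
end
threads.each(&:join)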
-- This locks a single row (query from docs)
SELECT job_id
FROM solid_queue_ready_executions
ORDER BY priority ASC, job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;

Solutions
Unfortunately, I don't have a good fix to propose, only workarounds. If we order by id instead of priority and job_id, I believe the query locks a single row at a time, which fixes the problem, but I expect it would tank performance. Below are the workarounds I plan on implementing.
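To illustrate, here's roughly that variant expressed with ActiveRecord against SolidQueue::BlockedExecution (just a sketch of the query shape, not how SolidQueue builds it internally, and only meaningful inside a transaction):

# Sketch of the "order by id" variant: ordering and locking by the primary key
# should take a single-row lock instead of a range lock on the concurrency_key
# index. Illustrative only, not a proposed patch.
SolidQueue::BlockedExecution
  .where(concurrency_key: "DummyJob/DummyJob")
  .order(:id)
  .lock("FOR UPDATE SKIP LOCKED")
  .first

This generates ORDER BY id ASC LIMIT 1 FOR UPDATE SKIP LOCKED instead of the priority/job_id ordering above.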
Workarounds
By using a random number as the concurrency_key, we can limit concurrency at the cost of an inconsistent number of workers processing the jobs, since some numbers will be selected more often than others.
class DummyJob < ApplicationJob
  limits_concurrency to: 1, key: ->(_id) { SecureRandom.rand(5) }
end

However, we can distribute the concurrency keys more uniformly using a counter. The improvement from this will depend on how many jobs are enqueued by the same process (the more that come from a single process, the better).
class DummyJob < ApplicationJob
  mattr_accessor :counter

  limits_concurrency to: 1, key: ->(_id) {
    self.class.counter ||= SecureRandom.rand(5)
    self.class.counter = (self.class.counter + 1) % 5
  }
end

I hope that's clear. Let me know what you think, and thanks for this great gem. We're aaaalllmost completely off Resque, and being able to query completed jobs in a SQL database for diagnosing issues has been great.