Raise TaskAlreadyRunningError when starting an already-running task instance#60855
Conversation
Force-pushed 116a330 to c991bc6
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py (outdated review thread, resolved)
ashb
left a comment
It's not clear to me why we need this "Ignore" behaviour?
Force-pushed 36b30ba to 0ca6829
@ashb Thanks for the review, I've pushed updates addressing all your feedback. Quick summary:

Why the "ignore" behaviour: In Airflow 3.x, the scheduler now processes executor events for RUNNING tasks (not just QUEUED as in 2.x). When Celery redelivers a message for an already-running task, the worker hits a 409 from the API server. Without this fix, Celery records it as a FAILURE event, and the scheduler marks the still-running task as failed. The fix lets the supervisor detect the 409 (task already running) and exit gracefully instead of propagating a failure.

Changes made:
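To make the "ignore" behaviour concrete, here is a minimal sketch of the conflict classification described above. The function name and the detail-dict field names (`reason`, `previous_state`) are illustrative assumptions based on this discussion, not the actual SDK code:

```python
# Hypothetical sketch: classify a 409 response from the Execution API as
# "task already running", per the redelivery scenario described above.
# Field names are assumptions for illustration, not the real Airflow SDK.

def is_already_running_conflict(status_code: int, detail: dict) -> bool:
    """Return True if the response means the task is already RUNNING elsewhere."""
    return (
        status_code == 409
        and detail.get("reason") == "invalid_state"
        and detail.get("previous_state") == "running"
    )

# A redelivered start attempt against a still-running task:
print(is_already_running_conflict(409, {"reason": "invalid_state", "previous_state": "running"}))  # True
# A 409 for some other previous state should NOT be silently ignored:
print(is_already_running_conflict(409, {"reason": "invalid_state", "previous_state": "success"}))  # False
```

Only this narrow case exits gracefully; every other error still propagates as a failure.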
Ready for another look when you have a chance.
@anishgirianish This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:
What to do next:
Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. If you have questions, feel free to ask on the Airflow Slack.
@anishgirianish can you please rebase and resolve conflicts?
@eladkal sure, will do. Thank you!
Force-pushed d7c1038 to dfec523
Hi @eladkal, I have rebased the branch, resolved the conflicts, and pushed. I'd like to request a re-review. Thank you!
jscheffl
left a comment
Thanks for the PR. I assume we are actually facing the SAME problem, and this is one of the things that drags down our stability after upgrading to Airflow 3 (FYI @AutomationDev85 / @wolfdn / @clellmann), so I'd favor merging this ASAP (1) before the next provider wave and (2) before we cut 3.2.0.
That said, I am not approving here and am hesitant to merge (besides one nit comment) for the following reasons:
a) In the past we always aimed to separate provider PRs from core PRs, in order to be able to back-port. So in my view the core changes and provider changes should be split into two PRs. Given the urgency before the next release cut, I'd accept this as an exception though.
b) This error handling fixes the symptoms, but the root cause is the re-delivery of the message by Celery — and this fix would then fully ignore those re-deliveries. Would it not also be reasonable to turn the re-delivery feature off in general? I'm not sure why it is there, because at the moment it generates far more problems than I assume it fixes.
What the fix does not consider is the case where the worker has really "passed away" — which is what the Celery re-delivery was designed for in the first place. I am not a Celery expert, but on the server API in airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:182, do we maybe need to distinguish whether a current heartbeat is present? HTTP 409 is reasonable if the task is "alive", running, and has a current heartbeat. But if it is not alive (no current heartbeat), should it still be marked as failed (triggering the regular retry scheme), or should a re-start on a different worker be allowed?
Especially on (b) I am not an expert and would kindly request feedback from @ashb / @amoghrajesh / @kaxil as experts on the SDK... I am only about 80% sure this is the right way to fix it.
Sorry, but with the 3.2.0 release upcoming, and since we face this problem in our production with many, many user complaints, it has some urgency for us as well. So my vote is an LGTM for 3.2.0.
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py (outdated review thread, resolved)
This is now looking good - another pair of eyes would be good - but as @eladkal said, it would be very good to extract the changes in providers/celery into a separate PR, so we can better backport to 3.1.9.
Ah, yeah and pytests must be green :-D
Force-pushed f3ae085 to 7554dc5
Force-pushed 7554dc5 to 8868d8c
eladkal
left a comment
LGTM, but it also needs a review from @amoghrajesh
Pull request overview
This PR addresses Celery broker redelivery, where a task that is already running gets re-delivered and incorrectly treated as a failure, by detecting an "already running" conflict from the Execution API and surfacing it as a dedicated SDK exception.
Changes:
- Add `TaskAlreadyRunningError` to the Task SDK exceptions.
- Update `TaskInstanceOperations.start()` to translate a specific 409 conflict (`invalid_state`, `previous_state="running"`) into `TaskAlreadyRunningError`.
- Adjust Task SDK client error parsing/tests so `ServerResponseError.detail` for JSON-object details is unwrapped, and add tests for the new "already running" start behavior.
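The 409-to-exception translation listed above can be sketched in simplified form. The exception classes here are local stand-ins for the real Task SDK classes, and the response shape is an assumption based on this review, not the actual `client.py` code:

```python
# Simplified sketch of the translation described in the changes above.
# TaskAlreadyRunningError / ServerResponseError are stand-ins for the real
# Task SDK classes; the (status_code, detail) shape is an assumption.

class TaskAlreadyRunningError(Exception):
    """Raised when starting a task instance that is already running elsewhere."""

class ServerResponseError(Exception):
    """Stand-in for the generic API error."""

def start_task(status_code: int, detail: dict) -> None:
    """Translate one specific 409 into a dedicated exception; pass others through."""
    if (
        status_code == 409
        and detail.get("reason") == "invalid_state"
        and detail.get("previous_state") == "running"
    ):
        raise TaskAlreadyRunningError("task instance is already running")
    if status_code >= 400:
        raise ServerResponseError(f"API error {status_code}: {detail}")
    # otherwise: the start succeeded

try:
    start_task(409, {"reason": "invalid_state", "previous_state": "running"})
except TaskAlreadyRunningError as e:
    print(f"caught: {e}")  # caught: task instance is already running
```

The point of the dedicated type is that callers (the supervisor, then the executor) can catch exactly this case without string-matching on a generic error.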
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| task-sdk/tests/task_sdk/execution_time/test_supervisor.py | Removes a supervisor-level test covering the 409 “already running” start scenario. |
| task-sdk/tests/task_sdk/api/test_client.py | Updates assertions for ServerResponseError.detail shape; adds tests for TaskInstanceOperations.start() raising TaskAlreadyRunningError. |
| task-sdk/src/airflow/sdk/exceptions.py | Introduces TaskAlreadyRunningError. |
| task-sdk/src/airflow/sdk/api/client.py | Adds conflict-to-TaskAlreadyRunningError translation in task_instances.start(); enhances error parsing to support dict detail bodies. |
I know there are still ongoing problems, but I would still like more feedback from other contributors on this. I feel uncomfortable merging and would rather risk keeping the problem open than merge too early. I think the Copilot responses are OK to ignore. @kaxil WDYT? Can we ad-hoc merge to get the Celery fixes into the provider wave?
Force-pushed 252df08 to 53a7e1b
kaxil
left a comment
We should change the PR title though -- it isn't fixing the issue itself, rather providing a mechanism for executors to handle it.
My suggestion:
> Raise TaskAlreadyRunningError when starting an already-running task instance
>
> Handle 409 CONFLICT (task already running) from the API server gracefully by raising TaskAlreadyRunningError instead of letting it propagate as a generic failure. closes: apache#58441
Force-pushed c7ff3b0 to d5d0450
@vatsrahul1001 This is tagged milestone 3.1.9, will you consider this for 3.2.0 as well?
Summary
When a Celery broker redelivers a task message (e.g., due to a visibility timeout or broker failover), a second worker picks up the message and attempts to start a task instance that is already running on the original worker. Before this fix, the Execution API returned a 409 "invalid state" error, which the supervisor treated as an unhandled failure, crashing the redelivered worker with a generic `ServerResponseError`. This could confuse the Celery executor about the task's actual state and produce noisy, unhelpful error logs in production deployments.

This PR fixes the core (task-sdk) side of the issue. A companion PR #64052 adds the Celery provider-side handling.
Changes
- Add `TaskAlreadyRunningError` in `task-sdk/src/airflow/sdk/exceptions.py` to distinguish "task is already running on another worker" from other API errors.
- `TaskInstanceOperations.start()` in `client.py` now inspects the 409 response body: if `reason=invalid_state` and `previous_state=running`, it raises `TaskAlreadyRunningError` instead of the generic `ServerResponseError`.
- `ActivitySubprocess.start()` propagates the `TaskAlreadyRunningError` up to the executor layer, allowing each executor to decide how to handle redelivery (e.g., in the companion PR, Celery catches it and raises `Ignore()` to suppress state reporting).

Testing

- `test_client.py::test_task_instance_start_already_running`: verifies that `TaskInstanceOperations.start()` raises `TaskAlreadyRunningError` (not `ServerResponseError`) when the API returns 409 with `previous_state=running`.
- `test_supervisor.py::test_start_raises_task_already_running_and_kills_subprocess`: end-to-end test that mocks a 409 API response, calls `ActivitySubprocess.start()`, and asserts `TaskAlreadyRunningError` propagates through the supervisor layer and the child process is cleaned up.
- Manual testing with `breeze start-airflow --executor CeleryExecutor`: confirmed that a redelivered task no longer gets marked as failed and the original worker continues execution uninterrupted.

closes #58441
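The executor-side half of this design (handled in the companion PR) can be sketched as follows. Here `Ignore` is a local stand-in for `celery.exceptions.Ignore`, and `run_task_via_supervisor` / `execute_workload` are hypothetical names, not the actual provider code:

```python
# Sketch of how an executor could consume TaskAlreadyRunningError, as
# described above. `Ignore` stands in for celery.exceptions.Ignore;
# function names are illustrative assumptions, not the real provider code.

class TaskAlreadyRunningError(Exception):
    """Stand-in for the Task SDK exception added by this PR."""

class Ignore(Exception):
    """Stand-in for celery.exceptions.Ignore (suppresses state reporting)."""

def run_task_via_supervisor(already_running: bool) -> None:
    """Simulate the supervisor: a redelivered message hits the 409 path."""
    if already_running:
        raise TaskAlreadyRunningError("task is running on another worker")

def execute_workload(already_running: bool) -> str:
    try:
        run_task_via_supervisor(already_running)
    except TaskAlreadyRunningError:
        # Redelivery detected: drop the message instead of reporting FAILURE,
        # so the scheduler never sees a bogus failure for a running task.
        raise Ignore()
    return "SUCCESS"

print(execute_workload(False))  # SUCCESS
try:
    execute_workload(True)
except Ignore:
    print("redelivery ignored")  # redelivery ignored
```

This keeps the policy decision (ignore vs. fail) in the executor, while the SDK only reports the precise condition.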