Raise TaskAlreadyRunningError when starting an already-running task instance#60855

Merged
jscheffl merged 5 commits into apache:main from anishgirianish:fix/58441-fix-celery-task-marked-as-failed
Mar 25, 2026

Conversation


@anishgirianish anishgirianish commented Jan 21, 2026

Summary

When a Celery broker redelivers a task message (e.g., due to a visibility timeout or broker failover), a second worker picks up the message and attempts to start a task instance that is already running on the original worker. Before this fix, the Execution API returned a 409 "invalid state" error, which the supervisor treated as an unhandled failure, crashing the redelivered worker with a generic ServerResponseError. This could confuse the Celery executor about the task's actual state and produce noisy, unhelpful error logs in production deployments.

This PR fixes the core (task-sdk) side of the issue. A companion PR #64052 adds the Celery provider-side handling.

Changes

  • Added TaskAlreadyRunningError in task-sdk/src/airflow/sdk/exceptions.py to distinguish "task is already running on another worker" from other API errors.
  • TaskInstanceOperations.start() in client.py now inspects the 409 response body: if reason=invalid_state and previous_state=running, it raises TaskAlreadyRunningError instead of the generic ServerResponseError.
  • ActivitySubprocess.start() propagates the TaskAlreadyRunningError up to the executor layer, allowing each executor to decide how to handle redelivery (e.g., Celery catches it and raises Ignore() to suppress state reporting in the companion PR).
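The client-side translation described in these bullets can be sketched roughly as follows. This is a hedged stand-in, not the actual task-sdk code: `translate_start_conflict` and the two minimal exception classes are invented for illustration, and the real `ServerResponseError` / `TaskAlreadyRunningError` in the SDK carry more state than shown here.

```python
class TaskAlreadyRunningError(Exception):
    """Raised when the API reports the task instance is already running on another worker."""


class ServerResponseError(Exception):
    """Generic fallback for unexpected API error responses."""


def translate_start_conflict(status_code: int, detail: dict) -> None:
    """Raise the most specific error for a failed start() call.

    Mirrors the logic described in the PR: a 409 whose body says
    reason=invalid_state and previous_state=running becomes the
    dedicated TaskAlreadyRunningError; anything else stays generic.
    """
    if (
        status_code == 409
        and detail.get("reason") == "invalid_state"
        and detail.get("previous_state") == "running"
    ):
        raise TaskAlreadyRunningError("Task instance is already running on another worker")
    raise ServerResponseError(f"API returned {status_code}: {detail}")
```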

Testing

  • test_client.py::test_task_instance_start_already_running: Verifies that TaskInstanceOperations.start() raises TaskAlreadyRunningError (not ServerResponseError) when the API returns 409 with previous_state=running.
  • test_supervisor.py::test_start_raises_task_already_running_and_kills_subprocess: End-to-end test that mocks a 409 API response, calls ActivitySubprocess.start(), and asserts that TaskAlreadyRunningError propagates through the supervisor layer and the child process is cleaned up.
  • Manual validation via Breeze: Verified the fix end-to-end by simulating broker redelivery in a local Celery setup using breeze start-airflow --executor CeleryExecutor. Confirmed that a redelivered task no longer gets marked as failed and the original worker continues execution uninterrupted.

closes #58441

@ashb ashb (Member) left a comment


It's not clear to me why we need this "Ignore" behaviour?

@anishgirianish anishgirianish force-pushed the fix/58441-fix-celery-task-marked-as-failed branch from 36b30ba to 0ca6829 on February 10, 2026 14:33
@anishgirianish anishgirianish requested a review from ashb February 14, 2026 23:29
@anishgirianish anishgirianish (Contributor Author) commented:

@ashb Thanks for the review, I've pushed updates addressing all your feedback. Quick summary:

Why the "ignore" behaviour: In Airflow 3.x, the scheduler now processes executor events for RUNNING tasks (not just QUEUED as in 2.x). When Celery redelivers a message for an already-running task, the worker hits a 409 from the API server. Without this fix, Celery records it as a FAILURE event, and the scheduler marks the still-running task as failed. The fix lets the supervisor detect the 409 (task already running) and exit gracefully instead of propagating a failure.
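The executor-side handling this enables (done in the companion PR #64052) can be sketched like this. Everything below is a hypothetical stand-in, not the provider's actual code: `Ignore` mimics `celery.exceptions.Ignore` (which tells Celery not to record a task state), and `execute_workload` / `supervise` are invented names for illustration.

```python
class TaskAlreadyRunningError(Exception):
    """Raised by the task-sdk client when the API reports the task is already running."""


class Ignore(Exception):
    """Stand-in for celery.exceptions.Ignore: tells Celery to skip the state update."""


def execute_workload(supervise, workload):
    """Hypothetical Celery task body: run the workload, swallow redelivery conflicts."""
    try:
        supervise(workload)  # run the task via the task-sdk supervisor
    except TaskAlreadyRunningError:
        # Another worker already owns this task instance; raising Ignore
        # suppresses the FAILURE event the scheduler would otherwise act on.
        raise Ignore()
```

With this shape, a redelivered message ends in no Celery state at all instead of a FAILURE, so the scheduler never sees a failure event for the still-running task.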

Changes made:

  • Moved 409 handling into TaskInstanceOperations.start() (no more fragile dict parsing)
  • Replaced all broker/Celery-specific language with executor-agnostic terms

Ready for another look when you have a chance.

@potiuk potiuk (Member) commented Mar 12, 2026

@anishgirianish This PR has been converted to draft because it does not yet meet our Pull Request quality criteria.

Issues found:

  • Merge conflicts: This PR has merge conflicts with the main branch. Your branch is 823 commits behind main. Please rebase your branch (git fetch origin && git rebase origin/main), resolve the conflicts, and push again. See contributing quick start.

Note: Your branch is 823 commits behind main. Some check failures may be caused by changes in the base branch rather than by your PR. Please rebase your branch and push again to get up-to-date CI results.

What to do next:

  • The comment informs you what you need to do.
  • Fix each issue, then mark the PR as "Ready for review" in the GitHub UI - but only after making sure that all the issues are fixed.
  • Maintainers will then proceed with a normal review.

Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. If you have questions, feel free to ask on the Airflow Slack.

@potiuk potiuk marked this pull request as draft March 12, 2026 00:57
@eladkal eladkal (Contributor) commented Mar 14, 2026

@anishgirianish can you please rebase and resolve conflicts?

@anishgirianish anishgirianish (Contributor Author) commented Mar 14, 2026

@eladkal Sure, will do. Thank you!

@anishgirianish anishgirianish force-pushed the fix/58441-fix-celery-task-marked-as-failed branch from d7c1038 to dfec523 on March 14, 2026 09:56
@anishgirianish anishgirianish marked this pull request as ready for review March 14, 2026 19:59
@anishgirianish anishgirianish (Contributor Author) commented Mar 21, 2026

@anishgirianish can you please rebase and resolve conflicts?

Hi @eladkal, I have rebased the branch, resolved the conflicts, and pushed. Could you please re-review? Thank you.

@eladkal eladkal requested a review from jscheffl March 21, 2026 06:34
@jscheffl jscheffl (Contributor) left a comment


Thanks for the PR. I assume we are actually facing the SAME problem, and this is one of the things that drags down our stability after upgrading to Airflow 3 (FYI @AutomationDev85 / @wolfdn / @clellmann), so I'd favor merging this ASAP (1) before the next provider wave and (2) before we cut 3.2.0.

That said, I am not approving here and am hesitant to merge (besides one nit comment) for two reasons:

a) In the past we always aimed to separate provider PRs from core PRs, in order to be able to back-port. So in my view the core changes and provider changes should be split into two PRs. Given the urgency before the next release cut, I'd accept this as an exception though.

b) This error handling fixes the symptom, but the root cause is the re-delivery of the message by Celery, and with this fix those re-deliveries would then be fully ignored. Would it not also be reasonable to turn the re-delivery feature off in general? I am not sure why it is there, because at the moment it generates far more problems than I assume it fixes.
What the fix does not consider is the case where the worker has really "passed away", which is what the Celery re-delivery was designed for in the first place. I am not a Celery expert, but on the server API in airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:182, do we maybe need to distinguish whether a current heartbeat is present? HTTP 409 is reasonable if the task is "alive" and running with a current heartbeat. But if it is not alive (no current heartbeat), should the task still be marked as failed (falling back to the regular retry scheme), or should a re-start on a different worker be allowed?

Especially on (b) I am not an expert and would kindly request feedback from @ashb / @amoghrajesh / @kaxil as experts on the SDK... I am only about 80% sure this is the right way to fix it.
Sorry, but with the 3.2.0 release upcoming, and since we face this problem in our production and have many user complaints about it, this has some urgency for us as well. So my inclination is an LGTM for 3.2.0.
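The heartbeat-based distinction proposed in (b) could look roughly like this. This is purely a sketch of the suggestion, not code from the PR or from the API server; `HEARTBEAT_TIMEOUT` and `start_response_code` are invented for illustration, and the real server would use its configured heartbeat timeout rather than this constant.

```python
from datetime import datetime, timedelta, timezone

# Invented threshold for illustration only; the real server would consult
# its configured heartbeat timeout, not a hard-coded constant.
HEARTBEAT_TIMEOUT = timedelta(seconds=30)


def start_response_code(previous_state, last_heartbeat, now=None):
    """Decide what a duplicate start() call could return under the proposal:
    409 only while the original worker still heartbeats; otherwise allow
    the new worker to take over (sketched here as a 200)."""
    now = now or datetime.now(timezone.utc)
    alive = last_heartbeat is not None and (now - last_heartbeat) < HEARTBEAT_TIMEOUT
    if previous_state == "running" and alive:
        return 409  # task is alive on another worker: reject the duplicate start
    return 200      # no live heartbeat: permit a takeover instead of failing the task
```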

@jscheffl jscheffl added this to the Airflow 3.2.0 milestone Mar 21, 2026
@eladkal eladkal modified the milestones: Airflow 3.2.0, Airflow 3.1.9 Mar 21, 2026
@jscheffl jscheffl (Contributor) left a comment


This is now looking good. Another pair of eyes would be welcome, but as @eladkal said, it would be very good to extract the changes in providers/celery into a separate PR; then we can better backport to 3.1.9.

Ah, yeah and pytests must be green :-D

@anishgirianish anishgirianish force-pushed the fix/58441-fix-celery-task-marked-as-failed branch from f3ae085 to 7554dc5 on March 21, 2026 21:30
@anishgirianish anishgirianish force-pushed the fix/58441-fix-celery-task-marked-as-failed branch from 7554dc5 to 8868d8c on March 21, 2026 21:55
@eladkal eladkal (Contributor) left a comment


LGTM but needs also review from @amoghrajesh

@eladkal eladkal (Contributor) commented Mar 24, 2026

@jscheffl would you like to proceed with this PR? We need to make a decision so that #64052 can be included in the upcoming provider wave.

Copilot AI left a comment


Pull request overview

This PR aims to address Celery broker redelivery where a task that is already running gets re-delivered and incorrectly treated as a failure, by detecting an “already running” conflict from the Execution API and surfacing it as a dedicated SDK exception.

Changes:

  • Add TaskAlreadyRunningError to the Task SDK exceptions.
  • Update TaskInstanceOperations.start() to translate a specific 409 conflict (reason=“invalid_state”, previous_state=“running”) into TaskAlreadyRunningError.
  • Adjust Task SDK client error parsing/tests so ServerResponseError.detail for JSON-object details is unwrapped, and add tests for the new “already running” start behavior.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.

File Description
task-sdk/tests/task_sdk/execution_time/test_supervisor.py Removes a supervisor-level test covering the 409 “already running” start scenario.
task-sdk/tests/task_sdk/api/test_client.py Updates assertions for ServerResponseError.detail shape; adds tests for TaskInstanceOperations.start() raising TaskAlreadyRunningError.
task-sdk/src/airflow/sdk/exceptions.py Introduces TaskAlreadyRunningError.
task-sdk/src/airflow/sdk/api/client.py Adds conflict-to-TaskAlreadyRunningError translation in task_instances.start(); enhances error parsing to support dict detail bodies.

@jscheffl jscheffl (Contributor) commented:

@jscheffl would you like to proceed with this PR? We need to make a decision so that #64052 can be included in the upcoming provider wave.

I know there are still ongoing problems, but I would still like more feedback from other contributors on this. I feel uncomfortable merging it myself and would rather risk keeping the problem open than merge too early. I think the Copilot responses are OK to ignore.

@kaxil WDYT? Can we ad-hoc merge to get the Celery fixes into the provider wave?

@anishgirianish anishgirianish force-pushed the fix/58441-fix-celery-task-marked-as-failed branch 2 times, most recently from 252df08 to 53a7e1b on March 25, 2026 16:13
@anishgirianish anishgirianish requested a review from kaxil March 25, 2026 16:59
@kaxil kaxil (Member) left a comment


We should change the PR title though; it isn't fixing the issue but rather providing a mechanism for executors to handle it:

My suggestion:

Raise TaskAlreadyRunningError when starting an already-running task instance

@kaxil kaxil changed the title Fix running task marked as failed on Celery broker redelivery #58441 Raise TaskAlreadyRunningError when starting an already-running task instance Mar 25, 2026
Handle 409 CONFLICT (task already running) from the API server gracefully
by raising TaskAlreadyRunningError instead of letting it propagate as a
generic failure.

closes: apache#58441
@anishgirianish anishgirianish force-pushed the fix/58441-fix-celery-task-marked-as-failed branch from c7ff3b0 to d5d0450 on March 25, 2026 19:23
@jscheffl jscheffl merged commit f5ff967 into apache:main Mar 25, 2026
112 checks passed
@jscheffl jscheffl (Contributor) commented:

@vatsrahul1001 This is tagged with milestone 3.1.9; will you consider this for 3.2.0 as well?



Development

Successfully merging this pull request may close these issues.

Celery: Running task marked as failed on broker redelivery

8 participants