Skip to content

Add XCom Push Support for Triggerer#64068

Open
jscheffl wants to merge 4 commits intoapache:mainfrom
jscheffl:feature/add-xcom-push-support-for-triggerer
Open

Add XCom Push Support for Triggerer#64068
jscheffl wants to merge 4 commits intoapache:mainfrom
jscheffl:feature/add-xcom-push-support-for-triggerer

Conversation

@jscheffl
Copy link
Contributor

In devlist discussion https://lists.apache.org/thread/6znvd5rtqnxt5r4hys7qn64j5mflr9g1 following PR #63489 to propose for directly enqueue tasks it came up that KPO mainly returns to worker to execute callbacks and retrieve XComs.

As callbacks would be hard to be executed in Triggerer but XComs are a blocker for most users when using KPO, this PR enabled returning XCom data from triggerers in Airflow Core. With this it would be possible to close and complete Pods with standard KPO if this is added to Airflow 3.2.0 scope.


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Claude Opus 4.6


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

Copy link
Member

@hussein-awala hussein-awala left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the long term, I think we should consider adding a supervisor to the triggerer — similar to how the task SDK works — so that triggers can interact with XCom, variables, and other components through a proper SDK interface. That said, the current approach of pushing XCom at the models layer during event processing is a pragmatic solution and looks good for now.

Could you please document the new approach? (LGTM once it's done)

@dabla
Copy link
Contributor

dabla commented Mar 22, 2026

For the long term, I think we should consider adding a supervisor to the triggerer — similar to how the task SDK works — so that triggers can interact with XCom, variables, and other components through a proper SDK interface. That said, the current approach of pushing XCom at the models layer during event processing is a pragmatic solution and looks good for now.

Could you please document the new approach? (LGTM once it's done)

Like the idea!

@dabla
Copy link
Contributor

dabla commented Mar 22, 2026

@jscheffl Why not push XCom’s directly on task_instance of trigger? Why? Because now the XCom’s are passed via the EventTrigger which is fully persisted in DB. I would push them directly via task_instance on trigger, it won’t be persisted with the event in the DB, which can grow easily with XCom’s.

@jscheffl
Copy link
Contributor Author

@jscheffl Why not push XCom’s directly on task_instance of trigger? Why? Because now the XCom’s are passed via the EventTrigger which is fully persisted in DB. I would push them directly via task_instance on trigger, it won’t be persisted with the event in the DB, which can grow easily with XCom’s.

Does not work, the self.task_instance in the triggerer execution is from type airflow.executors.workloads.TaskInstance and this is a minimal stup with no xcom_push() implementation.

Also a manual XCom push is not working as in the triggerer code there is no DB session.

So, yes this proposal here has the trade-off that XCom is serialized once in the event and then another time from the event back to XCom.

@hussein-awala
Copy link
Member

A note on the API design: since xcoms is now on TriggerEvent (the base class), it's also available on events yielded by BaseEventTrigger subclasses (asset watchers). However, in submit_event(), the asset path only uses event.payload — any xcoms set on a watcher event would be silently ignored. This isn't a bug, but it could be confusing for users who set xcoms on a watcher trigger and expect them to land somewhere. It might be worth either:

  1. Adding a warning log in submit_event() when event.xcoms is set but there are no deferred task instances to push them to, or
  2. Documenting that xcoms only applies to task-deferred triggers, not asset watchers.

@jscheffl jscheffl force-pushed the feature/add-xcom-push-support-for-triggerer branch from 254c9c0 to e9085ea Compare March 22, 2026 21:30
@uranusjr
Copy link
Member

uranusjr commented Mar 23, 2026

At a quick glance I think this won’t work for custom XCom backends? Since those values can’t make through API call serde.

@dabla
Copy link
Contributor

dabla commented Mar 23, 2026

@jscheffl Why not push XCom’s directly on task_instance of trigger? Why? Because now the XCom’s are passed via the EventTrigger which is fully persisted in DB. I would push them directly via task_instance on trigger, it won’t be persisted with the event in the DB, which can grow easily with XCom’s.

Does not work, the self.task_instance in the triggerer execution is from type airflow.executors.workloads.TaskInstance and this is a minimal stup with no xcom_push() implementation.

Also a manual XCom push is not working as in the triggerer code there is no DB session.

So, yes this proposal here has the trade-off that XCom is serialized once in the event and then another time from the event back to XCom.

I could if this PR would get merged, we would just need to always convert the airflow.executors.workloads.TaskInstance to the RuntimeTaskInstance instead of only when start_from_trigger is enabled.

Here is the code that handles it: https://github.com/apache/airflow/pull/55068/changes#diff-e4cc497f1c786d142ce4c930f43e33b0bb4b53d375d274278fa82f4d5567608aR1005

@jscheffl
Copy link
Contributor Author

@jscheffl Why not push XCom’s directly on task_instance of trigger? Why? Because now the XCom’s are passed via the EventTrigger which is fully persisted in DB. I would push them directly via task_instance on trigger, it won’t be persisted with the event in the DB, which can grow easily with XCom’s.

Does not work, the self.task_instance in the triggerer execution is from type airflow.executors.workloads.TaskInstance and this is a minimal stup with no xcom_push() implementation.
Also a manual XCom push is not working as in the triggerer code there is no DB session.
So, yes this proposal here has the trade-off that XCom is serialized once in the event and then another time from the event back to XCom.

I could if this PR would get merged, we would just need to always convert the airflow.executors.workloads.TaskInstance to the RuntimeTaskInstance instead of only when start_from_trigger is enabled.

Here is the code that handles it: https://github.com/apache/airflow/pull/55068/changes#diff-e4cc497f1c786d142ce4c930f43e33b0bb4b53d375d274278fa82f4d5567608aR1005

Oh, ah, yeah! That would make it easier.

Whereas also respecting the comments from @uranusjr the integration of a custom XCom backend also is limited in Triggerer? Because usually no XCom backend implements IO in async... so I am not sure besides the other PR proposed this is also adding a hairball of complexity to "just push XCom" from Triggerer :-(

@uranusjr uranusjr modified the milestones: Airflow 3.2.0, Airflow 3.3.0 Mar 25, 2026
Copy link
Member

@uranusjr uranusjr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gonna move this to 3.3 for now, there are still multiple points to discuss and we’re too close to the split. Putting this red marker here to prevent someone from accidentally merging.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants