Add async XCom accessors for async tasks#68299
Open
dabla wants to merge 3 commits into
Open
Conversation
jroachgolf84
approved these changes
Jun 9, 2026
bda0641 to
8831649
Compare
8831649 to
7ee450c
Compare
Contributor
Author
|
Same here, it would be good if @ashb or @amoghrajesh could validate this PR. |
ashb
reviewed
Jun 10, 2026
| ) | ||
|
|
||
| if not isinstance(msg, XComSequenceSliceResult): | ||
| raise TypeError(f"Expected XComSequenceSliceResult, received: {type(msg)} {msg}") |
Member
There was a problem hiding this comment.
Suggested change
| raise TypeError(f"Expected XComSequenceSliceResult, received: {type(msg)} {msg}") | |
| raise RuntimeError(f"Expected XComSequenceSliceResult, received: {type(msg)} {msg}") |
IMO TypeError isn't quite right here, as this is more of a "omg we got the wrong response back!" than a type error, which is usuall more "you called this with the wrong type"
| raise TypeError(f"Expected XComSequenceSliceResult, received: {type(msg)} {msg}") | ||
|
|
||
| if not msg.root: | ||
| return None |
Member
There was a problem hiding this comment.
Suggested change
| return None | |
| return [] |
I think?
…for xcom_pull and axcom_pull
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Related PR's
Related to:
#68214
#60268
#62922
Why
Async
@taskfunctions currently cannot efficiently pull or push XCom values because the existingxcom_pullandxcom_pushimplementations rely on synchronous communication with the task supervisor. Invoking these methods from an async task blocks the event loop, reducing the benefits of asynchronous execution.This change introduces asynchronous XCom operations, allowing async Python tasks to interact with XComs without blocking the event loop.
This capability is also an important building block for the upcoming Task Iteration feature (AIP-104). Task Iteration relies on the ability to persist intermediate progress and results during execution, which requires non-blocking XCom operations when used with async tasks. As such, this PR serves as preparatory work for AIP-104 while also providing immediate value by enabling fully asynchronous XCom interactions in existing async Python tasks.
In addition, this functionality lays the groundwork for future async operators. As more operators adopt native async implementations, they will be able to leverage asynchronous XCom operations without blocking the event loop, enabling more efficient and scalable async execution patterns throughout Airflow.
How
axcom_pull/axcom_pushtoRuntimeTaskInstance, awaiting the supervisor round-trip viaasendinstead of blocking.aget_one/aget_all/asettoBaseXComfor async XCom retrieval and storage.Was generative AI tooling used to co-author this PR?
Claude Opus 4.6
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.