Skip to content

feat(models): expose partition_key and partition_date on TaskInstance#68276

Open
bujjibabukatta wants to merge 1 commit into
apache:mainfrom
bujjibabukatta:fix/#68079
Open

feat(models): expose partition_key and partition_date on TaskInstance#68276
bujjibabukatta wants to merge 1 commit into
apache:mainfrom
bujjibabukatta:fix/#68079

Conversation

@bujjibabukatta

Copy link
Copy Markdown
Contributor

Add Asset partition attributes to TaskInstance
Fixes #68079, related to #68075.
What changed:
airflow-core/src/airflow/models/taskinstance.py — adds a partition_date property that parses partition_key (already proxied from dag_run) into a timezone-aware datetime, returning None for non-partitioned runs or unparseable keys.
airflow-core/src/airflow/utils/helpers.py — fixes log_filename_template_renderer in two ways:

Jinja branch now injects ts_nodash, ts, ts_nodash_with_tz, partition_key, and partition_date into the render context instead of returning a bare Template.render with no context.
f-string branch no longer crashes when logical_date is None (falls back to partition_date, then empty string).

Why: Asset-triggered DAGs using PartitionedAssetTimetable set logical_date=None, causing the scheduler to crash on ti.logical_date.isoformat() during log filename rendering. Users also had no way to reference partition attributes in templates at all.

@jroachgolf84 jroachgolf84 left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that this is a good way to go about this. partition_date is a field that's already available as part of the DagRun model. We shouldn't need to impute it.

I had been playing around with this locally, and kinda came to the conclusion that I don't think it makes sense to add it. partition_key should always provide the info that is needed, we don't really need to add the partition_date.

For what it's worth, I used this DAG to test:

from airflow.sdk import (
    Asset,
    DAG,
    task,
    CronPartitionTimetable,
    PartitionedAssetTimetable,
)
from datetime import datetime

fix_issue_68079 = Asset(name="fix_issue_68079")

with DAG(
    dag_id="fix_issue_68079__upstream",
    start_date=datetime(2026, 1, 1),
    schedule=CronPartitionTimetable("15 * * * *", timezone="UTC"),
) as upstream_dag:

    @task(outlets=[fix_issue_68079])
    def transient():
        pass

    transient()


with DAG(
    dag_id="fix_issue_68079__downstream",
    start_date=datetime(2026, 1, 1),
    schedule=PartitionedAssetTimetable(assets=fix_issue_68079),
) as downstream_dag:

    @task
    def output_context(**context):
        print(f"context['ti']: {context['ti']}")
        print(f"context['dag_run']: {context['dag_run']}")
        print(f"context['partition_date']: {context['partition_date']}")
        print(f"context['partition_key']: {context['partition_key']}")

    output_context()

@jroachgolf84

Copy link
Copy Markdown
Collaborator

@bujjibabukatta - I closed the issue as "not planned".

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add Asset partition attributes to TaskInstance

3 participants