
Gene.bordegaray/2026/04/partition routed dynamic filters#21832

Open
gene-bordegaray wants to merge 3 commits into apache:main from gene-bordegaray:gene.bordegaray/2026/04/partition_routed_dynamic_filters

Conversation

@gene-bordegaray
Contributor

@gene-bordegaray gene-bordegaray commented Apr 24, 2026

Which issue does this PR close?

I have split it up into three commits to make it easier to review

Rationale for this change

What changes are included in this PR?

Extensively covered in #21207 via this comment: #21207 (comment)

Are these changes tested?

Yes:

  • Unit
  • Integration
  • SqlLogic

Are there any user-facing changes?

Yes:

datafusion_physical_plan::joins::DynamicFilterRoutingMode

Enum with Variants:

  • CaseHash
  • PartitionIndex

HashJoinExec

  • Public field: dynamic_filter_routing_mode: DynamicFilterRoutingMode

HashJoinExecBuilder

  • New builder method: with_dynamic_filter_routing_mode

DynamicFilterPhysicalExpr

  • New public methods: update_partitioned(), partition_filter()

Protos

  • DynamicFilterRoutingMode -> dynamic_filter_routing_mode = 11
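
For readers who want a feel for the shape of this API surface, here is a minimal, self-contained sketch. Only the names `DynamicFilterRoutingMode`, `CaseHash`, `PartitionIndex`, `dynamic_filter_routing_mode`, and `with_dynamic_filter_routing_mode` come from the list above. `JoinSketch` and `JoinBuilderSketch` are hypothetical stand-ins for `HashJoinExec` and `HashJoinExecBuilder`, and treating `CaseHash` as the default is an assumption, not something the PR description states.

```rust
// Hypothetical stand-ins mirroring the names listed above; not the PR's code.

/// How a join routes dynamic filters to probe-side scans.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DynamicFilterRoutingMode {
    /// Variant name as listed in the PR description.
    /// Assumption: this sketch treats it as the default.
    #[default]
    CaseHash,
    /// Route each filter by the partition index of the build side.
    PartitionIndex,
}

/// Stand-in for the join operator, exposing the routing mode as a public field.
#[derive(Debug)]
pub struct JoinSketch {
    pub dynamic_filter_routing_mode: DynamicFilterRoutingMode,
}

/// Stand-in for the join builder.
#[derive(Debug, Default)]
pub struct JoinBuilderSketch {
    dynamic_filter_routing_mode: DynamicFilterRoutingMode,
}

impl JoinBuilderSketch {
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style setter: consumes the builder and returns it with the mode set.
    pub fn with_dynamic_filter_routing_mode(mut self, mode: DynamicFilterRoutingMode) -> Self {
        self.dynamic_filter_routing_mode = mode;
        self
    }

    pub fn build(self) -> JoinSketch {
        JoinSketch {
            dynamic_filter_routing_mode: self.dynamic_filter_routing_mode,
        }
    }
}
```

In this sketch, `JoinBuilderSketch::new().with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::PartitionIndex).build()` yields a join whose public field reports `PartitionIndex`, while a builder left untouched keeps the assumed default.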

@github-actions github-actions Bot added the physical-expr (Changes to the physical-expr crates), optimizer (Optimizer rules), core (Core DataFusion crate), sqllogictest (SQL Logic Tests (.slt)), proto (Related to proto crate), datasource (Changes to the datasource crate), and physical-plan (Changes to the physical-plan crate) labels Apr 24, 2026
@adriangb
Contributor

I have not read the code in detail (1.6k LOC) but I wonder how this will work in a world where the partition index of an opener is much less clear. Morselized scans (#20529) mean that filters will get applied at the morsel stage. Morsels won't belong to partitions per se / will be shared across partitions. So it seems to me that this entire feature is incompatible with the direction in which DataFusion is headed, unless I'm missing something.

@gene-bordegaray
Contributor Author

gene-bordegaray commented Apr 24, 2026

> I have not read the code in detail (1.6k LOC) but I wonder how this will work in a world where the partition index of an opener is much less clear. Morselized scans (#20529) mean that filters will get applied at the morsel stage. Morsels won't belong to partitions per se / will be shared across partitions. So it seems to me that this entire feature is incompatible with the direction in which DataFusion is headed, unless I'm missing something.

The important thing to point out is that this only uses partition-index routing when the scan still has a constant partition identity, which is the preserved file-partitioned case we are targeting here. The partitioned_by_file_group setting disables work stealing, so it should not collide with this use case.

If future morselized scans no longer have that identity for a morsel, they can fall back to the global dynamic filter. This is still a very valuable use case for the partitioned data we are seeing in production, and it should not block the morsel-driven work.
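
To illustrate the routing and fallback behavior described in this reply, here is a small standalone sketch. It is not the PR's implementation: `RoutedFilterSketch`, `KeyRange`, and `filter_for` are hypothetical, the filter is modeled as a simple min/max range on an integer key, and the signature given to `update_partitioned` is a guess that only echoes the method name from the PR description.

```rust
use std::collections::HashMap;

/// Hypothetical model of a dynamic filter: a closed key range [min, max].
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KeyRange {
    pub min: i64,
    pub max: i64,
}

impl KeyRange {
    pub fn contains(&self, key: i64) -> bool {
        self.min <= key && key <= self.max
    }

    /// Smallest range covering both inputs.
    fn union(self, other: KeyRange) -> KeyRange {
        KeyRange {
            min: self.min.min(other.min),
            max: self.max.max(other.max),
        }
    }
}

/// Per-partition filters plus a global filter that covers all of them.
#[derive(Debug, Default)]
pub struct RoutedFilterSketch {
    per_partition: HashMap<usize, KeyRange>,
    global: Option<KeyRange>,
}

impl RoutedFilterSketch {
    /// Record the bounds seen by one build-side partition and widen the global range.
    pub fn update_partitioned(&mut self, partition: usize, range: KeyRange) {
        self.per_partition.insert(partition, range);
        self.global = Some(match self.global {
            Some(current) => current.union(range),
            None => range,
        });
    }

    /// `Some(i)` models a scan with a stable partition identity.
    /// `None` models a morsel with no such identity, which gets the global filter.
    /// An index with no per-partition filter also falls back to the global filter.
    pub fn filter_for(&self, partition: Option<usize>) -> Option<KeyRange> {
        partition
            .and_then(|p| self.per_partition.get(&p).copied())
            .or(self.global)
    }
}
```

The point of the sketch is that the per-partition filter is strictly tighter than the global one: after registering `[1, 10]` for partition 0 and `[100, 200]` for partition 1, a probe scan for partition 0 rejects key 150, whereas a scan without partition identity falls back to `[1, 200]` and cannot prune it.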

Let me know your thoughts. Maybe @alamb, @NGA-TRAN, or @gabotechs have thoughts as well.

    mut self,
    partition_batches: Vec<Vec<RecordBatch>>,
) -> Self {
    self.partition_batches = partition_batches;
Contributor Author


I did this to reduce review churn, but I can consolidate.

@adriangb
Contributor

I thought last call we had about this the conclusion was that the right way to go about this is to add native range partitioning support to DataFusion? I think that will be a big task but will be justifiable as benefiting a wider group of users and will ultimately land the codebase in a better place.

@gene-bordegaray
Contributor Author

gene-bordegaray commented Apr 24, 2026

> I thought last call we had about this the conclusion was that the right way to go about this is to add native range partitioning support to DataFusion? I think that will be a big task but will be justifiable as benefiting a wider group of users and will ultimately land the codebase in a better place.

We left off with this summary of the conversation: #21207 (comment), and converged on an approach like this one. Even with range partitioning, filters would still need to be routed to a particular partition on partitioned data (when there is no RepartitionExec in the plan).

This work would support that if the new partitioning is ever needed.

@NGA-TRAN
Contributor

@adriangb: Thanks for the proposal to add native range partitioning support to DataFusion. That’s one of the solutions Gene suggested. However, from our recent sync with @alamb and the ParadeDB team, Andrew recommended simply mapping the partition index of the build side and probe side — which is what this PR implements. The extra code is needed because we only apply this for range partitioning (not hash), and only when partition preservation is enabled and no repartitioning occurs beforehand, which aligns with the optimizer rules. This approach benefits all types of range partitions. Gene has a detailed analysis here: #21207 (comment)

@alamb: If the extended solution isn’t what you had in mind, what would you suggest? Should we move toward native range partitioning support as Adrian proposed, or is the current PR approach sufficient for now?

@gene-bordegaray
Contributor Author

gene-bordegaray commented Apr 24, 2026

I also want to highlight how important supporting partitioned data is for large, scalable systems.

We have this feature in use and have seen amazing results from eliminating repartitions on pre-partitioned data and pushing dynamic filters down to the correct partition of that unshuffled data.

Here are some metrics on this:

| Metric                | No Dyn Filter | With Dyn Filter | Reduction            |
|----------------------|--------------|-----------------|----------------------|
| Bytes per Worker     | ~14–16 GB    | ~75–170 MB      | ~100–130×            |
| Rows per Worker      | ~400–550M    | ~0.5–1.5K       | ~400,000–500,000×    |
| HashJoin Compute     | ~32–39s      | ~10–50ms        | ~1,000×              |
| Aggregation Compute  | ~1s          | ~10–50µs        | ~10,000–50,000×      |
| Leaf Fetch Time      | ~110–120s    | ~47–60s         | ~2×                  |

I am sure others that use partitioned data will appreciate such results and the contributors at Datadog plan to continue to strengthen DataFusion's support of pre-partitioned data 😄

@adriangb
Contributor

@gene-bordegaray those are indeed amazing numbers!! I think part of the tension here is that this is not a minor feature, no matter how it’s implemented (we’ve tried multiple ways). We (Pydantic) don’t have data that is laid out like this, so it’s hard for me to understand the nuances of how this should all work and also to justify the time to help get it all across the line. If there are users of DataFusion other than Datadog that have similar systems, or want to be able to have them, bringing them to the table would be very helpful, because they can help shape a solution that works for everyone and also donate review and development time.

@NGA-TRAN I actually discussed this with @alamb this morning. Maybe I am misunderstanding one or both sides of the conversation, but I was trying to convey the outcome of my discussion with him, which was to approach this from the angle of adding general support for range partitioning.

@alamb
Contributor

alamb commented Apr 24, 2026

> @alamb: If the extended solution isn’t what you had in mind, what would you suggest? Should we move toward native range partitioning support as Adrian proposed, or is the current PR approach sufficient for now?

I am not quite sure what you are referring to here. I am checking this PR and the discussions out now.

@NGA-TRAN
Contributor

NGA-TRAN commented Apr 24, 2026

@adriangb @alamb @gene-bordegaray: Let me know if this approach is feasible:

  1. Add a property on each join (boolean or integer) plus a global‑planning property indicating whether the build and probe sides share identical partitioning.
  2. At the start of planning, the global property defaults to false and is set to true only when both data sources are already repartitioned and partition‑preservation is enabled (the Datadog use case).
  3. When a join is created, its property is set from the global‑planning property.
  4. When a hash repartition is introduced, the global‑planning property is set to false.
  5. During join execution, if the property is true, map dynamic filters by partition index; otherwise, use today’s behavior.

I call it a global-planning property, but it is only a property of a plan, so it can have a better name.

My assumption is that this would be simple and require minimal code changes.

Alternatively, the join property is always false by default, and we add an optimization rule that traverses the plan bottom-up and sets the property to true following the same logic as above.
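
The bottom-up variant of this proposal can be sketched on a toy plan tree. Everything here is hypothetical: `Plan`, `route_by_partition_index`, and `mark_joins` are illustrative names, not DataFusion's `ExecutionPlan` API, and the sketch reduces "both sides share identical partitioning" to "both sides still carry their source partitioning", which a real rule would need to check more carefully.

```rust
/// Hypothetical, simplified physical plan; not DataFusion's ExecutionPlan.
#[derive(Debug)]
pub enum Plan {
    /// Leaf scan; the flag says whether file partitioning is preserved.
    Scan { preserves_file_partitioning: bool },
    /// A hash repartition destroys the source partitioning.
    HashRepartition { input: Box<Plan> },
    /// The join property from the proposal, false until the rule sets it.
    HashJoin {
        build: Box<Plan>,
        probe: Box<Plan>,
        route_by_partition_index: bool,
    },
}

/// Walks the plan bottom-up. Returns whether the subtree's output still
/// carries the source partitioning, and sets the flag on every join whose
/// build and probe sides both do.
pub fn mark_joins(plan: &mut Plan) -> bool {
    match plan {
        Plan::Scan { preserves_file_partitioning } => *preserves_file_partitioning,
        Plan::HashRepartition { input } => {
            // Still visit the subtree so joins below the repartition get marked.
            mark_joins(input);
            false
        }
        Plan::HashJoin { build, probe, route_by_partition_index } => {
            let build_preserved = mark_joins(build);
            let probe_preserved = mark_joins(probe);
            *route_by_partition_index = build_preserved && probe_preserved;
            // Assumption: a join over aligned inputs keeps that partitioning.
            *route_by_partition_index
        }
    }
}
```

With two partition-preserving scans as inputs, `mark_joins` sets the flag to true. Wrapping either input in `HashRepartition` leaves it false, which corresponds to step 4 of the list above.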

@alamb
Contributor

alamb commented Apr 24, 2026

> @adriangb @alamb @gene-bordegaray: Let me know if this approach is feasible:

It might be feasible, but I don't think it is general (that is, it would not be used by other users of DataFusion), and it makes things more complicated for a single use case. Thus I am not sure we should add it to this repo.

If you want to proceed with this approach, I think you could add it as a user-defined PhysicalOptimizer pass to fix up the plan / DynamicFilter for your use case.

I also wrote up a summary of my thoughts here, including another possible alternate workaround

Let me know what you think

@alamb
Contributor

alamb commented Apr 24, 2026

> @alamb: If the extended solution isn’t what you had in mind, what would you suggest? Should we move toward native range partitioning support as Adrian proposed, or is the current PR approach sufficient for now?

I recommend a 2 phase strategy:

  1. Find some workaround (outside the DataFusion core) to unblock yourself
  2. Work on adding real range partitioning to the core



Development

Successfully merging this pull request may close these issues.

support dynamic filtering on partitioned data from file source

4 participants