-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[GSoC 2026] Kafka Streams runner — Redistribute translator + ExecutableStage type-agnostic edge #38843
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
junaiddshaukat
wants to merge
3
commits into
apache:feat/18479-kafka-streams-runner-skeleton
Choose a base branch
from
junaiddshaukat:feat/ks-redistribute-and-typeagnostic
base: feat/18479-kafka-streams-runner-skeleton
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
[GSoC 2026] Kafka Streams runner — Redistribute translator + ExecutableStage type-agnostic edge #38843
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
012a751
Add Redistribute (arbitrarily) translator + type-agnostic ExecutableS…
junaiddshaukat 6248cf1
Drain pendingOutputs via poll() loop instead of forEach + clear
junaiddshaukat 3aff2b9
Assert ChainedExecutableStageTest pipeline actually has two Executabl…
junaiddshaukat File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
54 changes: 54 additions & 0 deletions
54
...c/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.kafka.streams.translation; | ||
|
|
||
| import org.apache.beam.model.pipeline.v1.RunnerApi; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; | ||
|
|
||
| /** | ||
| * Runner-native translator for {@code beam:transform:redistribute_arbitrarily:v1}. | ||
| * | ||
| * <p>The default Beam expansion of {@code Redistribute.arbitrarily()} goes through {@code | ||
| * GroupByKey} for materialization. The Kafka Streams runner does not need to do any actual | ||
| * redistribution in the single-instance topology — Kafka Streams is already handling per-task | ||
| * processing — so we provide a passthrough: the output PCollection is mapped to the same processor | ||
| * that already produces the input PCollection. No topology node is added. | ||
| * | ||
| * <p>The translator is registered in {@link KafkaStreamsPipelineTranslator#knownUrns()} so {@link | ||
| * org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander} strips the | ||
| * sub-transforms of {@code Redistribute} before the fuser runs. That keeps the Redistribute | ||
| * boundary intact and lets the fuser split adjacent stages correctly. | ||
| * | ||
| * <p>The keyed variant ({@code redistribute_by_key:v1}) is intentionally not handled here: it | ||
| * implies a rehash/partition step that the runner can implement on top of GroupByKey when that | ||
| * lands, but cannot reasonably no-op the way the arbitrary variant can. | ||
| */ | ||
| class RedistributeTranslator implements PTransformTranslator { | ||
|
|
||
| @Override | ||
| public void translate( | ||
| String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) { | ||
| RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId); | ||
| String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values()); | ||
| String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId); | ||
| String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values()); | ||
| // Passthrough: downstream lookups for the output PCollection resolve to the producer of the | ||
| // input PCollection. No KS Processor / state store / source is added. | ||
| context.registerPCollectionProducer(outputPCollectionId, parentProcessor); | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a defensive null check for
parentProcessor. If the input PCollection does not have a registered producer processor (e.g., due to an unexpected pipeline structure or translation order issue), this will fail fast with a clear error message instead of causing a downstreamNullPointerException.