[GSoC 2026] Kafka Streams runner — Redistribute translator + ExecutableStage type-agnostic edge#38843
Conversation
…tage edge - RedistributeTranslator: registers beam:transform:redistribute_arbitrarily:v1 as a runner-native passthrough — the output PCollection is mapped to the same processor that produces the input PCollection. No topology node is added; the single-instance topology has no actual redistribution to do. The keyed variant is intentionally not handled (rehashing semantics belong with the GroupByKey sub-issue). - KafkaStreamsPipelineTranslator: add knownUrns(); run TrivialNativeTransformExpander.forKnownUrns(pipeline, knownUrns()) before GreedyPipelineFuser so runner-native composites survive into translation as leaves; register IsKafkaStreamsNativeTransform (@autoservice) so QueryablePipeline recognises the Redistribute composite as a producer of its outputs after the expander strips its sub-transforms. Pattern mirrors Flink's IsFlinkNativeTransform. - ExecutableStageProcessor: drop the byte[] type bound on the processor / pendingOutputs / context / Record types. The runner does not need to know the runtime value type — the bundle factory handles all coder application at the Fn-API boundary using PCollection coders from ExecutableStagePayload (verified against Flink/Spark; per @je-ik on PR apache#38764). The earlier byte[] bound was a latent bug hidden by type erasure that only worked because Impulse output's value coder is ByteArrayCoder. - ChainedExecutableStageTest: high-level Beam SDK pipeline Impulse -> MapElements<byte[], Integer> -> Redistribute.arbitrarily() -> ParDo<Integer, Void>(record). The Redistribute boundary terminates the upstream ExecutableStage so the Integer payload actually flows stage-to-stage through the runner, exercising the new type-agnostic edge. Side-effect verification via SharedTestCollector<Integer>.
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request improves the Kafka Streams runner by enabling support for arbitrary data types across stage boundaries and introducing a native implementation for Redistribute.arbitrarily(). These changes allow for more complex pipeline structures and resolve previous limitations regarding type-specific casting, ensuring robust data flow between fused stages. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request updates the Kafka Streams runner to support non-byte[] value types flowing across ExecutableStage boundaries by replacing explicit byte[] types with wildcards in KStreamsPayload and WindowedValue. It also introduces a runner-native RedistributeTranslator that acts as a passthrough for Redistribute.arbitrarily(), and registers it so that the pipeline fuser handles it correctly. A new end-to-end test is added to verify this behavior. Feedback on the changes suggests using poll() in a loop to drain the pendingOutputs queue more robustly, and adding a defensive null check for parentProcessor in RedistributeTranslator to prevent potential null pointer exceptions.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| // The harness has finished the bundle (close() returned) so no further enqueues happen. | ||
| // ConcurrentLinkedQueue's weakly-consistent iterator is therefore safe to drain via forEach. | ||
| pendingOutputs.forEach( | ||
| output -> | ||
| ctx.forward( | ||
| new Record<byte[], KStreamsPayload<byte[]>>( | ||
| new Record<byte[], KStreamsPayload<?>>( | ||
| record.key(), KStreamsPayload.data(output), record.timestamp()))); | ||
| pendingOutputs.clear(); |
There was a problem hiding this comment.
Draining the queue using poll() in a loop is more idiomatic and robust than using forEach followed by clear(). It avoids relying on the weakly-consistent iterator of ConcurrentLinkedQueue and ensures elements are safely removed as they are processed, preventing any potential race conditions or memory retention issues if the queue is accessed concurrently.
| // The harness has finished the bundle (close() returned) so no further enqueues happen. | |
| // ConcurrentLinkedQueue's weakly-consistent iterator is therefore safe to drain via forEach. | |
| pendingOutputs.forEach( | |
| output -> | |
| ctx.forward( | |
| new Record<byte[], KStreamsPayload<byte[]>>( | |
| new Record<byte[], KStreamsPayload<?>>( | |
| record.key(), KStreamsPayload.data(output), record.timestamp()))); | |
| pendingOutputs.clear(); | |
| // The harness has finished the bundle (close() returned) so no further enqueues happen. | |
| WindowedValue<?> output; | |
| while ((output = pendingOutputs.poll()) != null) { | |
| ctx.forward( | |
| new Record<>( | |
| record.key(), KStreamsPayload.data(output), record.timestamp())); | |
| } |
There was a problem hiding this comment.
switched to poll() loop in the latest commit
| String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId); | ||
| String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values()); |
There was a problem hiding this comment.
Add a defensive null check for parentProcessor. If the input PCollection does not have a registered producer processor (e.g., due to an unexpected pipeline structure or translation order issue), this will fail fast with a clear error message instead of causing a downstream NullPointerException.
String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
if (parentProcessor == null) {
throw new IllegalStateException(
String.format(
"No producer processor found for input PCollection %s of transform %s",
inputPCollectionId, transformId));
}
String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());
je-ik
left a comment
There was a problem hiding this comment.
Nice, I'd only verify that there are actually really two ExecutableStages (not fused).
|
Assigning reviewers: R: @kennknowles added as fallback since no labels match configuration Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Summary
Direct follow-up to #38764, per Slack with @je-ik:
Redistribute.arbitrarily()as a runner-native passthrough so the fuser terminates stages at the Redistribute boundary, enabling chained-stage E2E tests without dragging GBK in.byte[]type bound onExecutableStageProcessor/KStreamsPayloadat the processor edge so non-byte[] outputs (e.g.IntegerfromMapElements) flow stage-to-stage without the cast lying about runtime type.Impulse -> MapElements<Integer> -> Redistribute -> ParDoE2E test under the EMBEDDED Java SDK harness, asserting the Integer survives the runner round-trip viaSharedTestCollector.Wiring details
knownUrns()exposed;TrivialNativeTransformExpander.forKnownUrns(...)runs beforeGreedyPipelineFuser.fuse(...)so the Redistribute composite's sub-transforms (which include GBK, not yet supported) are stripped before fusion.@AutoService(NativeTransforms.IsNativeTransform.class)registers Redistribute URN soQueryablePipelinetreats the stripped composite as a primitive producer of its outputs. Mirrors Flink'sIsFlinkNativeTransform.Out of scope
Redistribute.byKey()URN — rehashing belongs with the GroupByKey sub-issue.Validation
./gradlew :runners:kafka-streams:checkgreen locally, 15 tests pass (incl. newChainedExecutableStageTest).Closes #38840
Refs #18479
cc @je-ik