Skip to content

[GSoC 2026] Kafka Streams runner — Redistribute translator + ExecutableStage type-agnostic edge#38843

Open
junaiddshaukat wants to merge 3 commits into
apache:feat/18479-kafka-streams-runner-skeletonfrom
junaiddshaukat:feat/ks-redistribute-and-typeagnostic
Open

[GSoC 2026] Kafka Streams runner — Redistribute translator + ExecutableStage type-agnostic edge#38843
junaiddshaukat wants to merge 3 commits into
apache:feat/18479-kafka-streams-runner-skeletonfrom
junaiddshaukat:feat/ks-redistribute-and-typeagnostic

Conversation

@junaiddshaukat

Copy link
Copy Markdown
Contributor

Summary

Direct follow-up to #38764, per Slack with @je-ik:

  1. Implement Redistribute.arbitrarily() as a runner-native passthrough so the fuser terminates stages at the Redistribute boundary, enabling chained-stage E2E tests without dragging GBK in.
  2. Drop the byte[] type bound on ExecutableStageProcessor / KStreamsPayload at the processor edge so non-byte[] outputs (e.g. Integer from MapElements) flow stage-to-stage without the cast lying about runtime type.
  3. Add a chained Impulse -> MapElements<Integer> -> Redistribute -> ParDo E2E test under the EMBEDDED Java SDK harness, asserting the Integer survives the runner round-trip via SharedTestCollector.

Wiring details

  • knownUrns() exposed; TrivialNativeTransformExpander.forKnownUrns(...) runs before GreedyPipelineFuser.fuse(...) so the Redistribute composite's sub-transforms (which include GBK, not yet supported) are stripped before fusion.
  • @AutoService(NativeTransforms.IsNativeTransform.class) registers Redistribute URN so QueryablePipeline treats the stripped composite as a primitive producer of its outputs. Mirrors Flink's IsFlinkNativeTransform.

Out of scope

  • Redistribute.byKey() URN — rehashing belongs with the GroupByKey sub-issue.
  • WatermarkManager — separate sub-issue.

Validation

./gradlew :runners:kafka-streams:check green locally, 15 tests pass (incl. new ChainedExecutableStageTest).

Closes #38840
Refs #18479
cc @je-ik

…tage edge

- RedistributeTranslator: registers beam:transform:redistribute_arbitrarily:v1
  as a runner-native passthrough — the output PCollection is mapped to the
  same processor that produces the input PCollection. No topology node is
  added; the single-instance topology has no actual redistribution to do.
  The keyed variant is intentionally not handled (rehashing semantics belong
  with the GroupByKey sub-issue).
- KafkaStreamsPipelineTranslator: add knownUrns(); run
  TrivialNativeTransformExpander.forKnownUrns(pipeline, knownUrns()) before
  GreedyPipelineFuser so runner-native composites survive into translation
  as leaves; register IsKafkaStreamsNativeTransform (@autoservice) so
  QueryablePipeline recognises the Redistribute composite as a producer of
  its outputs after the expander strips its sub-transforms. Pattern
  mirrors Flink's IsFlinkNativeTransform.
- ExecutableStageProcessor: drop the byte[] type bound on the processor /
  pendingOutputs / context / Record types. The runner does not need to
  know the runtime value type — the bundle factory handles all coder
  application at the Fn-API boundary using PCollection coders from
  ExecutableStagePayload (verified against Flink/Spark; per @je-ik on
  PR apache#38764). The earlier byte[] bound was a latent bug hidden by type
  erasure that only worked because Impulse output's value coder is
  ByteArrayCoder.
- ChainedExecutableStageTest: high-level Beam SDK pipeline
  Impulse -> MapElements<byte[], Integer> -> Redistribute.arbitrarily()
  -> ParDo<Integer, Void>(record). The Redistribute boundary terminates
  the upstream ExecutableStage so the Integer payload actually flows
  stage-to-stage through the runner, exercising the new type-agnostic
  edge. Side-effect verification via SharedTestCollector<Integer>.
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request improves the Kafka Streams runner by enabling support for arbitrary data types across stage boundaries and introducing a native implementation for Redistribute.arbitrarily(). These changes allow for more complex pipeline structures and resolve previous limitations regarding type-specific casting, ensuring robust data flow between fused stages.

Highlights

  • Redistribute.arbitrarily() Implementation: Implemented a runner-native passthrough for Redistribute.arbitrarily() to allow the fuser to terminate stages at the boundary, facilitating chained-stage E2E testing.
  • Type-Agnostic Processor Edge: Removed the byte[] type bound on ExecutableStageProcessor and KStreamsPayload, allowing non-byte[] outputs to flow between stages without incorrect runtime type casting.
  • New E2E Test: Added a chained E2E test (Impulse -> MapElements -> Redistribute -> ParDo) to verify that non-byte[] data types correctly survive the runner round-trip.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request updates the Kafka Streams runner to support non-byte[] value types flowing across ExecutableStage boundaries by replacing explicit byte[] types with wildcards in KStreamsPayload and WindowedValue. It also introduces a runner-native RedistributeTranslator that acts as a passthrough for Redistribute.arbitrarily(), and registers it so that the pipeline fuser handles it correctly. A new end-to-end test is added to verify this behavior. Feedback on the changes suggests using poll() in a loop to drain the pendingOutputs queue more robustly, and adding a defensive null check for parentProcessor in RedistributeTranslator to prevent potential null pointer exceptions.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines 165 to 172
// The harness has finished the bundle (close() returned) so no further enqueues happen.
// ConcurrentLinkedQueue's weakly-consistent iterator is therefore safe to drain via forEach.
pendingOutputs.forEach(
output ->
ctx.forward(
new Record<byte[], KStreamsPayload<byte[]>>(
new Record<byte[], KStreamsPayload<?>>(
record.key(), KStreamsPayload.data(output), record.timestamp())));
pendingOutputs.clear();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Draining the queue using poll() in a loop is more idiomatic and robust than using forEach followed by clear(). It avoids relying on the weakly-consistent iterator of ConcurrentLinkedQueue and ensures elements are safely removed as they are processed, preventing any potential race conditions or memory retention issues if the queue is accessed concurrently.

Suggested change
// The harness has finished the bundle (close() returned) so no further enqueues happen.
// ConcurrentLinkedQueue's weakly-consistent iterator is therefore safe to drain via forEach.
pendingOutputs.forEach(
output ->
ctx.forward(
new Record<byte[], KStreamsPayload<byte[]>>(
new Record<byte[], KStreamsPayload<?>>(
record.key(), KStreamsPayload.data(output), record.timestamp())));
pendingOutputs.clear();
// The harness has finished the bundle (close() returned) so no further enqueues happen.
WindowedValue<?> output;
while ((output = pendingOutputs.poll()) != null) {
ctx.forward(
new Record<>(
record.key(), KStreamsPayload.data(output), record.timestamp()));
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switched to poll() loop in the latest commit

Comment on lines +48 to +49
String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Add a defensive null check for parentProcessor. If the input PCollection does not have a registered producer processor (e.g., due to an unexpected pipeline structure or translation order issue), this will fail fast with a clear error message instead of causing a downstream NullPointerException.

    String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
    if (parentProcessor == null) {
      throw new IllegalStateException(
          String.format(
              "No producer processor found for input PCollection %s of transform %s",
              inputPCollectionId, transformId));
    }
    String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());

@je-ik je-ik self-requested a review June 8, 2026 11:23

@je-ik je-ik left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I'd only verify that there are actually really two ExecutableStages (not fused).

@github-actions

github-actions Bot commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @kennknowles added as fallback since no labels match configuration

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants