diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java index 7417088bb672..b8eb5413b27e 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java @@ -59,7 +59,7 @@ * receivers. */ class ExecutableStageProcessor - implements Processor, byte[], KStreamsPayload> { + implements Processor, byte[], KStreamsPayload> { private static final Logger LOG = LoggerFactory.getLogger(ExecutableStageProcessor.class); @@ -68,9 +68,13 @@ class ExecutableStageProcessor // pendingOutputs is enqueued by SDK harness threads (inside the OutputReceiverFactory callback) // and drained by the Kafka Streams processing thread on bundle close; needs to be thread-safe. - private final Queue> pendingOutputs = new ConcurrentLinkedQueue<>(); + // The element type is intentionally wildcarded: the runner does not need to know the runtime + // value type — the bundle factory handles all coder application at the Fn-API boundary using + // the PCollection coders from the ExecutableStagePayload. Pretending the type was byte[] was + // only safe because the Impulse output coder happens to be ByteArrayCoder. 
+ private final Queue> pendingOutputs = new ConcurrentLinkedQueue<>(); - private @Nullable ProcessorContext> context; + private @Nullable ProcessorContext> context; private @Nullable ExecutableStageContext stageContext; private @Nullable StageBundleFactory stageBundleFactory; private @Nullable RemoteBundle currentBundle; @@ -81,7 +85,7 @@ class ExecutableStageProcessor } @Override - public void init(ProcessorContext> context) { + public void init(ProcessorContext> context) { this.context = context; ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload); this.stageContext = KafkaStreamsExecutableStageContextFactory.getInstance().get(jobInfo); @@ -89,8 +93,8 @@ public void init(ProcessorContext> context) { } @Override - public void process(Record> record) { - KStreamsPayload payload = record.value(); + public void process(Record> record) { + KStreamsPayload payload = record.value(); if (payload.isWatermark()) { // NOTE: flushing the bundle on every received watermark is provisional. Once the // WatermarkManager lands, a stage will receive watermarks from multiple parent instances and @@ -122,7 +126,7 @@ public FnDataReceiver create(String pCollectionId) { // after the bundle closes. return receivedElement -> { if (receivedElement != null) { - pendingOutputs.add((WindowedValue) receivedElement); + pendingOutputs.add((WindowedValue) receivedElement); } }; } @@ -143,7 +147,7 @@ private FnDataReceiver> mainInputReceiver() { return receiver; } - private void closeBundleAndFlush(Record> record) { + private void closeBundleAndFlush(Record> record) { RemoteBundle bundle = currentBundle; if (bundle == null) { return; @@ -157,26 +161,25 @@ private void closeBundleAndFlush(Record> record) } finally { currentBundle = null; } - ProcessorContext> ctx = checkInitialized(context); + ProcessorContext> ctx = checkInitialized(context); // The harness has finished the bundle (close() returned) so no further enqueues happen. 
- // ConcurrentLinkedQueue's weakly-consistent iterator is therefore safe to drain via forEach. - pendingOutputs.forEach( - output -> - ctx.forward( - new Record>( - record.key(), KStreamsPayload.data(output), record.timestamp()))); - pendingOutputs.clear(); + // Drain via poll() so each element is removed as it is forwarded. + WindowedValue output; + while ((output = pendingOutputs.poll()) != null) { + ctx.forward( + new Record>( + record.key(), KStreamsPayload.data(output), record.timestamp())); + } } - private void forwardWatermark( - Record> record, long watermarkMillis) { + private void forwardWatermark(Record> record, long watermarkMillis) { // TODO(#38743 / WatermarkManager): a watermark must reach every parallel instance of every // downstream processor, but ctx.forward routes to one downstream partition per Kafka Streams' // partitioning. The simplest correct approach is to fan the watermark out to all downstream // partitions; that wiring lands with the WatermarkManager sub-issue (per Jan on PR #38764). 
- ProcessorContext> ctx = checkInitialized(context); + ProcessorContext> ctx = checkInitialized(context); ctx.forward( - new Record>( + new Record>( record.key(), KStreamsPayload.watermark(watermarkMillis), record.timestamp())); } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java index 5042f5426169..4e227749e1a3 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java @@ -17,16 +17,21 @@ */ package org.apache.beam.runners.kafka.streams.translation; +import com.google.auto.service.AutoService; import java.util.Map; +import java.util.Set; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; +import org.apache.beam.sdk.util.construction.NativeTransforms; import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.graph.ExecutableStage; import org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser; import org.apache.beam.sdk.util.construction.graph.PipelineNode; import org.apache.beam.sdk.util.construction.graph.QueryablePipeline; +import org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; /** * Translates a portable Beam pipeline into a Kafka Streams {@link @@ -45,6 +50,7 @@ public KafkaStreamsPipelineTranslator() { this( ImmutableMap.builder() 
.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator()) + .put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, new RedistributeTranslator()) .put(ExecutableStage.URN, new ExecutableStageTranslator()) .build()); } @@ -59,11 +65,28 @@ public KafkaStreamsTranslationContext createTranslationContext( } /** - * Fuses the pipeline so that stateless user code is grouped into {@code ExecutableStage} nodes. + * Returns the set of URNs this translator handles natively. {@link + * TrivialNativeTransformExpander} uses this set to strip the sub-transforms of runner-native + * composites (e.g. {@code Redistribute.arbitrarily}) before fusion, so they survive into + * translation as leaves instead of being expanded into primitives the runner does not implement + * yet (e.g. GroupByKey). + */ + public Set knownUrns() { + return urnToTranslator.keySet(); + } + + /** + * Prepares the pipeline for translation: + * + *
<ol> + *
<li>Trim sub-transforms of runner-native composites listed in {@link #knownUrns()} so the + * fuser leaves them as primitives. + *
<li>Fuse remaining stateless user code into {@code ExecutableStage} nodes via {@link + * GreedyPipelineFuser}. + *
</ol> * - *

<p>Runner-executed primitives that have their own translator (e.g. Impulse) are left intact; - * everything else is fused. If the pipeline already contains {@code ExecutableStage} transforms - * it is returned unchanged. + *
<p>
If the pipeline already contains {@code ExecutableStage} transforms it is returned + * unchanged. */ public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline) { boolean alreadyFused = @@ -72,7 +95,8 @@ public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline) { if (alreadyFused) { return pipeline; } - return GreedyPipelineFuser.fuse(pipeline).toPipeline(); + RunnerApi.Pipeline trimmed = TrivialNativeTransformExpander.forKnownUrns(pipeline, knownUrns()); + return GreedyPipelineFuser.fuse(trimmed).toPipeline(); } /** @@ -99,4 +123,23 @@ public void translate(KafkaStreamsTranslationContext context, RunnerApi.Pipeline translator.translate(node.getId(), pipeline, context); } } + + /** + * Tells the SDK that URNs handled directly by the Kafka Streams runner should be treated as + * primitives by {@link QueryablePipeline}. Mirrors Flink's {@code IsFlinkNativeTransform} + * pattern. Without this, {@link TrivialNativeTransformExpander} strips a composite's + * sub-transforms but {@link QueryablePipeline} still does not recognise the composite itself as a + * producer of its outputs, and pipeline validation fails with "consumed but never produced". 
+ */ + @AutoService(NativeTransforms.IsNativeTransform.class) + public static class IsKafkaStreamsNativeTransform implements NativeTransforms.IsNativeTransform { + private static final Set URNS = + ImmutableSet.of(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN); + + @Override + public boolean test(RunnerApi.PTransform pTransform) { + String urn = PTransformTranslation.urnForTransformOrNull(pTransform); + return urn != null && URNS.contains(urn); + } + } } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java new file mode 100644 index 000000000000..72c47db8d4b1 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams.translation; + +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; + +/** + * Runner-native translator for {@code beam:transform:redistribute_arbitrarily:v1}. + * + *

<p>The default Beam expansion of {@code Redistribute.arbitrarily()} goes through {@code + * GroupByKey} for materialization. The Kafka Streams runner does not need to do any actual + * redistribution in the single-instance topology — Kafka Streams is already handling per-task + * processing — so we provide a passthrough: the output PCollection is mapped to the same processor + * that already produces the input PCollection. No topology node is added. + * + *

<p>The translator's URN is included in {@link KafkaStreamsPipelineTranslator#knownUrns()}, so + * {@link org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander} strips the + * sub-transforms of {@code Redistribute} before the fuser runs. That keeps the Redistribute + * boundary intact and lets the fuser split adjacent stages correctly. + * + *
<p>
The keyed variant ({@code redistribute_by_key:v1}) is intentionally not handled here: it + * implies a rehash/partition step that the runner can implement on top of GroupByKey when that + * lands, but cannot reasonably no-op the way the arbitrary variant can. + */ +class RedistributeTranslator implements PTransformTranslator { + + @Override + public void translate( + String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) { + RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId); + String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values()); + String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId); + String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values()); + // Passthrough: downstream lookups for the output PCollection resolve to the producer of the + // input PCollection. No KS Processor / state store / source is added. + context.registerPCollectionProducer(outputPCollectionId, parentProcessor); + } +} diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ChainedExecutableStageTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ChainedExecutableStageTest.java new file mode 100644 index 000000000000..8ac2175dc391 --- /dev/null +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ChainedExecutableStageTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.sdk.testing.CrashingRunner; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.util.construction.Environments; +import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.util.construction.graph.ExecutableStage; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.Test; + +/** + * End-to-end test that 
 exercises non-{@code byte[]} value types flowing across an {@code + * ExecutableStage} boundary. + * + *
<p>
Builds a pipeline {@code Impulse -> MapElements -> Redistribute.arbitrarily() + * -> ParDo(record into SharedTestCollector)}. The Redistribute boundary terminates + * the upstream {@code ExecutableStage} (because the runner registers Redistribute as a + * runner-native transform in {@link KafkaStreamsPipelineTranslator#knownUrns()}), so the + * Integer-emitting stage and the recording stage live in separate stages and the Integer payload + * actually flows stage-to-stage through the runner — not just through user code inside a single + * fused stage. That is what proves the {@code byte[]}-erasure cast we used to do was a latent bug, + * and that the type-agnostic {@code KStreamsPayload} edge handles it correctly. + */ +public class ChainedExecutableStageTest { + + private static final String JOB_ID = "ks-chained-stage-test"; + private static final String APPLICATION_ID = "ks-chained-stage-test"; + private static final int EXPECTED_VALUE = 42; + + /** + * Records the integer payload it sees so the test can verify it survived the harness round-trip + * across the Redistribute boundary. 
+ */ + private static class RecordingFn extends DoFn { + private final SharedTestCollector collector; + + RecordingFn(SharedTestCollector collector) { + this.collector = collector; + } + + @ProcessElement + public void processElement(@Element Integer input) { + collector.record(input); + } + } + + @Test + public void chainedStagesPropagateIntegerValueAcrossRedistributeBoundary() throws Exception { + try (SharedTestCollector collector = SharedTestCollector.create()) { + Pipeline pipeline = Pipeline.create(pipelineOptions()); + pipeline + .apply("impulse", Impulse.create()) + .apply( + "to-integer", + MapElements.into(TypeDescriptors.integers()).via((byte[] ignored) -> EXPECTED_VALUE)) + .setTypeDescriptor(TypeDescriptor.of(Integer.class)) + .apply("redistribute", Redistribute.arbitrarily()) + .apply("record", ParDo.of(new RecordingFn(collector))); + + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); + + KafkaStreamsPipelineOptions options = + pipeline.getOptions().as(KafkaStreamsPipelineOptions.class); + KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); + JobInfo jobInfo = + JobInfo.create( + JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options)); + KafkaStreamsTranslationContext context = + translator.createTranslationContext(jobInfo, options); + + RunnerApi.Pipeline preparedPipeline = translator.prepareForTranslation(pipelineProto); + + // Verify the Redistribute boundary actually split the user code into two separate stages. + // Without this, the test could pass even if the fuser collapsed MapElements and the + // RecordingFn into one stage — in which case the Integer payload would flow through user + // code inside a single stage and never cross the runner-side ExecutableStage edge that + // this test exists to exercise. 
+ long executableStageCount = + preparedPipeline.getComponents().getTransformsMap().values().stream() + .filter(t -> ExecutableStage.URN.equals(t.getSpec().getUrn())) + .count(); + assertThat( + "expected two ExecutableStages (upstream MapElements + downstream RecordingFn ParDo) " + + "separated by the Redistribute boundary", + executableStageCount, + is(2L)); + + translator.translate(context, preparedPipeline); + + Topology topology = context.getTopology(); + try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig())) { + driver.advanceWallClockTime(Duration.ofSeconds(1)); + driver.advanceWallClockTime(Duration.ofSeconds(1)); + } + + List recorded = collector.recorded(); + assertThat(recorded.size(), is(1)); + assertThat(recorded.get(0), is(EXPECTED_VALUE)); + } + } + + private static PipelineOptions pipelineOptions() { + PipelineOptions options = + PipelineOptionsFactory.fromArgs("--applicationId=" + APPLICATION_ID).create(); + options.setRunner(CrashingRunner.class); + options.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID); + options + .as(PortablePipelineOptions.class) + .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED); + return options; + } + + private static Properties streamsConfig() { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + props.put( + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + return props; + } +}