Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
* receivers.
*/
class ExecutableStageProcessor
implements Processor<byte[], KStreamsPayload<byte[]>, byte[], KStreamsPayload<byte[]>> {
implements Processor<byte[], KStreamsPayload<?>, byte[], KStreamsPayload<?>> {

private static final Logger LOG = LoggerFactory.getLogger(ExecutableStageProcessor.class);

Expand All @@ -68,9 +68,13 @@ class ExecutableStageProcessor

// pendingOutputs is enqueued by SDK harness threads (inside the OutputReceiverFactory callback)
// and drained by the Kafka Streams processing thread on bundle close; needs to be thread-safe.
private final Queue<WindowedValue<byte[]>> pendingOutputs = new ConcurrentLinkedQueue<>();
// The element type is intentionally wildcarded: the runner does not need to know the runtime
// value type — the bundle factory handles all coder application at the Fn-API boundary using
// the PCollection coders from the ExecutableStagePayload. Pretending the type was byte[] was
// only safe because the Impulse output coder happens to be ByteArrayCoder.
private final Queue<WindowedValue<?>> pendingOutputs = new ConcurrentLinkedQueue<>();

private @Nullable ProcessorContext<byte[], KStreamsPayload<byte[]>> context;
private @Nullable ProcessorContext<byte[], KStreamsPayload<?>> context;
private @Nullable ExecutableStageContext stageContext;
private @Nullable StageBundleFactory stageBundleFactory;
private @Nullable RemoteBundle currentBundle;
Expand All @@ -81,16 +85,16 @@ class ExecutableStageProcessor
}

@Override
public void init(ProcessorContext<byte[], KStreamsPayload<byte[]>> context) {
public void init(ProcessorContext<byte[], KStreamsPayload<?>> context) {
this.context = context;
ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload);
this.stageContext = KafkaStreamsExecutableStageContextFactory.getInstance().get(jobInfo);
this.stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
}

@Override
public void process(Record<byte[], KStreamsPayload<byte[]>> record) {
KStreamsPayload<byte[]> payload = record.value();
public void process(Record<byte[], KStreamsPayload<?>> record) {
KStreamsPayload<?> payload = record.value();
if (payload.isWatermark()) {
// NOTE: flushing the bundle on every received watermark is provisional. Once the
// WatermarkManager lands, a stage will receive watermarks from multiple parent instances and
Expand Down Expand Up @@ -122,7 +126,7 @@ public <OutputT> FnDataReceiver<OutputT> create(String pCollectionId) {
// after the bundle closes.
return receivedElement -> {
if (receivedElement != null) {
pendingOutputs.add((WindowedValue<byte[]>) receivedElement);
pendingOutputs.add((WindowedValue<?>) receivedElement);
}
};
}
Expand All @@ -143,7 +147,7 @@ private FnDataReceiver<WindowedValue<?>> mainInputReceiver() {
return receiver;
}

private void closeBundleAndFlush(Record<byte[], KStreamsPayload<byte[]>> record) {
private void closeBundleAndFlush(Record<byte[], KStreamsPayload<?>> record) {
RemoteBundle bundle = currentBundle;
if (bundle == null) {
return;
Expand All @@ -157,26 +161,25 @@ private void closeBundleAndFlush(Record<byte[], KStreamsPayload<byte[]>> record)
} finally {
currentBundle = null;
}
ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx = checkInitialized(context);
ProcessorContext<byte[], KStreamsPayload<?>> ctx = checkInitialized(context);
// The harness has finished the bundle (close() returned) so no further enqueues happen.
// ConcurrentLinkedQueue's weakly-consistent iterator is therefore safe to drain via forEach.
pendingOutputs.forEach(
output ->
ctx.forward(
new Record<byte[], KStreamsPayload<byte[]>>(
record.key(), KStreamsPayload.data(output), record.timestamp())));
pendingOutputs.clear();
// Drain via poll() so each element is removed as it is forwarded.
WindowedValue<?> output;
while ((output = pendingOutputs.poll()) != null) {
ctx.forward(
new Record<byte[], KStreamsPayload<?>>(
record.key(), KStreamsPayload.data(output), record.timestamp()));
}
}

private void forwardWatermark(
Record<byte[], KStreamsPayload<byte[]>> record, long watermarkMillis) {
private void forwardWatermark(Record<byte[], KStreamsPayload<?>> record, long watermarkMillis) {
// TODO(#38743 / WatermarkManager): a watermark must reach every parallel instance of every
// downstream processor, but ctx.forward routes to one downstream partition per Kafka Streams'
// partitioning. The simplest correct approach is to fan the watermark out to all downstream
// partitions; that wiring lands with the WatermarkManager sub-issue (per Jan on PR #38764).
ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx = checkInitialized(context);
ProcessorContext<byte[], KStreamsPayload<?>> ctx = checkInitialized(context);
ctx.forward(
new Record<byte[], KStreamsPayload<byte[]>>(
new Record<byte[], KStreamsPayload<?>>(
record.key(), KStreamsPayload.watermark(watermarkMillis), record.timestamp()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@
*/
package org.apache.beam.runners.kafka.streams.translation;

import com.google.auto.service.AutoService;
import java.util.Map;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
import org.apache.beam.sdk.util.construction.NativeTransforms;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
import org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser;
import org.apache.beam.sdk.util.construction.graph.PipelineNode;
import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
import org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;

/**
* Translates a portable Beam pipeline into a Kafka Streams {@link
Expand All @@ -45,6 +50,7 @@ public KafkaStreamsPipelineTranslator() {
this(
ImmutableMap.<String, PTransformTranslator>builder()
.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator())
.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, new RedistributeTranslator())
.put(ExecutableStage.URN, new ExecutableStageTranslator())
.build());
}
Expand All @@ -59,11 +65,28 @@ public KafkaStreamsTranslationContext createTranslationContext(
}

/**
* Fuses the pipeline so that stateless user code is grouped into {@code ExecutableStage} nodes.
* Returns the set of URNs this translator handles natively. {@link
* TrivialNativeTransformExpander} uses this set to strip the sub-transforms of runner-native
* composites (e.g. {@code Redistribute.arbitrarily}) before fusion, so they survive into
* translation as leaves instead of being expanded into primitives the runner does not implement
* yet (e.g. GroupByKey).
*/
public Set<String> knownUrns() {
return urnToTranslator.keySet();
}

/**
* Prepares the pipeline for translation:
*
* <ol>
* <li>Trim sub-transforms of runner-native composites listed in {@link #knownUrns()} so the
* fuser leaves them as primitives.
* <li>Fuse remaining stateless user code into {@code ExecutableStage} nodes via {@link
* GreedyPipelineFuser}.
* </ol>
*
* <p>Runner-executed primitives that have their own translator (e.g. Impulse) are left intact;
* everything else is fused. If the pipeline already contains {@code ExecutableStage} transforms
* it is returned unchanged.
* <p>If the pipeline already contains {@code ExecutableStage} transforms it is returned
* unchanged.
*/
public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline) {
boolean alreadyFused =
Expand All @@ -72,7 +95,8 @@ public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline) {
if (alreadyFused) {
return pipeline;
}
return GreedyPipelineFuser.fuse(pipeline).toPipeline();
RunnerApi.Pipeline trimmed = TrivialNativeTransformExpander.forKnownUrns(pipeline, knownUrns());
return GreedyPipelineFuser.fuse(trimmed).toPipeline();
}

/**
Expand All @@ -99,4 +123,23 @@ public void translate(KafkaStreamsTranslationContext context, RunnerApi.Pipeline
translator.translate(node.getId(), pipeline, context);
}
}

/**
* Tells the SDK that URNs handled directly by the Kafka Streams runner should be treated as
* primitives by {@link QueryablePipeline}. Mirrors Flink's {@code IsFlinkNativeTransform}
* pattern. Without this, {@link TrivialNativeTransformExpander} strips a composite's
* sub-transforms but {@link QueryablePipeline} still does not recognise the composite itself as a
* producer of its outputs, and pipeline validation fails with "consumed but never produced".
*/
@AutoService(NativeTransforms.IsNativeTransform.class)
public static class IsKafkaStreamsNativeTransform implements NativeTransforms.IsNativeTransform {
private static final Set<String> URNS =
ImmutableSet.of(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN);

@Override
public boolean test(RunnerApi.PTransform pTransform) {
String urn = PTransformTranslation.urnForTransformOrNull(pTransform);
return urn != null && URNS.contains(urn);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.kafka.streams.translation;

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

/**
* Runner-native translator for {@code beam:transform:redistribute_arbitrarily:v1}.
*
* <p>The default Beam expansion of {@code Redistribute.arbitrarily()} goes through {@code
* GroupByKey} for materialization. The Kafka Streams runner does not need to do any actual
* redistribution in the single-instance topology — Kafka Streams is already handling per-task
* processing — so we provide a passthrough: the output PCollection is mapped to the same processor
* that already produces the input PCollection. No topology node is added.
*
* <p>The translator is registered in {@link KafkaStreamsPipelineTranslator#knownUrns()} so {@link
* org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander} strips the
* sub-transforms of {@code Redistribute} before the fuser runs. That keeps the Redistribute
* boundary intact and lets the fuser split adjacent stages correctly.
*
* <p>The keyed variant ({@code redistribute_by_key:v1}) is intentionally not handled here: it
* implies a rehash/partition step that the runner can implement on top of GroupByKey when that
* lands, but cannot reasonably no-op the way the arbitrary variant can.
*/
class RedistributeTranslator implements PTransformTranslator {

@Override
public void translate(
String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) {
RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId);
String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());
Comment on lines +48 to +49

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Add a defensive null check for parentProcessor. If the input PCollection does not have a registered producer processor (e.g., due to an unexpected pipeline structure or translation order issue), this will fail fast with a clear error message instead of causing a downstream NullPointerException.

    String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
    if (parentProcessor == null) {
      throw new IllegalStateException(
          String.format(
              "No producer processor found for input PCollection %s of transform %s",
              inputPCollectionId, transformId));
    }
    String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());

// Passthrough: downstream lookups for the output PCollection resolve to the producer of the
// input PCollection. No KS Processor / state store / source is added.
context.registerPCollectionProducer(outputPCollectionId, parentProcessor);
}
}
Loading
Loading