diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/HistoryPropagationScope.java b/durabletask-client/src/main/java/io/dapr/durabletask/HistoryPropagationScope.java new file mode 100644 index 0000000000..92122b2092 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/HistoryPropagationScope.java @@ -0,0 +1,60 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.durabletask; + +import io.dapr.durabletask.implementation.protobuf.Orchestration; + +/** + * Controls how execution history is propagated to a child workflow or activity. + */ +public enum HistoryPropagationScope { + /** + * No propagation. The child receives no history from the caller. + */ + NONE, + + /** + * Propagate the caller's own history events only. The child does not see + * any ancestral history (trust boundary). + */ + OWN_HISTORY, + + /** + * Propagate the caller's own history events AND the full ancestral chain. + * Any propagated history this workflow received from its parent is forwarded to the child. + */ + LINEAGE; + + Orchestration.HistoryPropagationScope toProto() { + switch (this) { + case OWN_HISTORY: + return Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY; + case LINEAGE: + return Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE; + default: + return Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_NONE; + } + } + + static HistoryPropagationScope fromProto(Orchestration.HistoryPropagationScope proto) { + switch (proto) { + case HISTORY_PROPAGATION_SCOPE_OWN_HISTORY: + return OWN_HISTORY; + case HISTORY_PROPAGATION_SCOPE_LINEAGE: + return LINEAGE; + default: + return NONE; + } + } +} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/PropagatedHistory.java b/durabletask-client/src/main/java/io/dapr/durabletask/PropagatedHistory.java new file mode 100644 index 0000000000..cf5777d757 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/PropagatedHistory.java @@ -0,0 +1,184 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.durabletask; + +import io.dapr.durabletask.implementation.protobuf.HistoryEvents; +import io.dapr.durabletask.implementation.protobuf.Orchestration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Represents propagated execution history from a parent workflow to a child workflow or activity. + * Provides query methods for inspecting ancestor execution history. + */ +public final class PropagatedHistory { + private final List events; + private final HistoryPropagationScope scope; + private final List chunks; + + PropagatedHistory(List events, + HistoryPropagationScope scope, + List chunks) { + this.events = Collections.unmodifiableList(new ArrayList<>(events)); + this.scope = scope; + this.chunks = Collections.unmodifiableList(new ArrayList<>(chunks)); + } + + static PropagatedHistory fromProto(HistoryEvents.PropagatedHistory proto) { + List chunks = proto.getChunksList().stream() + .map(c -> new PropagatedHistoryChunk( + c.getAppId(), + c.getStartEventIndex(), + c.getEventCount(), + c.getInstanceId(), + c.getWorkflowName())) + .collect(Collectors.toList()); + + HistoryPropagationScope scope = HistoryPropagationScope.fromProto(proto.getScope()); + + return new PropagatedHistory(proto.getEventsList(), scope, chunks); + } + + /** + * Gets the raw history events that were propagated. + * + * @return an unmodifiable list of history events + */ + public List getEvents() { + return this.events; + } + + /** + * Gets the propagation scope that was used to produce this history. + * + * @return the history propagation scope + */ + public HistoryPropagationScope getScope() { + return this.scope; + } + + /** + * Gets the workflow chunks identifying which app/workflow produced which events. + * + * @return an unmodifiable list of history chunks + */ + public List getWorkflows() { + return this.chunks; + } + + /** + * Gets a deduplicated, ordered list of app IDs in the propagation chain. + * + * @return list of app IDs in the order they appear in chunks + */ + public List getAppIDs() { + Set seen = new LinkedHashSet<>(); + for (PropagatedHistoryChunk chunk : this.chunks) { + if (chunk.getAppId() != null && !chunk.getAppId().isEmpty()) { + seen.add(chunk.getAppId()); + } + } + return new ArrayList<>(seen); + } + + /** + * Gets the first workflow chunk matching the given workflow name. + * Returns the last match (most recent occurrence) if multiple exist. + * + * @param name the workflow name to search for + * @return an Optional containing the matching chunk, or empty if not found + */ + public Optional getWorkflowByName(String name) { + List matches = getWorkflowsByName(name); + if (matches.isEmpty()) { + return Optional.empty(); + } + return Optional.of(matches.get(matches.size() - 1)); + } + + /** + * Gets all workflow chunks matching the given workflow name. + * + * @param name the workflow name to search for + * @return a list of matching chunks in execution order + */ + public List getWorkflowsByName(String name) { + return this.chunks.stream() + .filter(c -> name.equals(c.getWorkflowName())) + .collect(Collectors.toList()); + } + + /** + * Gets the history events produced by the given app ID. + * + * @param appId the app ID to filter by + * @return a list of history events from the specified app + */ + public List getEventsByAppID(String appId) { + List result = new ArrayList<>(); + for (PropagatedHistoryChunk chunk : this.chunks) { + if (appId.equals(chunk.getAppId())) { + addEventsFromChunk(chunk, result); + } + } + return result; + } + + /** + * Gets the history events produced by the given workflow instance ID. + * + * @param instanceId the instance ID to filter by + * @return a list of history events from the specified instance + */ + public List getEventsByInstanceID(String instanceId) { + List result = new ArrayList<>(); + for (PropagatedHistoryChunk chunk : this.chunks) { + if (instanceId.equals(chunk.getInstanceId())) { + addEventsFromChunk(chunk, result); + } + } + return result; + } + + /** + * Gets the history events produced by the given workflow name. + * + * @param workflowName the workflow name to filter by + * @return a list of history events from the specified workflow + */ + public List getEventsByWorkflowName(String workflowName) { + List result = new ArrayList<>(); + for (PropagatedHistoryChunk chunk : this.chunks) { + if (workflowName.equals(chunk.getWorkflowName())) { + addEventsFromChunk(chunk, result); + } + } + return result; + } + + private void addEventsFromChunk(PropagatedHistoryChunk chunk, + List result) { + int start = chunk.getStartEventIndex(); + int end = Math.min(start + chunk.getEventCount(), this.events.size()); + for (int i = start; i < end; i++) { + result.add(this.events.get(i)); + } + } +} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/PropagatedHistoryChunk.java b/durabletask-client/src/main/java/io/dapr/durabletask/PropagatedHistoryChunk.java new file mode 100644 index 0000000000..b004965c78 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/PropagatedHistoryChunk.java @@ -0,0 +1,80 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.durabletask; + +/** + * Represents a chunk of propagated history events from a specific workflow instance. + */ +public final class PropagatedHistoryChunk { + private final String appId; + private final int startEventIndex; + private final int eventCount; + private final String instanceId; + private final String workflowName; + + PropagatedHistoryChunk(String appId, int startEventIndex, int eventCount, + String instanceId, String workflowName) { + this.appId = appId; + this.startEventIndex = startEventIndex; + this.eventCount = eventCount; + this.instanceId = instanceId; + this.workflowName = workflowName; + } + + /** + * Gets the app ID that produced the events in this chunk. + * + * @return the app ID + */ + public String getAppId() { + return this.appId; + } + + /** + * Gets the index of the first event in this chunk (inclusive) into the + * propagated history events list. + * + * @return the start event index + */ + public int getStartEventIndex() { + return this.startEventIndex; + } + + /** + * Gets the number of events in this chunk. + * + * @return the event count + */ + public int getEventCount() { + return this.eventCount; + } + + /** + * Gets the workflow instance ID that produced the events in this chunk. + * + * @return the instance ID + */ + public String getInstanceId() { + return this.instanceId; + } + + /** + * Gets the workflow name that produced the events in this chunk. + * + * @return the workflow name + */ + public String getWorkflowName() { + return this.workflowName; + } +} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java index 08f5ebb832..3fc317bd2c 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java @@ -13,6 +13,8 @@ package io.dapr.durabletask; +import java.util.Optional; + /** * Interface that provides {@link TaskActivity} implementations with activity context, such as an activity's name and * its input. @@ -53,4 +55,11 @@ public interface TaskActivityContext { * @return trace parent id */ String getTraceParent(); + + /** + * Gets the propagated history from a parent workflow, if any was propagated. + * + * @return an Optional containing the propagated history, or empty if none was propagated + */ + Optional getPropagatedHistory(); } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java index c4c66e892a..a617e9ae99 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java @@ -13,7 +13,11 @@ package io.dapr.durabletask; +import io.dapr.durabletask.implementation.protobuf.HistoryEvents; + +import javax.annotation.Nullable; import java.util.HashMap; +import java.util.Optional; import java.util.logging.Logger; public final class TaskActivityExecutor { @@ -50,6 +54,24 @@ public TaskActivityExecutor( */ public String execute(String taskName, String input, String taskExecutionId, int taskId, String traceParent) throws Throwable { + return execute(taskName, input, taskExecutionId, taskId, traceParent, null); + } + + /** + * Executes an activity task with optional propagated history. + * + * @param taskName the name of the activity task to execute + * @param input the serialized input payload for the activity task + * @param taskExecutionId Unique ID for the task execution. + * @param taskId Auto-incrementing ID for the task. + * @param traceParent The traceparent header value. + * @param propagatedHistory Propagated history from a parent workflow, or null. + * @return the serialized output payload for the activity task, or null if the activity task returned null. + * @throws Throwable if an unhandled exception occurs during activity task execution. + */ + public String execute(String taskName, String input, + String taskExecutionId, int taskId, String traceParent, + @Nullable HistoryEvents.PropagatedHistory propagatedHistory) throws Throwable { TaskActivityFactory factory = this.activityFactories.get(taskName); if (factory == null) { throw new IllegalStateException( @@ -62,8 +84,10 @@ public String execute(String taskName, String input, String.format("The task factory '%s' returned a null TaskActivity object.", taskName)); } + PropagatedHistory parsed = propagatedHistory != null + ? PropagatedHistory.fromProto(propagatedHistory) : null; TaskActivityContextImpl context = new TaskActivityContextImpl( - taskName, input, taskExecutionId, taskId, traceParent); + taskName, input, taskExecutionId, taskId, traceParent, parsed); // Unhandled exceptions are allowed to escape Object output = activity.run(context); @@ -80,16 +104,19 @@ private class TaskActivityContextImpl implements TaskActivityContext { private final String taskExecutionId; private final int taskId; private final String traceParent; + private final PropagatedHistory propagatedHistory; private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter; public TaskActivityContextImpl(String activityName, String rawInput, - String taskExecutionId, int taskId, String traceParent) { + String taskExecutionId, int taskId, String traceParent, + @Nullable PropagatedHistory propagatedHistory) { this.name = activityName; this.rawInput = rawInput; this.taskExecutionId = taskExecutionId; this.taskId = taskId; this.traceParent = traceParent; + this.propagatedHistory = propagatedHistory; } @Override @@ -120,5 +147,10 @@ public int getTaskId() { public String getTraceParent() { return this.traceParent; } + + @Override + public Optional getPropagatedHistory() { + return Optional.ofNullable(this.propagatedHistory); + } } } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOptions.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOptions.java index e23ee54b77..0f51e6830d 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOptions.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOptions.java @@ -20,11 +20,14 @@ public final class TaskOptions { private final RetryPolicy retryPolicy; private final RetryHandler retryHandler; private final String appID; + private final HistoryPropagationScope historyPropagationScope; - private TaskOptions(RetryPolicy retryPolicy, RetryHandler retryHandler, String appID) { + private TaskOptions(RetryPolicy retryPolicy, RetryHandler retryHandler, String appID, + HistoryPropagationScope historyPropagationScope) { this.retryPolicy = retryPolicy; this.retryHandler = retryHandler; this.appID = appID; + this.historyPropagationScope = historyPropagationScope; } /** @@ -114,6 +117,20 @@ boolean hasAppID() { return this.appID != null && !this.appID.isEmpty(); } + /** + * Gets the configured {@link HistoryPropagationScope} value or {@code null} if none was configured. + * + * @return the configured history propagation scope + */ + public HistoryPropagationScope getHistoryPropagationScope() { + return this.historyPropagationScope; + } + + boolean hasHistoryPropagationScope() { + return this.historyPropagationScope != null + && this.historyPropagationScope != HistoryPropagationScope.NONE; + } + /** * Builder for creating {@code TaskOptions} instances. */ @@ -121,6 +138,7 @@ public static final class Builder { private RetryPolicy retryPolicy; private RetryHandler retryHandler; private String appID; + private HistoryPropagationScope historyPropagationScope; private Builder() { // Private constructor -enforces using TaskOptions.builder() @@ -159,13 +177,25 @@ public Builder appID(String appID) { return this; } + /** + * Sets the history propagation scope for the task. + * + * @param scope the propagation scope to use + * @return this builder instance for method chaining + */ + public Builder historyPropagationScope(HistoryPropagationScope scope) { + this.historyPropagationScope = scope; + return this; + } + /** * Builds a new {@code TaskOptions} instance with the configured values. * * @return a new TaskOptions instance */ public TaskOptions build() { - return new TaskOptions(this.retryPolicy, this.retryHandler, this.appID); + return new TaskOptions(this.retryPolicy, this.retryHandler, this.appID, + this.historyPropagationScope); } } } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java index bb8c1b9873..3c7ecf4aef 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java @@ -19,6 +19,7 @@ import java.time.ZonedDateTime; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.UUID; /** @@ -630,4 +631,11 @@ default Task waitForExternalEvent(String name, Class dataType) { * Clears the orchestration's custom status. */ void clearCustomStatus(); + + /** + * Gets the propagated history from a parent workflow, if any was propagated. + * + * @return an Optional containing the propagated history, or empty if none was propagated + */ + Optional getPropagatedHistory(); } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index b62e127cd9..e51ccc66a6 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -40,6 +40,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.UUID; @@ -127,7 +128,23 @@ public TaskOrchestrationExecutor( */ public TaskOrchestratorResult execute(List pastEvents, List newEvents) { - ContextImplTask context = new ContextImplTask(pastEvents, newEvents); + return execute(pastEvents, newEvents, null); + } + + /** + * Executes the orchestration with the given past and new events, plus optional propagated history. + * + * @param pastEvents list of past history events + * @param newEvents list of new history events + * @param propagatedHistory propagated history from a parent workflow, or null + * @return the result of the orchestrator execution + */ + public TaskOrchestratorResult execute(List pastEvents, + List newEvents, + @Nullable HistoryEvents.PropagatedHistory propagatedHistory) { + PropagatedHistory parsed = propagatedHistory != null + ? PropagatedHistory.fromProto(propagatedHistory) : null; + ContextImplTask context = new ContextImplTask(pastEvents, newEvents, parsed); boolean completed = false; try { @@ -200,9 +217,13 @@ private class ContextImplTask implements TaskOrchestrationContext { private String versionName; + private final PropagatedHistory propagatedHistory; + public ContextImplTask(List pastEvents, - List newEvents) { + List newEvents, + @Nullable PropagatedHistory propagatedHistory) { this.historyEventPlayer = new OrchestrationHistoryIterator(pastEvents, newEvents); + this.propagatedHistory = propagatedHistory; } @Override @@ -245,6 +266,11 @@ public String getAppId() { return this.appId; } + @Override + public Optional getPropagatedHistory() { + return Optional.ofNullable(this.propagatedHistory); + } + private void setAppId(String appId) { this.appId = appId; } @@ -406,6 +432,12 @@ public Task callActivity( this.appId, targetAppId)); } + // Set history propagation scope if specified + if (options != null && options.hasHistoryPropagationScope()) { + scheduleTaskBuilder.setHistoryPropagationScope( + options.getHistoryPropagationScope().toProto()); + } + TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; OrchestratorActions.WorkflowAction.Builder actionBuilder = OrchestratorActions.WorkflowAction @@ -577,6 +609,12 @@ public Task callSubOrchestrator( createSubOrchestrationActionBuilder.setRouter(routerBuilder.build()); } + // Set history propagation scope if specified + if (options != null && options.hasHistoryPropagationScope()) { + createSubOrchestrationActionBuilder.setHistoryPropagationScope( + options.getHistoryPropagationScope().toProto()); + } + TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; OrchestratorActions.WorkflowAction.Builder actionBuilder = OrchestratorActions.WorkflowAction diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java b/durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java index 54407edd04..2bd5b5adc0 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java @@ -111,7 +111,9 @@ private void executeActivity() throws Throwable { activityRequest.getInput().getValue(), activityRequest.getTaskExecutionId(), activityRequest.getTaskId(), - activityRequest.getParentTraceContext().getTraceParent()); + activityRequest.getParentTraceContext().getTraceParent(), + activityRequest.hasPropagatedHistory() + ? activityRequest.getPropagatedHistory() : null); } catch (Throwable e) { failureDetails = Orchestration.TaskFailureDetails.newBuilder() .setErrorType(e.getClass().getName()) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java b/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java index f98bb50522..e46cbe978b 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java @@ -57,7 +57,9 @@ public OrchestratorRunner( public void run() { TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( orchestratorRequest.getPastEventsList(), - orchestratorRequest.getNewEventsList()); + orchestratorRequest.getNewEventsList(), + orchestratorRequest.hasPropagatedHistory() + ? orchestratorRequest.getPropagatedHistory() : null); var versionBuilder = Orchestration.WorkflowVersion.newBuilder(); diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/ActivityHistoryPropagationTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/ActivityHistoryPropagationTest.java new file mode 100644 index 0000000000..72c2c063b6 --- /dev/null +++ b/durabletask-client/src/test/java/io/dapr/durabletask/ActivityHistoryPropagationTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.durabletask; + +import com.google.protobuf.Timestamp; +import io.dapr.durabletask.implementation.protobuf.HistoryEvents; +import io.dapr.durabletask.implementation.protobuf.Orchestration; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Optional; +import java.util.logging.Logger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for PropagatedHistory surfacing on TaskActivityExecutor. + */ +class ActivityHistoryPropagationTest { + + private static final Logger logger = Logger.getLogger(ActivityHistoryPropagationTest.class.getName()); + + @Test + void execute_withPropagatedHistory_surfacesOnContext() throws Throwable { + final Optional[] captured = new Optional[]{Optional.empty()}; + + HashMap factories = new HashMap<>(); + factories.put("MyActivity", () -> ctx -> { + captured[0] = ctx.getPropagatedHistory(); + return "done"; + }); + + TaskActivityExecutor executor = new TaskActivityExecutor(factories, new JacksonDataConverter(), logger); + + HistoryEvents.HistoryEvent event = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(0) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder().setName("ValidateCard").build()) + .build(); + + HistoryEvents.PropagatedHistory propagatedHistoryProto = HistoryEvents.PropagatedHistory.newBuilder() + .addEvents(event) + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("parent-app") + .setStartEventIndex(0) + .setEventCount(1) + .setInstanceId("parent-inst-1") + .setWorkflowName("ProcessPayment") + .build()) + .build(); + + executor.execute("MyActivity", "\"input\"", "exec-1", 0, "traceparent", propagatedHistoryProto); + + assertTrue(captured[0].isPresent()); + PropagatedHistory history = captured[0].get(); + assertEquals(HistoryPropagationScope.OWN_HISTORY, history.getScope()); + assertEquals(1, history.getEvents().size()); + assertEquals("ValidateCard", history.getEvents().get(0).getTaskScheduled().getName()); + assertEquals(1, history.getWorkflows().size()); + assertEquals("parent-app", history.getWorkflows().get(0).getAppId()); + } + + @Test + void execute_withoutPropagatedHistory_returnsEmptyOptional() throws Throwable { + final Optional[] captured = new Optional[]{Optional.empty()}; + + HashMap factories = new HashMap<>(); + factories.put("MyActivity", () -> ctx -> { + captured[0] = ctx.getPropagatedHistory(); + return "done"; + }); + + TaskActivityExecutor executor = new TaskActivityExecutor(factories, new JacksonDataConverter(), logger); + + executor.execute("MyActivity", "\"input\"", "exec-1", 0, "traceparent"); + + assertFalse(captured[0].isPresent()); + } + + @Test + void execute_withNullPropagatedHistory_returnsEmptyOptional() throws Throwable { + final Optional[] captured = new Optional[]{Optional.empty()}; + + HashMap factories = new HashMap<>(); + factories.put("MyActivity", () -> ctx -> { + captured[0] = ctx.getPropagatedHistory(); + return "done"; + }); + + TaskActivityExecutor executor = new TaskActivityExecutor(factories, new JacksonDataConverter(), logger); + + executor.execute("MyActivity", "\"input\"", "exec-1", 0, "traceparent", null); + + assertFalse(captured[0].isPresent()); + } +} diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/HistoryPropagationIntegrationTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/HistoryPropagationIntegrationTest.java new file mode 100644 index 0000000000..b5aae7e720 --- /dev/null +++ b/durabletask-client/src/test/java/io/dapr/durabletask/HistoryPropagationIntegrationTest.java @@ -0,0 +1,534 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.durabletask; + +import com.google.protobuf.StringValue; +import com.google.protobuf.Timestamp; +import io.dapr.durabletask.implementation.protobuf.HistoryEvents; +import io.dapr.durabletask.implementation.protobuf.Orchestration; +import io.dapr.durabletask.implementation.protobuf.OrchestratorActions; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.logging.Logger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration-style tests for the full workflow history propagation flow. + * Tests the end-to-end behavior of parent workflows setting propagation scope + * on actions, and child workflows/activities receiving propagated history. + */ +class HistoryPropagationIntegrationTest { + + private static final Logger logger = Logger.getLogger(HistoryPropagationIntegrationTest.class.getName()); + private static final Duration MAX_TIMER_INTERVAL = Duration.ofDays(3); + private static final Instant TEST_INSTANT = Instant.parse("2024-01-01T00:00:00Z"); + + private static Timestamp ts(Instant instant) { + return Timestamp.newBuilder() + .setSeconds(instant.getEpochSecond()) + .setNanos(instant.getNano()) + .build(); + } + + private static HistoryEvents.HistoryEvent workflowStarted() { + return HistoryEvents.HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(ts(TEST_INSTANT)) + .setWorkflowStarted(HistoryEvents.WorkflowStartedEvent.newBuilder().build()) + .build(); + } + + private static HistoryEvents.HistoryEvent executionStarted(String name, String instanceId) { + return HistoryEvents.HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(ts(TEST_INSTANT)) + .setExecutionStarted(HistoryEvents.ExecutionStartedEvent.newBuilder() + .setName(name) + .setWorkflowInstance( + Orchestration.WorkflowInstance.newBuilder().setInstanceId(instanceId).build()) + .build()) + .build(); + } + + private static HistoryEvents.HistoryEvent orchestratorCompleted() { + return HistoryEvents.HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(ts(TEST_INSTANT)) + .setWorkflowCompleted(HistoryEvents.WorkflowCompletedEvent.newBuilder().build()) + .build(); + } + + private TaskOrchestrationExecutor createExecutor(String name, TaskOrchestration orchestration, String appId) { + TaskOrchestrationFactories factories = new TaskOrchestrationFactories(); + factories.addOrchestration(new TaskOrchestrationFactory() { + @Override + public String getName() { return name; } + @Override + public TaskOrchestration create() { return orchestration; } + @Override + public String getVersionName() { return null; } + @Override + public Boolean isLatestVersion() { return false; } + }); + return new TaskOrchestrationExecutor(factories, new JacksonDataConverter(), MAX_TIMER_INTERVAL, logger, appId); + } + + /** + * Integration test: Parent workflow with Lineage scope → + * child workflow receives full history including parent's events. + * + * This simulates the full flow: + * 1. Parent orchestrates and sets LINEAGE scope on child workflow call + * 2. The runtime (simulated) builds PropagatedHistory from parent's history + * 3. Child workflow receives and can query the propagated history + */ + @Test + void lineageScope_parentToChild_childReceivesParentEvents() { + // === PHASE 1: Parent workflow schedules child with LINEAGE scope === + final String parentName = "ProcessPayment"; + final String childName = "FraudDetection"; + + TaskOrchestration parentOrchestration = ctx -> { + TaskOptions opts = TaskOptions.builder() + .historyPropagationScope(HistoryPropagationScope.LINEAGE) + .build(); + ctx.callSubOrchestrator(childName, "payment-data", "child-inst-1", opts, String.class); + }; + + TaskOrchestrationExecutor parentExecutor = createExecutor(parentName, parentOrchestration, "payment-app"); + + List parentNewEvents = List.of( + workflowStarted(), + executionStarted(parentName, "parent-inst-1"), + orchestratorCompleted() + ); + + TaskOrchestratorResult parentResult = parentExecutor.execute(new ArrayList<>(), parentNewEvents); + List parentActions = new ArrayList<>(parentResult.getActions()); + + // Verify parent set LINEAGE scope on the CreateChildWorkflowAction + assertEquals(1, parentActions.size()); + OrchestratorActions.CreateChildWorkflowAction createChild = parentActions.get(0).getCreateChildWorkflow(); + assertTrue(createChild.hasHistoryPropagationScope()); + assertEquals(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE, + createChild.getHistoryPropagationScope()); + + // === PHASE 2: Simulate runtime building propagated history from parent === + // In a real scenario, the Dapr runtime builds this from the parent's execution history + HistoryEvents.HistoryEvent parentTaskScheduled = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(0) + .setTimestamp(ts(TEST_INSTANT)) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder() + .setName("ValidateCard") + .build()) + .build(); + HistoryEvents.HistoryEvent parentTaskCompleted = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(1) + .setTimestamp(ts(TEST_INSTANT)) + .setTaskCompleted(HistoryEvents.TaskCompletedEvent.newBuilder() + .setTaskScheduledId(0) + .setResult(StringValue.of("\"valid\"")) + .build()) + .build(); + + HistoryEvents.PropagatedHistory propagatedHistory = HistoryEvents.PropagatedHistory.newBuilder() + .addEvents(parentTaskScheduled) + .addEvents(parentTaskCompleted) + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("payment-app") + .setStartEventIndex(0) + .setEventCount(2) + .setInstanceId("parent-inst-1") + .setWorkflowName("ProcessPayment") + .build()) + .build(); + + // === PHASE 3: Child workflow receives and queries propagated history === + final Optional[] childHistory = new Optional[]{Optional.empty()}; + + TaskOrchestration childOrchestration = ctx -> { + childHistory[0] = ctx.getPropagatedHistory(); + ctx.complete("fraud-check-passed"); + }; + + TaskOrchestrationExecutor childExecutor = createExecutor(childName, childOrchestration, "fraud-app"); + + List childNewEvents = List.of( + workflowStarted(), + executionStarted(childName, "child-inst-1"), + orchestratorCompleted() + ); + + childExecutor.execute(new ArrayList<>(), childNewEvents, propagatedHistory); + + // Verify child received the propagated history + assertTrue(childHistory[0].isPresent()); + PropagatedHistory history = childHistory[0].get(); + assertEquals(HistoryPropagationScope.LINEAGE, history.getScope()); + assertEquals(2, history.getEvents().size()); + + // Verify child can query by workflow name + Optional parentChunk = history.getWorkflowByName("ProcessPayment"); + assertTrue(parentChunk.isPresent()); + assertEquals("payment-app", parentChunk.get().getAppId()); + assertEquals("parent-inst-1", parentChunk.get().getInstanceId()); + + // Verify child can filter events by app ID + List paymentAppEvents = history.getEventsByAppID("payment-app"); + assertEquals(2, paymentAppEvents.size()); + + // Verify deduplicated app IDs + List appIds = history.getAppIDs(); + assertEquals(1, appIds.size()); + assertEquals("payment-app", appIds.get(0)); + } + + /** + * Integration test: Parent workflow with OWN_HISTORY scope → + * child workflow only receives caller's events (trust boundary). + * Grandparent history is NOT forwarded. + */ + @Test + void ownHistoryScope_parentToChild_childReceivesOnlyCallerEvents() { + // === PHASE 1: Parent workflow schedules child with OWN_HISTORY scope === + final String parentName = "SettlePayment"; + final String childName = "RecordTransaction"; + + TaskOrchestration parentOrchestration = ctx -> { + TaskOptions opts = TaskOptions.builder() + .historyPropagationScope(HistoryPropagationScope.OWN_HISTORY) + .build(); + ctx.callSubOrchestrator(childName, "settlement-data", "child-inst-2", opts, String.class); + }; + + TaskOrchestrationExecutor parentExecutor = createExecutor(parentName, parentOrchestration, "settlement-app"); + + List parentNewEvents = List.of( + workflowStarted(), + executionStarted(parentName, "parent-inst-2"), + orchestratorCompleted() + ); + + TaskOrchestratorResult parentResult = parentExecutor.execute(new ArrayList<>(), parentNewEvents); + List parentActions = new ArrayList<>(parentResult.getActions()); + + // Verify parent set OWN_HISTORY scope + OrchestratorActions.CreateChildWorkflowAction createChild = parentActions.get(0).getCreateChildWorkflow(); + assertEquals(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY, + createChild.getHistoryPropagationScope()); + + // === PHASE 2: Simulate runtime building OWN_HISTORY propagated history === + // Only the caller's (parent's) events - no grandparent chain + HistoryEvents.HistoryEvent parentEvent = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(0) + .setTimestamp(ts(TEST_INSTANT)) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder() + .setName("ApproveSettlement") + .build()) + .build(); + + HistoryEvents.PropagatedHistory propagatedHistory = HistoryEvents.PropagatedHistory.newBuilder() + .addEvents(parentEvent) + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("settlement-app") + .setStartEventIndex(0) + .setEventCount(1) + .setInstanceId("parent-inst-2") + .setWorkflowName("SettlePayment") + .build()) + .build(); + + // === PHASE 3: Child receives only OWN_HISTORY (no grandparent) === + final Optional[] childHistory = new Optional[]{Optional.empty()}; + + TaskOrchestration childOrchestration = ctx -> { + childHistory[0] = ctx.getPropagatedHistory(); + ctx.complete("recorded"); + }; + + TaskOrchestrationExecutor childExecutor = createExecutor(childName, childOrchestration, "record-app"); + + List childNewEvents = List.of( + workflowStarted(), + executionStarted(childName, "child-inst-2"), + orchestratorCompleted() + ); + + childExecutor.execute(new ArrayList<>(), childNewEvents, propagatedHistory); + + assertTrue(childHistory[0].isPresent()); + PropagatedHistory history = childHistory[0].get(); + assertEquals(HistoryPropagationScope.OWN_HISTORY, history.getScope()); + + // Only one chunk from the immediate parent, no grandparent + assertEquals(1, history.getWorkflows().size()); + assertEquals("SettlePayment", history.getWorkflows().get(0).getWorkflowName()); + assertEquals(1, history.getEvents().size()); + } + + /** + * Integration test: Activity receives propagated history. + * Parent workflow sets LINEAGE scope on activity call → activity can inspect history. + */ + @Test + void lineageScope_parentToActivity_activityReceivesHistory() throws Throwable { + // === PHASE 1: Parent sets LINEAGE scope on activity call === + final String parentName = "ProcessPayment"; + final String activityName = "SettlePayment"; + + TaskOrchestration parentOrchestration = ctx -> { + TaskOptions opts = TaskOptions.builder() + .historyPropagationScope(HistoryPropagationScope.LINEAGE) + .build(); + ctx.callActivity(activityName, "settle-data", opts, String.class); + }; + + TaskOrchestrationExecutor parentExecutor = createExecutor(parentName, parentOrchestration, "payment-app"); + + List parentNewEvents = List.of( + workflowStarted(), + executionStarted(parentName, "parent-inst-3"), + orchestratorCompleted() + ); + + TaskOrchestratorResult parentResult = parentExecutor.execute(new ArrayList<>(), parentNewEvents); + List parentActions = new ArrayList<>(parentResult.getActions()); + + // Verify the ScheduleTaskAction has LINEAGE scope + OrchestratorActions.ScheduleTaskAction scheduleTask = parentActions.get(0).getScheduleTask(); + assertTrue(scheduleTask.hasHistoryPropagationScope()); + assertEquals(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE, + scheduleTask.getHistoryPropagationScope()); + + // === PHASE 2: Simulate runtime building propagated history for activity === + HistoryEvents.HistoryEvent event = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(0) + .setTimestamp(ts(TEST_INSTANT)) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder() + .setName("ValidateCard") + .build()) + .build(); + + HistoryEvents.PropagatedHistory propagatedHistoryProto = HistoryEvents.PropagatedHistory.newBuilder() + .addEvents(event) + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("payment-app") + .setStartEventIndex(0) + .setEventCount(1) + .setInstanceId("parent-inst-3") + .setWorkflowName("ProcessPayment") + .build()) + .build(); + + // === PHASE 3: Activity receives and queries propagated history === + final Optional[] activityHistory = new Optional[]{Optional.empty()}; + + HashMap factories = new HashMap<>(); + factories.put(activityName, () -> ctx -> { + activityHistory[0] = ctx.getPropagatedHistory(); + return "settled"; + }); + + TaskActivityExecutor activityExecutor = new TaskActivityExecutor(factories, new JacksonDataConverter(), logger); + + activityExecutor.execute(activityName, "\"settle-data\"", "exec-1", 0, "", propagatedHistoryProto); + + assertTrue(activityHistory[0].isPresent()); + PropagatedHistory history = activityHistory[0].get(); + assertEquals(HistoryPropagationScope.LINEAGE, history.getScope()); + assertEquals(1, history.getEvents().size()); + assertEquals("ValidateCard", history.getEvents().get(0).getTaskScheduled().getName()); + + Optional wf = history.getWorkflowByName("ProcessPayment"); + assertTrue(wf.isPresent()); + assertEquals("payment-app", wf.get().getAppId()); + } + + /** + * Integration test: Multi-tier lineage propagation. + * Grandparent → Parent → Child all propagate with LINEAGE scope. + * Child should see events from both grandparent and parent via multiple chunks. + */ + @Test + void lineageScope_multiTier_childSeesFullAncestorChain() { + final String childName = "AuditRecord"; + final Optional[] childHistory = new Optional[]{Optional.empty()}; + + TaskOrchestration childOrchestration = ctx -> { + childHistory[0] = ctx.getPropagatedHistory(); + ctx.complete("audited"); + }; + + TaskOrchestrationExecutor childExecutor = createExecutor(childName, childOrchestration, "audit-app"); + + // Build multi-tier propagated history (grandparent + parent) + HistoryEvents.HistoryEvent gpEvent = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(0) + .setTimestamp(ts(TEST_INSTANT)) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder() + .setName("InitiatePayment") + .build()) + .build(); + HistoryEvents.HistoryEvent parentEvent = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(1) + .setTimestamp(ts(TEST_INSTANT)) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder() + .setName("ValidateCard") + .build()) + .build(); + HistoryEvents.HistoryEvent parentEvent2 = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(2) + .setTimestamp(ts(TEST_INSTANT)) + .setTaskCompleted(HistoryEvents.TaskCompletedEvent.newBuilder() + .setTaskScheduledId(1) + .setResult(StringValue.of("\"valid\"")) + .build()) + .build(); + + HistoryEvents.PropagatedHistory propagatedHistory = HistoryEvents.PropagatedHistory.newBuilder() + .addEvents(gpEvent) + .addEvents(parentEvent) + .addEvents(parentEvent2) + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("gateway-app") + .setStartEventIndex(0) + .setEventCount(1) + .setInstanceId("gp-inst-1") + .setWorkflowName("GatewayWorkflow") + .build()) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("payment-app") + .setStartEventIndex(1) + .setEventCount(2) + .setInstanceId("parent-inst-1") + .setWorkflowName("ProcessPayment") + .build()) + .build(); + + List childNewEvents = List.of( + workflowStarted(), + executionStarted(childName, "child-inst-audit"), + orchestratorCompleted() + ); + + childExecutor.execute(new ArrayList<>(), childNewEvents, propagatedHistory); + + assertTrue(childHistory[0].isPresent()); + PropagatedHistory history = childHistory[0].get(); + + // Verify full ancestor chain + assertEquals(HistoryPropagationScope.LINEAGE, history.getScope()); + assertEquals(3, history.getEvents().size()); + assertEquals(2, history.getWorkflows().size()); + + // Verify app IDs in order + List appIds = history.getAppIDs(); + assertEquals(2, appIds.size()); + assertEquals("gateway-app", appIds.get(0)); + assertEquals("payment-app", appIds.get(1)); + + // Query by workflow name + Optional gp = history.getWorkflowByName("GatewayWorkflow"); + assertTrue(gp.isPresent()); + assertEquals("gateway-app", gp.get().getAppId()); + assertEquals(1, gp.get().getEventCount()); + + Optional parent = history.getWorkflowByName("ProcessPayment"); + assertTrue(parent.isPresent()); + assertEquals("payment-app", parent.get().getAppId()); + assertEquals(2, parent.get().getEventCount()); + + // Query events by instance ID + List parentEvents = history.getEventsByInstanceID("parent-inst-1"); + assertEquals(2, parentEvents.size()); + assertEquals("ValidateCard", parentEvents.get(0).getTaskScheduled().getName()); + } + + /** + * Integration test: Combining cross-app routing with history propagation. + * Both features should work together without interference. + */ + @Test + void historyPropagation_combinedWithCrossAppRouting_bothWork() { + final String parentName = "ParentOrchestrator"; + final String childName = "ChildOrchestrator"; + final String sourceAppId = "source-app"; + final String targetAppId = "target-app"; + + TaskOrchestration parentOrchestration = ctx -> { + TaskOptions opts = TaskOptions.builder() + .appID(targetAppId) + .historyPropagationScope(HistoryPropagationScope.LINEAGE) + .build(); + ctx.callSubOrchestrator(childName, "input", "child-inst-combined", opts, String.class); + }; + + TaskOrchestrationExecutor parentExecutor = createExecutor(parentName, parentOrchestration, sourceAppId); + + Orchestration.TaskRouter router = Orchestration.TaskRouter.newBuilder() + .setSourceAppID(sourceAppId) + .build(); + + HistoryEvents.HistoryEvent execStarted = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(ts(TEST_INSTANT)) + .setExecutionStarted(HistoryEvents.ExecutionStartedEvent.newBuilder() + .setName(parentName) + .setWorkflowInstance( + Orchestration.WorkflowInstance.newBuilder().setInstanceId("parent-combined").build()) + .build()) + .setRouter(router) + .build(); + + List parentNewEvents = List.of( + workflowStarted(), + execStarted, + orchestratorCompleted() + ); + + TaskOrchestratorResult parentResult = parentExecutor.execute(new ArrayList<>(), parentNewEvents); + List actions = new ArrayList<>(parentResult.getActions()); + assertEquals(1, actions.size()); + + OrchestratorActions.WorkflowAction action = actions.get(0); + OrchestratorActions.CreateChildWorkflowAction createChild = action.getCreateChildWorkflow(); + + // Verify BOTH cross-app routing and history propagation are set + assertTrue(createChild.hasRouter()); + assertEquals(sourceAppId, createChild.getRouter().getSourceAppID()); + assertEquals(targetAppId, createChild.getRouter().getTargetAppID()); + + assertTrue(createChild.hasHistoryPropagationScope()); + assertEquals(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE, + createChild.getHistoryPropagationScope()); + + // Verify OrchestratorAction-level router + assertTrue(action.hasRouter()); + assertEquals(sourceAppId, action.getRouter().getSourceAppID()); + assertEquals(targetAppId, action.getRouter().getTargetAppID()); + } +} diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/HistoryPropagationTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/HistoryPropagationTest.java new file mode 100644 index 0000000000..442828539d --- /dev/null +++ b/durabletask-client/src/test/java/io/dapr/durabletask/HistoryPropagationTest.java @@ -0,0 +1,657 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.durabletask; + +import com.google.protobuf.StringValue; +import com.google.protobuf.Timestamp; +import io.dapr.durabletask.implementation.protobuf.HistoryEvents; +import io.dapr.durabletask.implementation.protobuf.Orchestration; +import io.dapr.durabletask.implementation.protobuf.OrchestratorActions; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.logging.Logger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for workflow history propagation in TaskOrchestrationExecutor. + */ +class HistoryPropagationTest { + + private static final Logger logger = Logger.getLogger(HistoryPropagationTest.class.getName()); + private static final Duration MAX_TIMER_INTERVAL = Duration.ofDays(3); + + private static HistoryEvents.HistoryEvent orchestratorStarted() { + return HistoryEvents.HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setWorkflowStarted(HistoryEvents.WorkflowStartedEvent.newBuilder().build()) + .build(); + } + + private static HistoryEvents.HistoryEvent executionStarted(String name, String instanceId, String input) { + return HistoryEvents.HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setExecutionStarted(HistoryEvents.ExecutionStartedEvent.newBuilder() + .setName(name) + .setWorkflowInstance( + Orchestration.WorkflowInstance.newBuilder().setInstanceId(instanceId).build()) + .setInput(StringValue.of(input)) + .build()) + .build(); + } + + private static HistoryEvents.HistoryEvent orchestratorCompleted() { + return HistoryEvents.HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setWorkflowCompleted(HistoryEvents.WorkflowCompletedEvent.newBuilder().build()) + .build(); + } + + private TaskOrchestrationExecutor createExecutor(String orchestratorName, TaskOrchestration orchestration) { + return createExecutor(orchestratorName, orchestration, null); + } + + private TaskOrchestrationExecutor createExecutor(String orchestratorName, TaskOrchestration orchestration, + String appId) { + TaskOrchestrationFactories factories = new TaskOrchestrationFactories(); + factories.addOrchestration(new TaskOrchestrationFactory() { + @Override + public String getName() { + return orchestratorName; + } + + @Override + public TaskOrchestration create() { + return orchestration; + } + + @Override + public String getVersionName() { + return null; + } + + @Override + public Boolean isLatestVersion() { + return false; + } + }); + return new TaskOrchestrationExecutor(factories, new JacksonDataConverter(), MAX_TIMER_INTERVAL, logger, appId); + } + + // ================================================================================== + // Tests for HistoryPropagationScope on ScheduleTaskAction (activity calls) + // ================================================================================== + + @Test + void callActivity_withLineageScope_setsHistoryPropagationScopeOnAction() { + final String orchestratorName = "ParentOrchestrator"; + final String activityName = "MyActivity"; + + TaskOrchestration orchestration = ctx -> { + TaskOptions options = TaskOptions.builder() + .historyPropagationScope(HistoryPropagationScope.LINEAGE) + .build(); + ctx.callActivity(activityName, "input", options, String.class); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "instance-1", "\"hello\""), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + assertEquals(1, actions.size()); + + OrchestratorActions.WorkflowAction action = actions.get(0); + assertTrue(action.hasScheduleTask()); + + OrchestratorActions.ScheduleTaskAction scheduleTask = action.getScheduleTask(); + assertEquals(activityName, scheduleTask.getName()); + assertTrue(scheduleTask.hasHistoryPropagationScope()); + assertEquals(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE, + scheduleTask.getHistoryPropagationScope()); + } + + @Test + void callActivity_withOwnHistoryScope_setsHistoryPropagationScopeOnAction() { + final String orchestratorName = "ParentOrchestrator"; + final String activityName = "MyActivity"; + + TaskOrchestration orchestration = ctx -> { + TaskOptions options = TaskOptions.builder() + .historyPropagationScope(HistoryPropagationScope.OWN_HISTORY) + .build(); + ctx.callActivity(activityName, "input", options, String.class); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "instance-1", "\"hello\""), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + OrchestratorActions.ScheduleTaskAction scheduleTask = actions.get(0).getScheduleTask(); + assertTrue(scheduleTask.hasHistoryPropagationScope()); + assertEquals(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY, + scheduleTask.getHistoryPropagationScope()); + } + + @Test + void callActivity_withoutScope_doesNotSetHistoryPropagationScope() { + final String orchestratorName = "ParentOrchestrator"; + final String activityName = "MyActivity"; + + TaskOrchestration orchestration = ctx -> { + ctx.callActivity(activityName, "input", null, String.class); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "instance-1", "\"hello\""), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + OrchestratorActions.ScheduleTaskAction scheduleTask = actions.get(0).getScheduleTask(); + assertFalse(scheduleTask.hasHistoryPropagationScope()); + } + + // ================================================================================== + // Tests for HistoryPropagationScope on CreateChildWorkflowAction (sub-orchestrations) + // ================================================================================== + + @Test + void callSubOrchestrator_withLineageScope_setsHistoryPropagationScopeOnAction() { + final String orchestratorName = "ParentOrchestrator"; + final String childName = "ChildOrchestrator"; + + TaskOrchestration orchestration = ctx -> { + TaskOptions options = TaskOptions.builder() + .historyPropagationScope(HistoryPropagationScope.LINEAGE) + .build(); + ctx.callSubOrchestrator(childName, "input", "child-1", options, String.class); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "parent-1", "\"hello\""), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + assertEquals(1, actions.size()); + + OrchestratorActions.WorkflowAction action = actions.get(0); + assertTrue(action.hasCreateChildWorkflow()); + + OrchestratorActions.CreateChildWorkflowAction createChild = action.getCreateChildWorkflow(); + assertEquals(childName, createChild.getName()); + assertTrue(createChild.hasHistoryPropagationScope()); + assertEquals(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE, + createChild.getHistoryPropagationScope()); + } + + @Test + void callSubOrchestrator_withOwnHistoryScope_setsHistoryPropagationScopeOnAction() { + final String orchestratorName = "ParentOrchestrator"; + final String childName = "ChildOrchestrator"; + + TaskOrchestration orchestration = ctx -> { + TaskOptions options = TaskOptions.builder() + .historyPropagationScope(HistoryPropagationScope.OWN_HISTORY) + .build(); + ctx.callSubOrchestrator(childName, "input", "child-1", options, String.class); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "parent-1", "\"hello\""), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + OrchestratorActions.CreateChildWorkflowAction createChild = actions.get(0).getCreateChildWorkflow(); + assertTrue(createChild.hasHistoryPropagationScope()); + assertEquals(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY, + createChild.getHistoryPropagationScope()); + } + + @Test + void callSubOrchestrator_withoutScope_doesNotSetHistoryPropagationScope() { + final String orchestratorName = "ParentOrchestrator"; + final String childName = "ChildOrchestrator"; + + TaskOrchestration orchestration = ctx -> { + ctx.callSubOrchestrator(childName, "input", "child-1", null, String.class); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "parent-1", "\"hello\""), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + OrchestratorActions.CreateChildWorkflowAction createChild = actions.get(0).getCreateChildWorkflow(); + assertFalse(createChild.hasHistoryPropagationScope()); + } + + // ================================================================================== + // Tests for PropagatedHistory received by orchestrator + // ================================================================================== + + @Test + void execute_withPropagatedHistory_surfacesHistoryOnContext() { + final String orchestratorName = "ChildOrchestrator"; + final Optional[] captured = new Optional[]{Optional.empty()}; + + TaskOrchestration orchestration = ctx -> { + captured[0] = ctx.getPropagatedHistory(); + ctx.complete(null); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration); + + // Build a PropagatedHistory proto + HistoryEvents.HistoryEvent taskScheduled = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(0) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder() + .setName("ValidateCard") + .build()) + .build(); + + HistoryEvents.PropagatedHistory propagatedHistoryProto = HistoryEvents.PropagatedHistory.newBuilder() + .addEvents(taskScheduled) + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("payment-app") + .setStartEventIndex(0) + .setEventCount(1) + .setInstanceId("parent-instance-1") + .setWorkflowName("ProcessPayment") + .build()) + .build(); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "child-1", "\"input\""), + orchestratorCompleted() + ); + + executor.execute(new ArrayList<>(), newEvents, propagatedHistoryProto); + + // Verify the propagated history was surfaced on the context + assertTrue(captured[0].isPresent()); + PropagatedHistory history = captured[0].get(); + + assertEquals(HistoryPropagationScope.LINEAGE, history.getScope()); + assertEquals(1, history.getEvents().size()); + assertEquals("ValidateCard", history.getEvents().get(0).getTaskScheduled().getName()); + + assertEquals(1, history.getWorkflows().size()); + PropagatedHistoryChunk chunk = history.getWorkflows().get(0); + assertEquals("payment-app", chunk.getAppId()); + assertEquals("ProcessPayment", chunk.getWorkflowName()); + assertEquals("parent-instance-1", chunk.getInstanceId()); + assertEquals(0, chunk.getStartEventIndex()); + assertEquals(1, chunk.getEventCount()); + } + + @Test + void execute_withoutPropagatedHistory_returnsEmptyOptional() { + final String orchestratorName = "ChildOrchestrator"; + final Optional[] captured = new Optional[]{Optional.empty()}; + + TaskOrchestration orchestration = ctx -> { + captured[0] = ctx.getPropagatedHistory(); + ctx.complete(null); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "child-1", "\"input\""), + orchestratorCompleted() + ); + + executor.execute(new ArrayList<>(), newEvents); + + assertFalse(captured[0].isPresent()); + } + + // ================================================================================== + // Tests for PropagatedHistory query methods + // ================================================================================== + + @Test + void propagatedHistory_getAppIDs_returnsDeduplicated() { + HistoryEvents.HistoryEvent event1 = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(0) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder().setName("A").build()) + .build(); + HistoryEvents.HistoryEvent event2 = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(1) + .setTimestamp(Timestamp.newBuilder().setSeconds(1001).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder().setName("B").build()) + .build(); + + HistoryEvents.PropagatedHistory proto = HistoryEvents.PropagatedHistory.newBuilder() + .addEvents(event1) + .addEvents(event2) + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("app1") + .setStartEventIndex(0) + .setEventCount(1) + .setInstanceId("inst1") + .setWorkflowName("WF1") + .build()) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("app1") + .setStartEventIndex(1) + .setEventCount(1) + .setInstanceId("inst2") + .setWorkflowName("WF2") + .build()) + .build(); + + PropagatedHistory history = PropagatedHistory.fromProto(proto); + + List appIds = history.getAppIDs(); + assertEquals(1, appIds.size()); + assertEquals("app1", appIds.get(0)); + } + + @Test + void propagatedHistory_getWorkflowByName_returnsLastMatch() { + HistoryEvents.HistoryEvent event1 = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(0) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder().setName("A").build()) + .build(); + HistoryEvents.HistoryEvent event2 = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(1) + .setTimestamp(Timestamp.newBuilder().setSeconds(1001).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder().setName("B").build()) + .build(); + + HistoryEvents.PropagatedHistory proto = HistoryEvents.PropagatedHistory.newBuilder() + .addEvents(event1) + .addEvents(event2) + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("app1") + .setStartEventIndex(0) + .setEventCount(1) + .setInstanceId("inst1") + .setWorkflowName("ProcessPayment") + .build()) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("app2") + .setStartEventIndex(1) + .setEventCount(1) + .setInstanceId("inst2") + .setWorkflowName("ProcessPayment") + .build()) + .build(); + + PropagatedHistory history = PropagatedHistory.fromProto(proto); + + Optional result = history.getWorkflowByName("ProcessPayment"); + assertTrue(result.isPresent()); + assertEquals("inst2", result.get().getInstanceId()); + assertEquals("app2", result.get().getAppId()); + } + + @Test + void propagatedHistory_getWorkflowByName_returnsEmptyForMissing() { + HistoryEvents.PropagatedHistory proto = HistoryEvents.PropagatedHistory.newBuilder() + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("app1") + .setStartEventIndex(0) + .setEventCount(0) + .setInstanceId("inst1") + .setWorkflowName("WF1") + .build()) + .build(); + + PropagatedHistory history = PropagatedHistory.fromProto(proto); + + Optional result = history.getWorkflowByName("NonExistent"); + assertFalse(result.isPresent()); + } + + @Test + void propagatedHistory_getEventsByAppID_filtersCorrectly() { + HistoryEvents.HistoryEvent event1 = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(0) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder().setName("A").build()) + .build(); + HistoryEvents.HistoryEvent event2 = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(1) + .setTimestamp(Timestamp.newBuilder().setSeconds(1001).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder().setName("B").build()) + .build(); + HistoryEvents.HistoryEvent event3 = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(2) + .setTimestamp(Timestamp.newBuilder().setSeconds(1002).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder().setName("C").build()) + .build(); + + HistoryEvents.PropagatedHistory proto = HistoryEvents.PropagatedHistory.newBuilder() + .addEvents(event1) + .addEvents(event2) + .addEvents(event3) + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("app1") + .setStartEventIndex(0) + .setEventCount(2) + .setInstanceId("inst1") + .setWorkflowName("WF1") + .build()) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("app2") + .setStartEventIndex(2) + .setEventCount(1) + .setInstanceId("inst2") + .setWorkflowName("WF2") + .build()) + .build(); + + PropagatedHistory history = PropagatedHistory.fromProto(proto); + + List app1Events = history.getEventsByAppID("app1"); + assertEquals(2, app1Events.size()); + assertEquals("A", app1Events.get(0).getTaskScheduled().getName()); + assertEquals("B", app1Events.get(1).getTaskScheduled().getName()); + + List app2Events = history.getEventsByAppID("app2"); + assertEquals(1, app2Events.size()); + assertEquals("C", app2Events.get(0).getTaskScheduled().getName()); + } + + @Test + void propagatedHistory_getEventsByInstanceID_filtersCorrectly() { + HistoryEvents.HistoryEvent event1 = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(0) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder().setName("A").build()) + .build(); + HistoryEvents.HistoryEvent event2 = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(1) + .setTimestamp(Timestamp.newBuilder().setSeconds(1001).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder().setName("B").build()) + .build(); + + HistoryEvents.PropagatedHistory proto = HistoryEvents.PropagatedHistory.newBuilder() + .addEvents(event1) + .addEvents(event2) + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("app1") + .setStartEventIndex(0) + .setEventCount(1) + .setInstanceId("inst-abc") + .setWorkflowName("WF1") + .build()) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("app1") + .setStartEventIndex(1) + .setEventCount(1) + .setInstanceId("inst-xyz") + .setWorkflowName("WF2") + .build()) + .build(); + + PropagatedHistory history = PropagatedHistory.fromProto(proto); + + List events = history.getEventsByInstanceID("inst-abc"); + assertEquals(1, events.size()); + assertEquals("A", events.get(0).getTaskScheduled().getName()); + } + + @Test + void propagatedHistory_getEventsByWorkflowName_filtersCorrectly() { + HistoryEvents.HistoryEvent event1 = HistoryEvents.HistoryEvent.newBuilder() + .setEventId(0) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setTaskScheduled(HistoryEvents.TaskScheduledEvent.newBuilder().setName("A").build()) + .build(); + + HistoryEvents.PropagatedHistory proto = HistoryEvents.PropagatedHistory.newBuilder() + .addEvents(event1) + .setScope(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY) + .addChunks(HistoryEvents.PropagatedHistoryChunk.newBuilder() + .setAppId("app1") + .setStartEventIndex(0) + .setEventCount(1) + .setInstanceId("inst1") + .setWorkflowName("ProcessPayment") + .build()) + .build(); + + PropagatedHistory history = PropagatedHistory.fromProto(proto); + + List events = history.getEventsByWorkflowName("ProcessPayment"); + assertEquals(1, events.size()); + assertEquals("A", events.get(0).getTaskScheduled().getName()); + + List empty = history.getEventsByWorkflowName("NonExistent"); + assertTrue(empty.isEmpty()); + } + + // ================================================================================== + // Tests for HistoryPropagationScope enum + // ================================================================================== + + @Test + void historyPropagationScope_toProto_convertsCorrectly() { + assertEquals(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_NONE, + HistoryPropagationScope.NONE.toProto()); + assertEquals(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY, + HistoryPropagationScope.OWN_HISTORY.toProto()); + assertEquals(Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE, + HistoryPropagationScope.LINEAGE.toProto()); + } + + @Test + void historyPropagationScope_fromProto_convertsCorrectly() { + assertEquals(HistoryPropagationScope.NONE, + HistoryPropagationScope.fromProto( + Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_NONE)); + assertEquals(HistoryPropagationScope.OWN_HISTORY, + HistoryPropagationScope.fromProto( + Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY)); + assertEquals(HistoryPropagationScope.LINEAGE, + HistoryPropagationScope.fromProto( + Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE)); + } + + // ================================================================================== + // Tests for TaskOptions builder with historyPropagationScope + // ================================================================================== + + @Test + void taskOptions_builder_setsHistoryPropagationScope() { + TaskOptions options = TaskOptions.builder() + .historyPropagationScope(HistoryPropagationScope.LINEAGE) + .appID("myApp") + .build(); + + assertEquals(HistoryPropagationScope.LINEAGE, options.getHistoryPropagationScope()); + assertTrue(options.hasHistoryPropagationScope()); + assertEquals("myApp", options.getAppID()); + } + + @Test + void taskOptions_builder_withoutScope_hasNoHistoryPropagationScope() { + TaskOptions options = TaskOptions.builder() + .appID("myApp") + .build(); + + assertNull(options.getHistoryPropagationScope()); + assertFalse(options.hasHistoryPropagationScope()); + } + + @Test + void taskOptions_noneScope_isNotConsideredSet() { + TaskOptions options = TaskOptions.builder() + .historyPropagationScope(HistoryPropagationScope.NONE) + .build(); + + assertEquals(HistoryPropagationScope.NONE, options.getHistoryPropagationScope()); + assertFalse(options.hasHistoryPropagationScope()); + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java index f375363edd..f390fa1b71 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java @@ -16,44 +16,35 @@ import io.dapr.durabletask.TaskFailedException; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowStub; -import io.dapr.workflows.WorkflowTaskOptions; -import io.dapr.workflows.WorkflowTaskRetryPolicy; - -import java.util.List; -import java.util.ArrayList; -import java.util.Collections; -import java.time.Duration; public class BookTripWorkflow implements Workflow { @Override public WorkflowStub create() { return ctx -> { ctx.getLogger().info("Starting Workflow: " + ctx.getName()); - List compensations = new ArrayList<>(); - - // Define retry policy for compensation activities - WorkflowTaskRetryPolicy compensationRetryPolicy = WorkflowTaskRetryPolicy.newBuilder() - .setFirstRetryInterval(Duration.ofSeconds(1)) - .setMaxNumberOfAttempts(3) - .build(); - - WorkflowTaskOptions compensationOptions = new WorkflowTaskOptions(compensationRetryPolicy); + CompensationHelper compensationHelper = new CompensationHelper(); try { // Book flight - String flightResult = ctx.callActivity(BookFlightActivity.class.getName(), null, String.class).await(); + String flightResult = ctx.callActivity( + BookFlightActivity.class.getName(), null, String.class).await(); ctx.getLogger().info("Flight booking completed: {}", flightResult); - compensations.add("CancelFlight"); + compensationHelper.addCompensation("CancelFlight", () -> + ctx.callActivity(CancelFlightActivity.class.getName(), null, String.class).await()); // Book hotel - String hotelResult = ctx.callActivity(BookHotelActivity.class.getName(), null, String.class).await(); + String hotelResult = ctx.callActivity( + BookHotelActivity.class.getName(), null, String.class).await(); ctx.getLogger().info("Hotel booking completed: {}", hotelResult); - compensations.add("CancelHotel"); + compensationHelper.addCompensation("CancelHotel", () -> + ctx.callActivity(CancelHotelActivity.class.getName(), null, String.class).await()); // Book car - String carResult = ctx.callActivity(BookCarActivity.class.getName(), null, String.class).await(); + String carResult = ctx.callActivity( + BookCarActivity.class.getName(), null, String.class).await(); ctx.getLogger().info("Car booking completed: {}", carResult); - compensations.add("CancelCar"); + compensationHelper.addCompensation("CancelCar", () -> + ctx.callActivity(CancelCarActivity.class.getName(), null, String.class).await()); String result = String.format("%s, %s, %s", flightResult, hotelResult, carResult); ctx.getLogger().info("Trip booked successfully: {}", result); @@ -62,44 +53,7 @@ public WorkflowStub create() { } catch (TaskFailedException e) { ctx.getLogger().info("******** executing compensation logic ********"); ctx.getLogger().error("Activity failed: {}", e.getMessage()); - - // Execute compensations in reverse order - Collections.reverse(compensations); - for (String compensation : compensations) { - try { - switch (compensation) { - case "CancelCar": - String carCancelResult = ctx.callActivity( - CancelCarActivity.class.getName(), - null, - compensationOptions, - String.class).await(); - ctx.getLogger().info("Car cancellation completed: {}", carCancelResult); - break; - - case "CancelHotel": - String hotelCancelResult = ctx.callActivity( - CancelHotelActivity.class.getName(), - null, - compensationOptions, - String.class).await(); - ctx.getLogger().info("Hotel cancellation completed: {}", hotelCancelResult); - break; - - case "CancelFlight": - String flightCancelResult = ctx.callActivity( - CancelFlightActivity.class.getName(), - null, - compensationOptions, - String.class).await(); - ctx.getLogger().info("Flight cancellation completed: {}", flightCancelResult); - break; - } - } catch (TaskFailedException ex) { - // Only catch TaskFailedException for actual activity failures - ctx.getLogger().error("Activity failed during compensation: {}", ex.getMessage()); - } - } + compensationHelper.compensate(); ctx.complete("Workflow failed, compensation applied"); } }; diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/CompensationHelper.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/CompensationHelper.java new file mode 100644 index 0000000000..15901acd81 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/CompensationHelper.java @@ -0,0 +1,37 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class CompensationHelper { + + private final Map compensations = new LinkedHashMap<>(); + + public void addCompensation(String name, Runnable compensation) { + compensations.put(name, compensation); + } + + public void compensate() { + List keys = new ArrayList<>(compensations.keySet()); + Collections.reverse(keys); + for (String key : keys) { + compensations.get(key).run(); + } + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java index 229caaf88c..13b3acc9aa 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java @@ -8,13 +8,16 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and -limitations under the License. -*/ + * limitations under the License. + */ package io.dapr.workflows; +import io.dapr.durabletask.PropagatedHistory; import org.slf4j.Logger; +import java.util.Optional; + public interface WorkflowActivityContext { Logger getLogger(); @@ -26,4 +29,11 @@ public interface WorkflowActivityContext { T getInput(Class targetType); String getTraceParent(); + + /** + * Gets the propagated history from a parent workflow, if any was propagated. + * + * @return an Optional containing the propagated history, or empty if none was propagated + */ + Optional getPropagatedHistory(); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java index 1d93f71c85..fd7c8a6d92 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java @@ -14,6 +14,7 @@ package io.dapr.workflows; import io.dapr.durabletask.CompositeTaskFailedException; +import io.dapr.durabletask.PropagatedHistory; import io.dapr.durabletask.Task; import io.dapr.durabletask.TaskCanceledException; import io.dapr.durabletask.TaskFailedException; @@ -25,6 +26,7 @@ import java.time.ZonedDateTime; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.UUID; /** @@ -539,4 +541,11 @@ default UUID newUuid() { * @return true if already applied */ boolean isPatched(String patchName); + + /** + * Gets the propagated history from a parent workflow, if any was propagated. + * + * @return an Optional containing the propagated history, or empty if none was propagated + */ + Optional getPropagatedHistory(); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskOptions.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskOptions.java index fe5643d110..0dbb5887c0 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskOptions.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskOptions.java @@ -8,27 +8,30 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and -limitations under the License. -*/ + * limitations under the License. + */ package io.dapr.workflows; +import io.dapr.durabletask.HistoryPropagationScope; + public class WorkflowTaskOptions { private final WorkflowTaskRetryPolicy retryPolicy; private final WorkflowTaskRetryHandler retryHandler; private final String appId; + private final HistoryPropagationScope historyPropagationScope; public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy, WorkflowTaskRetryHandler retryHandler) { - this(retryPolicy, retryHandler, null); + this(retryPolicy, retryHandler, null, null); } public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy) { - this(retryPolicy, null, null); + this(retryPolicy, null, null, null); } public WorkflowTaskOptions(WorkflowTaskRetryHandler retryHandler) { - this(null, retryHandler, null); + this(null, retryHandler, null, null); } /** @@ -37,28 +40,42 @@ public WorkflowTaskOptions(WorkflowTaskRetryHandler retryHandler) { * @param appId the ID of the app to call the activity in */ public WorkflowTaskOptions(String appId) { - this(null, null, appId); + this(null, null, appId, null); } /** - * Constructor for WorkflowTaskOptions with retry policy, retry handler, and app ID. - * - * @param retryPolicy the retry policy - * @param retryHandler the retry handler - * @param appId the app ID for cross-app activity calls - */ + * Constructor for WorkflowTaskOptions with retry policy, retry handler, and app ID. + * + * @param retryPolicy the retry policy + * @param retryHandler the retry handler + * @param appId the app ID for cross-app activity calls + */ public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy, WorkflowTaskRetryHandler retryHandler, String appId) { + this(retryPolicy, retryHandler, appId, null); + } + + /** + * Constructor for WorkflowTaskOptions with all options. + * + * @param retryPolicy the retry policy + * @param retryHandler the retry handler + * @param appId the app ID for cross-app activity calls + * @param historyPropagationScope the history propagation scope + */ + public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy, WorkflowTaskRetryHandler retryHandler, + String appId, HistoryPropagationScope historyPropagationScope) { this.retryPolicy = retryPolicy; this.retryHandler = retryHandler; this.appId = appId; + this.historyPropagationScope = historyPropagationScope; } public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy, String appId) { - this(retryPolicy, null, appId); + this(retryPolicy, null, appId, null); } public WorkflowTaskOptions(WorkflowTaskRetryHandler retryHandler, String appId) { - this(null, retryHandler, appId); + this(null, retryHandler, appId, null); } public WorkflowTaskRetryPolicy getRetryPolicy() { @@ -73,4 +90,38 @@ public String getAppId() { return appId; } + public HistoryPropagationScope getHistoryPropagationScope() { + return historyPropagationScope; + } + + /** + * Creates a WorkflowTaskOptions configured to propagate execution history + * with the specified scope. + * + * @param scope the history propagation scope + * @return a new WorkflowTaskOptions with history propagation configured + */ + public static WorkflowTaskOptions withHistoryPropagation(HistoryPropagationScope scope) { + return new WorkflowTaskOptions(null, null, null, scope); + } + + /** + * Creates a WorkflowTaskOptions configured to propagate the caller's own history + * plus the full ancestor chain. + * + * @return a new WorkflowTaskOptions with lineage propagation + */ + public static WorkflowTaskOptions propagateLineage() { + return withHistoryPropagation(HistoryPropagationScope.LINEAGE); + } + + /** + * Creates a WorkflowTaskOptions configured to propagate only the caller's own history + * (trust boundary - ancestors are dropped). + * + * @return a new WorkflowTaskOptions with own-history propagation + */ + public static WorkflowTaskOptions propagateOwnHistory() { + return withHistoryPropagation(HistoryPropagationScope.OWN_HISTORY); + } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java index b1f08c5af7..4763e90527 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java @@ -13,11 +13,14 @@ package io.dapr.workflows.runtime; +import io.dapr.durabletask.PropagatedHistory; import io.dapr.durabletask.TaskActivityContext; import io.dapr.workflows.WorkflowActivityContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; + /** * Wrapper for Durable Task Framework {@link TaskActivityContext}. */ @@ -107,4 +110,9 @@ public String getTraceParent() { public String getTaskExecutionId() { return this.innerContext.getTaskExecutionId(); } + + @Override + public Optional getPropagatedHistory() { + return this.innerContext.getPropagatedHistory(); + } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java index 507fadc933..1ee6591969 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java @@ -14,6 +14,7 @@ package io.dapr.workflows.runtime; import io.dapr.durabletask.CompositeTaskFailedException; +import io.dapr.durabletask.PropagatedHistory; import io.dapr.durabletask.RetryHandler; import io.dapr.durabletask.RetryPolicy; import io.dapr.durabletask.Task; @@ -35,6 +36,7 @@ import java.time.Instant; import java.time.ZonedDateTime; import java.util.List; +import java.util.Optional; import java.util.UUID; public class DefaultWorkflowContext implements WorkflowContext { @@ -261,6 +263,7 @@ private TaskOptions toTaskOptions(WorkflowTaskOptions options) { .retryPolicy(retryPolicy) .retryHandler(retryHandler) .appID(options.getAppId()) + .historyPropagationScope(options.getHistoryPropagationScope()) .build(); } @@ -326,4 +329,9 @@ public void setCustomStatus(Object status) { public boolean isPatched(String patchName) { return this.innerContext.isPatched(patchName); } + + @Override + public Optional getPropagatedHistory() { + return this.innerContext.getPropagatedHistory(); + } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java index f1a54690b9..920f3c0804 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java @@ -14,6 +14,8 @@ package io.dapr.workflows; import io.dapr.durabletask.CompositeTaskFailedException; +import io.dapr.durabletask.HistoryPropagationScope; +import io.dapr.durabletask.PropagatedHistory; import io.dapr.durabletask.RetryContext; import io.dapr.durabletask.RetryHandler; import io.dapr.durabletask.Task; @@ -32,6 +34,7 @@ import java.time.ZonedDateTime; import java.util.Arrays; import java.util.List; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -150,6 +153,11 @@ public void setCustomStatus(Object status) { public boolean isPatched(String patchName) { return false; } + + @Override + public Optional getPropagatedHistory() { + return Optional.empty(); + } }; context = new DefaultWorkflowContext(mockInnerContext); contextWithClass = new DefaultWorkflowContext(mockInnerContext, testWorkflowContext.getClass()); @@ -563,4 +571,40 @@ public void callActivityRetryPolicyDefaultMaxRetryIntervalShouldBeZeroWhenNotSet assertEquals(Duration.ZERO, captor.getValue().getRetryPolicy().getMaxRetryInterval()); } + + @Test + public void callActivityWithHistoryPropagationScope() { + String expectedName = "TestActivity"; + String expectedInput = "TestInput"; + WorkflowTaskOptions options = WorkflowTaskOptions.propagateLineage(); + ArgumentCaptor captor = ArgumentCaptor.forClass(TaskOptions.class); + + context.callActivity(expectedName, expectedInput, options, String.class); + + verify(mockInnerContext, times(1)) + .callActivity(eq(expectedName), eq(expectedInput), captor.capture(), eq(String.class)); + + assertEquals(HistoryPropagationScope.LINEAGE, captor.getValue().getHistoryPropagationScope()); + } + + @Test + public void getPropagatedHistoryDelegatesToInnerContext() { + PropagatedHistory mockHistory = mock(PropagatedHistory.class); + when(mockInnerContext.getPropagatedHistory()).thenReturn(Optional.of(mockHistory)); + + Optional result = context.getPropagatedHistory(); + + assertTrue(result.isPresent()); + assertEquals(mockHistory, result.get()); + verify(mockInnerContext, times(1)).getPropagatedHistory(); + } + + @Test + public void getPropagatedHistoryReturnsEmptyWhenNone() { + when(mockInnerContext.getPropagatedHistory()).thenReturn(Optional.empty()); + + Optional result = context.getPropagatedHistory(); + + assertTrue(result.isEmpty()); + } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTaskOptionsTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTaskOptionsTest.java index 5ef82ecf78..55f7e6b649 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTaskOptionsTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTaskOptionsTest.java @@ -13,6 +13,7 @@ package io.dapr.workflows; +import io.dapr.durabletask.HistoryPropagationScope; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*; @@ -92,11 +93,64 @@ void testConstructorWithRetryPolicyAndAppId() { void testConstructorWithRetryHandlerAndAppId() { WorkflowTaskRetryHandler retryHandler = (context) -> true; String appId = "test-app"; - + WorkflowTaskOptions options = new WorkflowTaskOptions(retryHandler, appId); - + assertNull(options.getRetryPolicy()); assertEquals(retryHandler, options.getRetryHandler()); assertEquals(appId, options.getAppId()); } + + @Test + void testWithHistoryPropagation_lineage() { + WorkflowTaskOptions options = WorkflowTaskOptions.withHistoryPropagation(HistoryPropagationScope.LINEAGE); + + assertEquals(HistoryPropagationScope.LINEAGE, options.getHistoryPropagationScope()); + assertNull(options.getRetryPolicy()); + assertNull(options.getRetryHandler()); + assertNull(options.getAppId()); + } + + @Test + void testWithHistoryPropagation_ownHistory() { + WorkflowTaskOptions options = WorkflowTaskOptions.withHistoryPropagation(HistoryPropagationScope.OWN_HISTORY); + + assertEquals(HistoryPropagationScope.OWN_HISTORY, options.getHistoryPropagationScope()); + } + + @Test + void testPropagateLineage_factory() { + WorkflowTaskOptions options = WorkflowTaskOptions.propagateLineage(); + + assertEquals(HistoryPropagationScope.LINEAGE, options.getHistoryPropagationScope()); + } + + @Test + void testPropagateOwnHistory_factory() { + WorkflowTaskOptions options = WorkflowTaskOptions.propagateOwnHistory(); + + assertEquals(HistoryPropagationScope.OWN_HISTORY, options.getHistoryPropagationScope()); + } + + @Test + void testConstructorWithAllParametersIncludingScope() { + WorkflowTaskRetryPolicy retryPolicy = WorkflowTaskRetryPolicy.newBuilder().build(); + WorkflowTaskRetryHandler retryHandler = (context) -> true; + String appId = "test-app"; + + WorkflowTaskOptions options = new WorkflowTaskOptions(retryPolicy, retryHandler, appId, + HistoryPropagationScope.LINEAGE); + + assertEquals(retryPolicy, options.getRetryPolicy()); + assertEquals(retryHandler, options.getRetryHandler()); + assertEquals(appId, options.getAppId()); + assertEquals(HistoryPropagationScope.LINEAGE, options.getHistoryPropagationScope()); + } + + @Test + void testConstructorWithoutScope_returnsNull() { + WorkflowTaskOptions options = new WorkflowTaskOptions("test-app"); + + assertNull(options.getHistoryPropagationScope()); + } } diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java index 4bb1b0a241..9381152f18 100644 --- a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java @@ -15,6 +15,7 @@ import io.dapr.spring.workflows.config.EnableDaprWorkflows; import io.dapr.springboot.examples.wfp.chain.ChainWorkflow; +import io.dapr.springboot.examples.wfp.compensation.BookTripWorkflow; import io.dapr.springboot.examples.wfp.child.ParentWorkflow; import io.dapr.springboot.examples.wfp.continueasnew.CleanUpLog; import io.dapr.springboot.examples.wfp.continueasnew.ContinueAsNewWorkflow; @@ -191,6 +192,19 @@ public Decision suspendResumeContinue(@RequestParam("orderId") String orderId, @ return workflowInstanceStatus.readOutputAs(Decision.class); } + /** + * Run Compensation Demo Workflow (Book Trip with Saga pattern). + * @return the output of the BookTripWorkflow execution + */ + @PostMapping("wfp/compensation") + public String compensation() throws TimeoutException { + String instanceId = daprWorkflowClient.scheduleNewWorkflow(BookTripWorkflow.class); + logger.info("Workflow instance " + instanceId + " started"); + return daprWorkflowClient + .waitForWorkflowCompletion(instanceId, Duration.ofSeconds(30), true) + .readOutputAs(String.class); + } + @PostMapping("wfp/durationtimer") public String durationTimerWorkflow() { return daprWorkflowClient.scheduleNewWorkflow(DurationTimerWorkflow.class); diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookCarActivity.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookCarActivity.java new file mode 100644 index 0000000000..0026e674df --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookCarActivity.java @@ -0,0 +1,43 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot.examples.wfp.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +import org.springframework.stereotype.Component; + +@Component +public class BookCarActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(BookCarActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + logger.info("Forcing Failure to trigger compensation for activity: " + ctx.getName()); + throw new RuntimeException("Failed to book car"); + } +} diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookFlightActivity.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookFlightActivity.java new file mode 100644 index 0000000000..450942f6e0 --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookFlightActivity.java @@ -0,0 +1,43 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot.examples.wfp.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +@Component +public class BookFlightActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(BookFlightActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + String result = "Flight booked successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookHotelActivity.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookHotelActivity.java new file mode 100644 index 0000000000..b4434ad17f --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookHotelActivity.java @@ -0,0 +1,42 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot.examples.wfp.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class BookHotelActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(BookHotelActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Activity '{}' was interrupted.", ctx.getName(), e); + throw new RuntimeException("Activity was interrupted", e); + } + + String result = "Hotel booked successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookTripWorkflow.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookTripWorkflow.java new file mode 100644 index 0000000000..9f2253053f --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/BookTripWorkflow.java @@ -0,0 +1,97 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot.examples.wfp.compensation; + +import io.dapr.durabletask.TaskFailedException; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import io.dapr.workflows.WorkflowTaskRetryPolicy; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +@Component +public class BookTripWorkflow implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + List compensations = new ArrayList<>(); + + WorkflowTaskRetryPolicy compensationRetryPolicy = WorkflowTaskRetryPolicy.newBuilder() + .setFirstRetryInterval(Duration.ofSeconds(1)) + .setMaxNumberOfAttempts(3) + .build(); + + WorkflowTaskOptions compensationOptions = new WorkflowTaskOptions(compensationRetryPolicy); + + try { + String flightResult = ctx.callActivity( + BookFlightActivity.class.getName(), null, String.class).await(); + ctx.getLogger().info("Flight booking completed: {}", flightResult); + compensations.add("CancelFlight"); + + String hotelResult = ctx.callActivity( + BookHotelActivity.class.getName(), null, String.class).await(); + ctx.getLogger().info("Hotel booking completed: {}", hotelResult); + compensations.add("CancelHotel"); + + String carResult = ctx.callActivity( + BookCarActivity.class.getName(), null, String.class).await(); + ctx.getLogger().info("Car booking completed: {}", carResult); + compensations.add("CancelCar"); + + String result = String.format("%s, %s, %s", flightResult, hotelResult, carResult); + ctx.getLogger().info("Trip booked successfully: {}", result); + ctx.complete(result); + + } catch (TaskFailedException e) { + ctx.getLogger().info("******** executing compensation logic ********"); + ctx.getLogger().error("Activity failed", e); + + Collections.reverse(compensations); + for (String compensation : compensations) { + try { + switch (compensation) { + case "CancelCar": + String carCancelResult = ctx.callActivity( + CancelCarActivity.class.getName(), null, compensationOptions, String.class).await(); + ctx.getLogger().info("Car cancellation completed: {}", carCancelResult); + break; + case "CancelHotel": + String hotelCancelResult = ctx.callActivity( + CancelHotelActivity.class.getName(), null, compensationOptions, String.class).await(); + ctx.getLogger().info("Hotel cancellation completed: {}", hotelCancelResult); + break; + case "CancelFlight": + String flightCancelResult = ctx.callActivity( + CancelFlightActivity.class.getName(), null, compensationOptions, String.class).await(); + ctx.getLogger().info("Flight cancellation completed: {}", flightCancelResult); + break; + default: + break; + } + } catch (TaskFailedException ex) { + ctx.getLogger().error("Activity failed during compensation", ex); + } + } + ctx.complete("Workflow failed, compensation applied"); + } + }; + } +} diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/CancelCarActivity.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/CancelCarActivity.java new file mode 100644 index 0000000000..9c6143e2f2 --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/CancelCarActivity.java @@ -0,0 +1,43 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot.examples.wfp.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +@Component +public class CancelCarActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(CancelCarActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + String result = "Car canceled successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/CancelFlightActivity.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/CancelFlightActivity.java new file mode 100644 index 0000000000..f24970613b --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/CancelFlightActivity.java @@ -0,0 +1,43 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot.examples.wfp.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +@Component +public class CancelFlightActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(CancelFlightActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + String result = "Flight canceled successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/CancelHotelActivity.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/CancelHotelActivity.java new file mode 100644 index 0000000000..1d4b741dcb --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/compensation/CancelHotelActivity.java @@ -0,0 +1,43 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot.examples.wfp.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +@Component +public class CancelHotelActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(CancelHotelActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + String result = "Hotel canceled successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/FraudCheckActivity.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/FraudCheckActivity.java new file mode 100644 index 0000000000..6ca777d6d6 --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/FraudCheckActivity.java @@ -0,0 +1,42 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.springboot.examples.wfp.historypropagation; + +import io.dapr.durabletask.PropagatedHistory; +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +/** + * Activity that performs fraud checks. + * Receives propagated history with OWN_HISTORY scope from FraudDetectionWorkflow. + */ +@Component +public class FraudCheckActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + ctx.getLogger().info("Running fraud check for: " + ctx.getInput(String.class)); + + Optional historyOpt = ctx.getPropagatedHistory(); + if (historyOpt.isPresent()) { + PropagatedHistory history = historyOpt.get(); + ctx.getLogger().info("Fraud check has access to propagated history with " + + history.getEvents().size() + " events (scope: " + history.getScope() + ")"); + } + + return "passed"; + } +} diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/FraudDetectionWorkflow.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/FraudDetectionWorkflow.java new file mode 100644 index 0000000000..b0a9f4cb43 --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/FraudDetectionWorkflow.java @@ -0,0 +1,72 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.springboot.examples.wfp.historypropagation; + +import io.dapr.durabletask.HistoryPropagationScope; +import io.dapr.durabletask.PropagatedHistory; +import io.dapr.durabletask.PropagatedHistoryChunk; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +/** + * Child workflow demonstrating reception of propagated history with LINEAGE scope. + * + *

This workflow receives the full ancestor history chain from its parent + * (ProcessPaymentWorkflow). It inspects the history for audit purposes and + * then calls its own activity with OWN_HISTORY scope.

+ */ +@Component +public class FraudDetectionWorkflow implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting FraudDetection workflow: " + ctx.getInstanceId()); + + // Access propagated history from parent + Optional historyOpt = ctx.getPropagatedHistory(); + + if (historyOpt.isPresent()) { + PropagatedHistory history = historyOpt.get(); + ctx.getLogger().info("Received propagated history with scope: " + history.getScope()); + ctx.getLogger().info("History contains " + history.getEvents().size() + " events"); + ctx.getLogger().info("From apps: " + history.getAppIDs()); + + // Verify the parent workflow's card validation happened + Optional parentChunk = + history.getWorkflowByName("ProcessPaymentWorkflow"); + if (parentChunk.isPresent()) { + ctx.getLogger().info("Found parent workflow chunk from app: " + parentChunk.get().getAppId() + + " with " + parentChunk.get().getEventCount() + " events"); + } + } else { + ctx.getLogger().info("No propagated history received"); + } + + // Perform fraud check logic + String input = ctx.getInput(String.class); + String fraudCheckResult = ctx.callActivity( + FraudCheckActivity.class.getName(), + input, + WorkflowTaskOptions.propagateOwnHistory(), + String.class + ).await(); + + ctx.complete("fraud-check:" + fraudCheckResult); + }; + } +} diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/ProcessPaymentWorkflow.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/ProcessPaymentWorkflow.java new file mode 100644 index 0000000000..beef4e459f --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/ProcessPaymentWorkflow.java @@ -0,0 +1,74 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.springboot.examples.wfp.historypropagation; + +import io.dapr.durabletask.HistoryPropagationScope; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import org.springframework.stereotype.Component; + +/** + * Parent workflow demonstrating history propagation. + * + *

This workflow processes a payment and propagates its execution history to downstream + * workflows and activities for audit, fraud detection, and chain-of-custody verification.

+ * + *

Flow: ProcessPayment → ValidateCard (activity) → FraudDetection (child wf with LINEAGE) + * → SettlePayment (activity with OWN_HISTORY)

+ */ +@Component +public class ProcessPaymentWorkflow implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting ProcessPayment workflow: " + ctx.getInstanceId()); + + String paymentInput = ctx.getInput(String.class); + + // Step 1: Validate the card (no propagation needed for this step) + String validationResult = ctx.callActivity( + ValidateCardActivity.class.getName(), + paymentInput, + String.class + ).await(); + ctx.getLogger().info("Card validation result: " + validationResult); + + // Step 2: Call fraud detection child workflow with LINEAGE propagation. + // The child will receive this workflow's full execution history (including + // the ValidateCard activity above) plus any ancestor history. + String fraudResult = ctx.callChildWorkflow( + FraudDetectionWorkflow.class.getName(), + paymentInput, + null, + WorkflowTaskOptions.propagateLineage(), + String.class + ).await(); + ctx.getLogger().info("Fraud detection result: " + fraudResult); + + // Step 3: Settle the payment with OWN_HISTORY propagation. + // The activity receives only this workflow's events (trust boundary - + // no grandparent history if this workflow was itself called with LINEAGE). + String settlementResult = ctx.callActivity( + SettlePaymentActivity.class.getName(), + paymentInput, + WorkflowTaskOptions.propagateOwnHistory(), + String.class + ).await(); + ctx.getLogger().info("Settlement result: " + settlementResult); + + ctx.complete("Payment processed: " + settlementResult); + }; + } +} diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/SettlePaymentActivity.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/SettlePaymentActivity.java new file mode 100644 index 0000000000..c544cbaa89 --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/SettlePaymentActivity.java @@ -0,0 +1,49 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.springboot.examples.wfp.historypropagation; + +import io.dapr.durabletask.PropagatedHistory; +import io.dapr.durabletask.PropagatedHistoryChunk; +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +/** + * Activity that settles a payment. + * Receives propagated history with OWN_HISTORY scope — only sees the immediate + * caller's events (trust boundary; grandparent history is dropped). + */ +@Component +public class SettlePaymentActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + ctx.getLogger().info("Settling payment for: " + ctx.getInput(String.class)); + + Optional historyOpt = ctx.getPropagatedHistory(); + if (historyOpt.isPresent()) { + PropagatedHistory history = historyOpt.get(); + ctx.getLogger().info("Settlement activity has propagated history (scope: " + + history.getScope() + ")"); + ctx.getLogger().info("Events from " + history.getWorkflows().size() + " workflow(s):"); + for (PropagatedHistoryChunk chunk : history.getWorkflows()) { + ctx.getLogger().info(" - " + chunk.getWorkflowName() + " (" + chunk.getAppId() + + "): " + chunk.getEventCount() + " events"); + } + } + + return "settled"; + } +} diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/ValidateCardActivity.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/ValidateCardActivity.java new file mode 100644 index 0000000000..94ec8f1cb9 --- /dev/null +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/historypropagation/ValidateCardActivity.java @@ -0,0 +1,31 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.springboot.examples.wfp.historypropagation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.springframework.stereotype.Component; + +/** + * Activity that validates a payment card. + * This activity does not receive propagated history (called without propagation scope). + */ +@Component +public class ValidateCardActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + ctx.getLogger().info("Validating card for: " + ctx.getInput(String.class)); + return "card-valid"; + } +}