Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.dapr.durabletask;

import io.dapr.durabletask.implementation.protobuf.Orchestration;

/**
* Controls how execution history is propagated to a child workflow or activity.
*/
public enum HistoryPropagationScope {
/**
* No propagation. The child receives no history from the caller.
*/
NONE,

/**
* Propagate the caller's own history events only. The child does not see
* any ancestral history (trust boundary).
*/
OWN_HISTORY,

/**
* Propagate the caller's own history events AND the full ancestral chain.
* Any propagated history this workflow received from its parent is forwarded to the child.
*/
LINEAGE;

Orchestration.HistoryPropagationScope toProto() {
switch (this) {
case OWN_HISTORY:
return Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_OWN_HISTORY;
case LINEAGE:
return Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_LINEAGE;
default:
return Orchestration.HistoryPropagationScope.HISTORY_PROPAGATION_SCOPE_NONE;
}
}

static HistoryPropagationScope fromProto(Orchestration.HistoryPropagationScope proto) {
switch (proto) {
case HISTORY_PROPAGATION_SCOPE_OWN_HISTORY:
return OWN_HISTORY;
case HISTORY_PROPAGATION_SCOPE_LINEAGE:
return LINEAGE;
default:
return NONE;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.dapr.durabletask;

import io.dapr.durabletask.implementation.protobuf.HistoryEvents;
import io.dapr.durabletask.implementation.protobuf.Orchestration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Represents propagated execution history from a parent workflow to a child workflow or activity.
* Provides query methods for inspecting ancestor execution history.
*/
public final class PropagatedHistory {
private final List<HistoryEvents.HistoryEvent> events;
private final HistoryPropagationScope scope;
private final List<PropagatedHistoryChunk> chunks;

PropagatedHistory(List<HistoryEvents.HistoryEvent> events,
HistoryPropagationScope scope,
List<PropagatedHistoryChunk> chunks) {
this.events = Collections.unmodifiableList(new ArrayList<>(events));
this.scope = scope;
this.chunks = Collections.unmodifiableList(new ArrayList<>(chunks));
}

static PropagatedHistory fromProto(HistoryEvents.PropagatedHistory proto) {
List<PropagatedHistoryChunk> chunks = proto.getChunksList().stream()
.map(c -> new PropagatedHistoryChunk(
c.getAppId(),
c.getStartEventIndex(),
c.getEventCount(),
c.getInstanceId(),
c.getWorkflowName()))
.collect(Collectors.toList());

HistoryPropagationScope scope = HistoryPropagationScope.fromProto(proto.getScope());

return new PropagatedHistory(proto.getEventsList(), scope, chunks);
}

/**
* Gets the raw history events that were propagated.
*
* @return an unmodifiable list of history events
*/
public List<HistoryEvents.HistoryEvent> getEvents() {
return this.events;
}

/**
* Gets the propagation scope that was used to produce this history.
*
* @return the history propagation scope
*/
public HistoryPropagationScope getScope() {
return this.scope;
}

/**
* Gets the workflow chunks identifying which app/workflow produced which events.
*
* @return an unmodifiable list of history chunks
*/
public List<PropagatedHistoryChunk> getWorkflows() {
return this.chunks;
}

/**
* Gets a deduplicated, ordered list of app IDs in the propagation chain.
*
* @return list of app IDs in the order they appear in chunks
*/
public List<String> getAppIDs() {
Set<String> seen = new LinkedHashSet<>();
for (PropagatedHistoryChunk chunk : this.chunks) {
if (chunk.getAppId() != null && !chunk.getAppId().isEmpty()) {
seen.add(chunk.getAppId());
}
}
return new ArrayList<>(seen);
}

/**
* Gets the first workflow chunk matching the given workflow name.
* Returns the last match (most recent occurrence) if multiple exist.
*
* @param name the workflow name to search for
* @return an Optional containing the matching chunk, or empty if not found
*/
public Optional<PropagatedHistoryChunk> getWorkflowByName(String name) {
List<PropagatedHistoryChunk> matches = getWorkflowsByName(name);
if (matches.isEmpty()) {
return Optional.empty();
}
return Optional.of(matches.get(matches.size() - 1));
}

/**
* Gets all workflow chunks matching the given workflow name.
*
* @param name the workflow name to search for
* @return a list of matching chunks in execution order
*/
public List<PropagatedHistoryChunk> getWorkflowsByName(String name) {
return this.chunks.stream()
.filter(c -> name.equals(c.getWorkflowName()))
.collect(Collectors.toList());
}

/**
* Gets the history events produced by the given app ID.
*
* @param appId the app ID to filter by
* @return a list of history events from the specified app
*/
public List<HistoryEvents.HistoryEvent> getEventsByAppID(String appId) {
List<HistoryEvents.HistoryEvent> result = new ArrayList<>();
for (PropagatedHistoryChunk chunk : this.chunks) {
if (appId.equals(chunk.getAppId())) {
addEventsFromChunk(chunk, result);
}
}
return result;
}

/**
* Gets the history events produced by the given workflow instance ID.
*
* @param instanceId the instance ID to filter by
* @return a list of history events from the specified instance
*/
public List<HistoryEvents.HistoryEvent> getEventsByInstanceID(String instanceId) {
List<HistoryEvents.HistoryEvent> result = new ArrayList<>();
for (PropagatedHistoryChunk chunk : this.chunks) {
if (instanceId.equals(chunk.getInstanceId())) {
addEventsFromChunk(chunk, result);
}
}
return result;
}

/**
* Gets the history events produced by the given workflow name.
*
* @param workflowName the workflow name to filter by
* @return a list of history events from the specified workflow
*/
public List<HistoryEvents.HistoryEvent> getEventsByWorkflowName(String workflowName) {
List<HistoryEvents.HistoryEvent> result = new ArrayList<>();
for (PropagatedHistoryChunk chunk : this.chunks) {
if (workflowName.equals(chunk.getWorkflowName())) {
addEventsFromChunk(chunk, result);
}
}
return result;
}

private void addEventsFromChunk(PropagatedHistoryChunk chunk,
List<HistoryEvents.HistoryEvent> result) {
int start = chunk.getStartEventIndex();
int end = Math.min(start + chunk.getEventCount(), this.events.size());
for (int i = start; i < end; i++) {
result.add(this.events.get(i));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.dapr.durabletask;

/**
* Represents a chunk of propagated history events from a specific workflow instance.
*/
public final class PropagatedHistoryChunk {
private final String appId;
private final int startEventIndex;
private final int eventCount;
private final String instanceId;
private final String workflowName;

PropagatedHistoryChunk(String appId, int startEventIndex, int eventCount,
String instanceId, String workflowName) {
this.appId = appId;
this.startEventIndex = startEventIndex;
this.eventCount = eventCount;
this.instanceId = instanceId;
this.workflowName = workflowName;
}

/**
* Gets the app ID that produced the events in this chunk.
*
* @return the app ID
*/
public String getAppId() {
return this.appId;
}

/**
* Gets the index of the first event in this chunk (inclusive) into the
* propagated history events list.
*
* @return the start event index
*/
public int getStartEventIndex() {
return this.startEventIndex;
}

/**
* Gets the number of events in this chunk.
*
* @return the event count
*/
public int getEventCount() {
return this.eventCount;
}

/**
* Gets the workflow instance ID that produced the events in this chunk.
*
* @return the instance ID
*/
public String getInstanceId() {
return this.instanceId;
}

/**
* Gets the workflow name that produced the events in this chunk.
*
* @return the workflow name
*/
public String getWorkflowName() {
return this.workflowName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.dapr.durabletask;

import java.util.Optional;

/**
* Interface that provides {@link TaskActivity} implementations with activity context, such as an activity's name and
* its input.
Expand Down Expand Up @@ -53,4 +55,11 @@ public interface TaskActivityContext {
* @return trace parent id
*/
String getTraceParent();

/**
* Gets the propagated history from a parent workflow, if any was propagated.
*
* @return an Optional containing the propagated history, or empty if none was propagated
*/
Optional<PropagatedHistory> getPropagatedHistory();
}
Loading
Loading