Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,24 @@ echo "$(hatch env find)/bin/python"
```

### VS Code
#### Interpreter
If you're using VS Code, run the "Python: Select Interpreter" command and choose the hatch venv
Python interpreter, as found with the `hatch env find` command.

Kiro and VS Code mangle the interpreter path if it contains spaces, which results in
errors finding the interpreter. You can create a local `.venv` symlink _without_ spaces
in the path:

```bash
# create a symlink at: ./.venv/bin/python
rm -rf .venv && ln -s "$(hatch env find)" .venv
```

When you "Select Interpreter", enter the path `./.venv/bin/python`.

You'll have to rerun this command whenever you recreate your hatch envs.

#### Linting
Hatch uses Ruff for static analysis.

You might want to install the [Ruff extension for VS Code](https://github.com/astral-sh/ruff-vscode)
Expand Down
40 changes: 19 additions & 21 deletions src/aws_durable_execution_sdk_python/concurrency/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ class ConcurrentExecutor(ABC, Generic[CallableType, ResultType]):

def __init__(
self,
operation_identifier: OperationIdentifier,
executables: list[Executable[CallableType]],
max_concurrency: int | None,
completion_config: CompletionConfig,
Expand All @@ -158,7 +157,6 @@ def __init__(
handle large BatchResult payloads efficiently. Matches TypeScript behavior in
run-in-child-context-handler.ts.
"""
self.operation_identifier = operation_identifier
self.executables = executables
self.max_concurrency = max_concurrency
self.completion_config = completion_config
Expand Down Expand Up @@ -398,36 +396,36 @@ def _execute_item_in_child_context(
"""
Execute a single item in a derived child context.

instead of relying on `executor_context.run_in_child_context`
we generate an operation_id for the child, and then call `child_handler`
directly. This avoids the hidden mutation of the context's internal counter.
we can do this because we explicitly control the generation of step_id and do it
using executable.index.
Instead of relying on `executor_context.run_in_child_context` we
generate an operation_id for the child, then call `child_handler`
directly. This avoids the hidden mutation of the context's
internal counter. We explicitly derive the child's operation_id
from `executable.index` so that the same input always produces
the same id regardless of the order branches actually run in.


invariant: `operation_id` for a given executable is deterministic,
and execution order invariant.
Invariant: `operation_id` for a given executable is deterministic
and execution-order invariant.
"""

operation_id = executor_context._create_step_id_for_logical_step( # noqa: SLF001
operation_id: str = executor_context._create_step_id_for_logical_step( # noqa: SLF001
executable.index
)
name = f"{self.name_prefix}{executable.index}"
non_virtual_parent_id = (
self.operation_identifier.operation_id
if self.nesting_type is NestingType.FLAT
else None
)
child_context = executor_context.create_child_context(
operation_id, non_virtual_parent_id
name: str = f"{self.name_prefix}{executable.index}"
is_virtual: bool = self.nesting_type is NestingType.FLAT

child_context: DurableContext = executor_context.create_child_context(
operation_id, is_virtual=is_virtual
)
# For NESTED, this identifier is used for the branch's own START/SUCCEED/FAIL
# checkpoints (not for the children of the branch).
# For FLAT, `child_handler` skips checkpoints, so it is not used.
# Construct it unconditionally to keep the call simple.
operation_identifier = OperationIdentifier(
operation_id,
executor_context._parent_id, # noqa: SLF001
name,
)

def run_in_child_handler():
def run_in_child_handler() -> ResultType:
return self.execute_item(child_context, executable)

result: ResultType = child_handler(
Expand All @@ -438,7 +436,7 @@ def run_in_child_handler():
serdes=self.item_serdes or self.serdes,
sub_type=self.sub_type_iteration,
summary_generator=self.summary_generator,
is_virtual=self.nesting_type is NestingType.FLAT,
is_virtual=is_virtual,
),
)
child_context.state.track_replay(operation_id=operation_id)
Expand Down
42 changes: 35 additions & 7 deletions src/aws_durable_execution_sdk_python/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,39 @@ class TerminationMode(Enum):


class NestingType(Enum):
"""
How child operations should inherit context from their parent.
"""Control how child contexts are created for batch operations.

Applies to `map` and `parallel`. Each branch or iteration runs inside a
child context.

- NESTED: a fully checkpointed context for the branch/iteration.
- FLAT: a virtual context that skips checkpoints for the branch/iteration.

- NESTED: Each child operation runs in its own isolated context
- FLAT: All child operations share the same parent context
"""

NESTED = "NESTED"
"""Create CONTEXT operations for each branch/iteration with full checkpointing.

Operations within each branch/iteration are wrapped in their own context.

- Observability: high — each branch/iteration appears as a separate
operation in execution history.
- Cost: higher — consumes more operations due to CONTEXT creation
overhead.
- Scale: lower maximum iterations due to operation limits.
"""

FLAT = "FLAT"
"""Skip CONTEXT operations for branches/iterations using virtual contexts.

Operations execute directly without individual context wrapping.

- Observability: lower — branches/iterations don't appear as separate
operations in execution history.
- Cost: ~30% lower — reduces operation consumption by skipping CONTEXT
overhead.
- Scale: higher maximum iterations possible within operation limits.
"""


@dataclass(frozen=True)
Expand Down Expand Up @@ -271,9 +295,13 @@ class ChildConfig(Generic[T]):
Used internally by map/parallel operations to handle large BatchResult payloads.
Signature: (result: T) -> str

is_virtual: Whether the child operation is virtual (doesn't represent a real operation).
Virtual contexts are used for concurrency operations and don't appear in
the final execution history. Default is False.
is_virtual: When True, skip all checkpoints (START, SUCCEED,
FAIL) for this child context and propagate the caller's reporting
parent id through to operations created inside the child. The
branch is a logical scope for step-id prefixing but does not
appear in the execution history. Used internally by
NestingType.FLAT branches. Use this to group operations without
adding a CONTEXT entry to the execution history.

See TypeScript reference: aws-durable-execution-sdk-js/src/types/index.ts
"""
Expand Down
88 changes: 68 additions & 20 deletions src/aws_durable_execution_sdk_python/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,21 @@ def __init__(
lambda_context: LambdaContext | None = None,
parent_id: str | None = None,
logger: Logger | None = None,
non_virtual_parent_id: str | None = None,
step_id_prefix: str | None = None,
) -> None:
self.state: ExecutionState = state
self.execution_context: ExecutionContext = execution_context
self.lambda_context = lambda_context
# operations inside this context use this id as their parent
self._parent_id: str | None = parent_id
# child operations use this to generate deterministic step ids.
# differs from `parent_id` only for virtual contexts.
self._step_id_prefix: str | None = (
step_id_prefix if step_id_prefix is not None else parent_id
)
# cached at construction so the value stays invariant even if parent/prefix later mutate.
self._is_virtual: bool = self._parent_id != self._step_id_prefix
self._step_counter: OrderedCounter = OrderedCounter()
self._non_virtual_parent_id = non_virtual_parent_id or parent_id

log_info = LogInfo(
execution_state=state,
Expand All @@ -256,6 +263,18 @@ def __init__(
info=log_info,
)

@property
def is_virtual(self) -> bool:
"""True if this context does not checkpoint its own start and completion.

You create a virtual context with `create_child_context(..., is_virtual=True)`.
FLAT-mode `map`/`parallel` branches use virtual contexts. Inner operations
use the grandparent as their parent (the enclosing non-virtual ancestor, skipping the
branch level in the hierarchy), while step ids are still prefixed with the branch's own
operation id so replay stays deterministic.
"""
return self._is_virtual

# region factories
@staticmethod
def from_lambda_context(
Expand All @@ -272,22 +291,44 @@ def from_lambda_context(
)

def create_child_context(
self, parent_id: str, non_virtual_parent_id=None
self, operation_id: str, *, is_virtual: bool = False
) -> DurableContext:
"""Create a child context from the given parent."""
logger.debug("Creating child context for parent %s", parent_id)
"""Create a child context for the given operation.

Args:
operation_id: The operation id that owns the child context. Used as
the child's step-id prefix in all cases.
is_virtual: When `True`, create a virtual child whose inner
operations report to this context's own `_parent_id` (one
level up the hierarchy). When `False` (default), produce a
regular child whose inner operations report to
`operation_id`.

Returns:
A new `DurableContext` child for the current context.
"""
# For a virtual child, propagate the current `_parent_id` so its
# inner operations refer to the grandparent rather than the parent.
# For a regular non-virtual child, the child's own `operation_id` is
# the parent id for its inner operations (standard nesting).
child_parent_id: str | None = self._parent_id if is_virtual else operation_id
logger.debug(
"Creating child context for operation %s (is_virtual=%s)",
operation_id,
is_virtual,
)
return DurableContext(
state=self.state,
execution_context=self.execution_context,
lambda_context=self.lambda_context,
parent_id=parent_id,
parent_id=child_parent_id,
step_id_prefix=operation_id,
logger=self.logger.with_log_info(
LogInfo(
execution_state=self.state,
parent_id=parent_id,
parent_id=child_parent_id,
)
),
non_virtual_parent_id=non_virtual_parent_id,
)

# endregion factories
Expand Down Expand Up @@ -315,7 +356,8 @@ def _create_step_id_for_logical_step(self, step: int) -> str:
This allows us to recover operation ids or even look
forward without changing the internal state of this context.
"""
step_id = f"{self._parent_id}-{step}" if self._parent_id else str(step)
prefix: str | None = self._step_id_prefix
step_id: str = f"{prefix}-{step}" if prefix else str(step)
return hashlib.blake2b(step_id.encode()).hexdigest()[:64]

def _create_step_id(self) -> str:
Expand Down Expand Up @@ -353,7 +395,7 @@ def create_callback(
state=self.state,
operation_identifier=OperationIdentifier(
operation_id=operation_id,
parent_id=self._non_virtual_parent_id,
parent_id=self._parent_id,
name=name,
),
config=config,
Expand Down Expand Up @@ -395,7 +437,7 @@ def invoke(
state=self.state,
operation_identifier=OperationIdentifier(
operation_id=operation_id,
parent_id=self._non_virtual_parent_id,
parent_id=self._parent_id,
name=name,
),
config=config,
Expand All @@ -417,10 +459,10 @@ def map(
operation_id = self._create_step_id()
operation_identifier = OperationIdentifier(
operation_id=operation_id,
parent_id=self._non_virtual_parent_id,
parent_id=self._parent_id,
name=map_name,
)
map_context = self.create_child_context(parent_id=operation_id)
map_context = self.create_child_context(operation_id=operation_id)

def map_in_child_context() -> BatchResult[R]:
# map_context is a child_context of the context upon which `.map`
Expand Down Expand Up @@ -461,9 +503,9 @@ def parallel(
"""Execute multiple callables in parallel."""
# _create_step_id() is thread-safe. rest of method is safe, since using local copy of parent id
operation_id = self._create_step_id()
parallel_context = self.create_child_context(parent_id=operation_id)
parallel_context = self.create_child_context(operation_id=operation_id)
operation_identifier = OperationIdentifier(
operation_id=operation_id, parent_id=self._non_virtual_parent_id, name=name
operation_id=operation_id, parent_id=self._parent_id, name=name
)

def parallel_in_child_context() -> BatchResult[T]:
Expand Down Expand Up @@ -517,15 +559,21 @@ def run_in_child_context(
# _create_step_id() is thread-safe. rest of method is safe, since using local copy of parent id
operation_id = self._create_step_id()

is_virtual: bool = config.is_virtual if config else False

def callable_with_child_context():
return func(self.create_child_context(parent_id=operation_id))
return func(
self.create_child_context(
operation_id=operation_id, is_virtual=is_virtual
)
)

result: T = child_handler(
func=callable_with_child_context,
state=self.state,
operation_identifier=OperationIdentifier(
operation_id=operation_id,
parent_id=self._non_virtual_parent_id,
parent_id=self._parent_id,
name=step_name,
),
config=config,
Expand All @@ -550,7 +598,7 @@ def step(
state=self.state,
operation_identifier=OperationIdentifier(
operation_id=operation_id,
parent_id=self._non_virtual_parent_id,
parent_id=self._parent_id,
name=step_name,
),
context_logger=self.logger,
Expand All @@ -577,7 +625,7 @@ def wait(self, duration: Duration, name: str | None = None) -> None:
state=self.state,
operation_identifier=OperationIdentifier(
operation_id=operation_id,
parent_id=self._non_virtual_parent_id,
parent_id=self._parent_id,
name=name,
),
)
Expand Down Expand Up @@ -632,7 +680,7 @@ def wait_for_condition(
state=self.state,
operation_identifier=OperationIdentifier(
operation_id=operation_id,
parent_id=self._non_virtual_parent_id,
parent_id=self._parent_id,
name=name,
),
context_logger=self.logger,
Expand Down
Loading
Loading