-
Notifications
You must be signed in to change notification settings - Fork 216
Stand alone operations for Nexus #2872
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Evanthx
wants to merge
50
commits into
master
Choose a base branch
from
sano
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
50 commits
Select commit
Hold shift + click to select a range
b0cb675
Add standalone Nexus client (start/describe/cancel/terminate/list/cou…
Evanthx 6a81d4a
Renaming handle classes
Evanthx 9354504
Removing the renamed classes
Evanthx 360be4f
Using endpoints from the rule instead of creating them
Evanthx addccda
Moving common test code to SDKTestWorkflowRule
Evanthx e1e3fab
Ensuring we are using an external server, as the internal one doesn't…
Evanthx de4e785
PR changes for comments
Evanthx 0ef9588
Changing deadline behavior
Evanthx 7fc3e62
Addressing PR comments
Evanthx 734374b
Responding to PR comments
Evanthx 11cbcef
Removed a file
Evanthx 50664ac
PR comments
Evanthx 679a95d
PR responses
Evanthx f2c9c4a
Updated exception handling
Evanthx d2cdc2c
Test updates
Evanthx 1dbf1cb
Removing some timeouts
Evanthx 7439111
Refactor due to PR comment
Evanthx 91c0414
More tests
Evanthx 5e5e78c
Adding more tests
Evanthx 7d7fac0
More checking in a test
Evanthx 345fc85
Updated a test
Evanthx 5f2508c
Adding async tests
Evanthx 1289bd1
Some improvements
Evanthx d4d70d9
Adding tests
Evanthx 351242e
Adding tests
Evanthx 0f3fae5
Don't run the tests against the in-memory server as it doesn't suppor…
Evanthx b3a0854
Adding prereq check for SANO
Evanthx 181428a
Checking for more SANO operations
Evanthx fb694fa
Test fix
Evanthx 3aa1c54
Restoring test
Evanthx 627a610
Now requiring UUID
Evanthx a6d529a
Some more checks on ID
Evanthx a372d27
Cleaned up some inputs
Evanthx c4ed084
Some cleanup from PRs
Evanthx 1015ea6
PR responses
Evanthx 428dd46
From doing a pass over the code
Evanthx 5d51fc5
Changed poll to getResult
Evanthx ca7e289
Adding cancel tests
Evanthx 912463d
Skipping some SANO nexus tests unless the server has them implemented
Evanthx 6a22bed
Updated test server to one that supports SANO with Nexus
Evanthx 694bf8c
Addressing some PR comments
Evanthx 27ec3eb
Default namespace is now "default"
Evanthx 041a221
Some cleanup for PR comments
Evanthx c09f80f
Addressing PR comments
Evanthx e443c5f
PR changes
Evanthx 9c06c4f
PR comments
Evanthx 80a5317
Address a PR comment
Evanthx 0581e6a
Address PR comments
Evanthx 573104d
Responding to PR comments
Evanthx 05cae63
Code review and unit test coverage check
Evanthx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
153 changes: 153 additions & 0 deletions
153
temporal-sdk/src/main/java/io/temporal/client/NexusClient.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,153 @@ | ||
| package io.temporal.client; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import java.lang.reflect.Type; | ||
| import java.util.stream.Stream; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** | ||
| * Client for managing standalone Nexus operation executions. Obtain an instance via {@link | ||
| * #newInstance(WorkflowServiceStubs)} or {@link #newInstance(WorkflowServiceStubs, | ||
| * NexusClientOptions)}. Do not create this object per request; share it for the lifetime of the | ||
| * process. | ||
| * | ||
| * <p>Standalone Nexus operations run independently of any workflow — they are scheduled, monitored, | ||
| * and managed directly through this client (and the service-bound clients it produces) rather than | ||
| * from within a workflow execution. | ||
| * | ||
| * <p>To start operations, build a service-bound client and call {@code start}/{@code execute}: | ||
| * | ||
| * <pre>{@code | ||
| * NexusClient client = NexusClient.newInstance(stubs, options); | ||
| * | ||
| * // Typed: bind to an @ServiceInterface and invoke a method reference. | ||
| * NexusServiceClient<MyService> svc = | ||
| * client.newNexusServiceClient(MyService.class, "my-endpoint"); | ||
| * String result = svc.execute(MyService::greet, "world"); | ||
| * | ||
| * // Untyped: dispatch by operation name string. | ||
| * UntypedNexusServiceClient untyped = | ||
| * client.newUntypedNexusServiceClient("my-endpoint", "MyService"); | ||
| * UntypedNexusOperationHandle handle = untyped.start("greet", null, "world"); | ||
| * }</pre> | ||
| * | ||
| * <p>To act on an existing operation (describe, cancel, terminate, get result), obtain a handle via | ||
| * {@link #getHandle}: | ||
| * | ||
| * <pre>{@code | ||
| * NexusOperationHandle<String> handle = client.getHandle(operationId, runId, String.class); | ||
| * String result = handle.getResult(); | ||
| * handle.cancel("user requested"); | ||
| * }</pre> | ||
| * | ||
| * <p>For visibility queries across all operations in the namespace, see {@link | ||
| * #listNexusOperationExecutions} and {@link #countNexusOperationExecutions}. | ||
| * | ||
| * @see NexusServiceClient | ||
| * @see UntypedNexusServiceClient | ||
| * @see NexusOperationHandle | ||
| */ | ||
| @Experimental | ||
| public interface NexusClient { | ||
|
|
||
| /** | ||
| * Creates a client with default {@link NexusClientOptions}. | ||
| * | ||
| * @param service gRPC stubs connected to a Temporal Service endpoint | ||
| */ | ||
| static NexusClient newInstance(WorkflowServiceStubs service) { | ||
| return NexusClientImpl.newInstance(service, NexusClientOptions.getDefaultInstance()); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a client with the supplied options. | ||
| * | ||
| * @param service gRPC stubs connected to a Temporal Service endpoint | ||
| * @param options namespace, data converter, interceptors, and defaults applied to operations | ||
| * started through this client | ||
| */ | ||
| static NexusClient newInstance(WorkflowServiceStubs service, NexusClientOptions options) { | ||
| return NexusClientImpl.newInstance(service, options); | ||
| } | ||
|
|
||
| /** Returns the underlying gRPC stubs this client routes RPCs through. */ | ||
| WorkflowServiceStubs getWorkflowServiceStubs(); | ||
|
|
||
| /** | ||
| * Returns an untyped handle to an existing operation execution, optionally pinned to a specific | ||
| * run. | ||
| * | ||
| * @param operationId the user-assigned operation ID | ||
| * @param runId the server-assigned run ID, or {@code null} to target the latest run | ||
| * @return an untyped handle | ||
| */ | ||
| UntypedNexusOperationHandle getHandle(String operationId, @Nullable String runId); | ||
|
|
||
| /** | ||
| * Returns a typed handle to an existing operation execution, bound to {@code resultClass}. | ||
| * | ||
| * @param operationId the user-assigned operation ID | ||
| * @param runId the server-assigned run ID, or {@code null} to target the latest run | ||
| * @param resultClass expected result type | ||
| * @param <R> result type | ||
| */ | ||
| <R> NexusOperationHandle<R> getHandle( | ||
| String operationId, @Nullable String runId, Class<R> resultClass); | ||
|
|
||
| /** | ||
| * Returns a typed handle to an existing operation execution, bound to {@code resultClass}/{@code | ||
| * resultType}. Use the {@code resultType} variant when the result is a generic type whose | ||
| * parameters cannot be captured by {@link Class} alone (e.g. {@code List<String>}). | ||
| * | ||
| * @param operationId the user-assigned operation ID | ||
| * @param runId the server-assigned run ID, or {@code null} to target the latest run | ||
| * @param resultClass expected result class | ||
| * @param resultType generic type for deserialization; may be {@code null} | ||
| * @param <R> result type | ||
| */ | ||
| <R> NexusOperationHandle<R> getHandle( | ||
| String operationId, @Nullable String runId, Class<R> resultClass, @Nullable Type resultType); | ||
|
|
||
| /** | ||
| * Builds a typed service-bound client targeting the given endpoint, dispatching operations by | ||
| * method reference on the {@code @ServiceInterface}-annotated {@code service}. Reuses this | ||
| * client's stubs, options, and interceptor chain. | ||
| * | ||
| * @param service the {@code @ServiceInterface}-annotated service type | ||
| * @param endpoint Nexus endpoint name registered on the Temporal Service | ||
| * @param <T> the service interface type | ||
| */ | ||
| <T> NexusServiceClient<T> newNexusServiceClient(Class<T> service, String endpoint); | ||
|
|
||
| /** | ||
| * Builds an untyped service-bound client targeting the given endpoint and service. Use this to | ||
| * dispatch operations by name string when no service interface is available. | ||
| * | ||
| * @param endpoint Nexus endpoint name registered on the Temporal Service | ||
| * @param serviceName Nexus service name on that endpoint | ||
| */ | ||
| UntypedNexusServiceClient newUntypedNexusServiceClient(String endpoint, String serviceName); | ||
|
maciejdudko marked this conversation as resolved.
|
||
|
|
||
| /** | ||
| * Returns a stream of standalone Nexus operation executions matching the given visibility query. | ||
| * The stream paginates lazily over server-side results — pages are fetched on demand as the | ||
| * stream is consumed. | ||
| * | ||
| * @param query Temporal visibility query string, or {@code null} to return all executions in the | ||
| * client namespace | ||
| * @return a lazy stream of matching executions | ||
| */ | ||
| Stream<NexusOperationExecutionMetadata> listNexusOperationExecutions(@Nullable String query); | ||
|
|
||
| /** | ||
| * Returns the count of standalone Nexus operation executions matching the given visibility query, | ||
| * optionally with aggregation groups. | ||
| * | ||
| * @param query Temporal visibility query string, or {@code null} to count all executions in the | ||
| * client namespace | ||
| * @return execution count, optionally with aggregation groups when the query uses {@code GROUP | ||
| * BY} | ||
| */ | ||
| NexusOperationExecutionCount countNexusOperationExecutions(@Nullable String query); | ||
| } | ||
140 changes: 140 additions & 0 deletions
140
temporal-sdk/src/main/java/io/temporal/client/NexusClientImpl.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,140 @@ | ||
| package io.temporal.client; | ||
|
|
||
| import static io.temporal.internal.WorkflowThreadMarker.enforceNonWorkflowThread; | ||
|
|
||
| import com.uber.m3.tally.Scope; | ||
| import io.temporal.common.Experimental; | ||
| import io.temporal.common.interceptors.NexusClientCallsInterceptor; | ||
| import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsInput; | ||
| import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsOutput; | ||
| import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsInput; | ||
| import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput; | ||
| import io.temporal.common.interceptors.NexusClientInterceptor; | ||
| import io.temporal.internal.WorkflowThreadMarker; | ||
| import io.temporal.internal.client.NamespaceInjectWorkflowServiceStubs; | ||
| import io.temporal.internal.client.NexusOperationHandleImpl; | ||
| import io.temporal.internal.client.RootNexusClientInvoker; | ||
| import io.temporal.internal.client.external.GenericWorkflowClient; | ||
| import io.temporal.internal.client.external.GenericWorkflowClientImpl; | ||
| import io.temporal.serviceclient.MetricsTag; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import java.util.List; | ||
| import java.util.stream.Stream; | ||
| import javax.annotation.Nullable; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| @Experimental | ||
| public class NexusClientImpl implements NexusClient { | ||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(NexusClientImpl.class); | ||
|
|
||
| private final WorkflowServiceStubs workflowServiceStubs; | ||
| private final NexusClientOptions options; | ||
| private final GenericWorkflowClient genericClient; | ||
| private final Scope metricsScope; | ||
| private final NexusClientCallsInterceptor nexusClientCallsInvoker; | ||
| private final List<NexusClientInterceptor> interceptors; | ||
|
|
||
| public static NexusClient newInstance(WorkflowServiceStubs service, NexusClientOptions options) { | ||
| enforceNonWorkflowThread(); | ||
| return WorkflowThreadMarker.protectFromWorkflowThread( | ||
| new NexusClientImpl(service, options), NexusClient.class); | ||
| } | ||
|
|
||
| NexusClientImpl(WorkflowServiceStubs workflowServiceStubs, NexusClientOptions options) { | ||
| workflowServiceStubs = | ||
| new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace()); | ||
| this.workflowServiceStubs = workflowServiceStubs; | ||
| this.options = options; | ||
| this.metricsScope = | ||
| workflowServiceStubs | ||
| .getOptions() | ||
| .getMetricsScope() | ||
| .tagged(MetricsTag.defaultTags(options.getNamespace())); | ||
| this.genericClient = new GenericWorkflowClientImpl(workflowServiceStubs, metricsScope); | ||
| this.interceptors = options.getInterceptors(); | ||
| this.nexusClientCallsInvoker = initializeClientInvoker(); | ||
| if (log.isDebugEnabled()) { | ||
| log.debug( | ||
| "NexusClient initialized: namespace={}, interceptors={}", | ||
| options.getNamespace(), | ||
| interceptors.size()); | ||
| } | ||
| } | ||
|
|
||
| private NexusClientCallsInterceptor initializeClientInvoker() { | ||
| NexusClientCallsInterceptor invoker = new RootNexusClientInvoker(genericClient, options); | ||
| for (NexusClientInterceptor clientInterceptor : interceptors) { | ||
| NexusClientCallsInterceptor wrapped = clientInterceptor.nexusClientCallsInterceptor(invoker); | ||
| if (wrapped == null) { | ||
| throw new IllegalStateException( | ||
| "NexusClientInterceptor " | ||
| + clientInterceptor.getClass().getName() | ||
| + " returned null from nexusClientCallsInterceptor; expected a non-null" | ||
| + " NexusClientCallsInterceptor wrapping the supplied next link"); | ||
| } | ||
| invoker = wrapped; | ||
| } | ||
| return invoker; | ||
| } | ||
|
|
||
| @Override | ||
| public WorkflowServiceStubs getWorkflowServiceStubs() { | ||
| return workflowServiceStubs; | ||
| } | ||
|
|
||
| @Override | ||
| public UntypedNexusOperationHandle getHandle(String operationId, @Nullable String runId) { | ||
| return new NexusOperationHandleImpl(operationId, runId, nexusClientCallsInvoker); | ||
| } | ||
|
|
||
| @Override | ||
| public <R> NexusOperationHandle<R> getHandle( | ||
| String operationId, @Nullable String runId, Class<R> resultClass) { | ||
| return getHandle(operationId, runId, resultClass, null); | ||
| } | ||
|
|
||
| @Override | ||
| public <R> NexusOperationHandle<R> getHandle( | ||
| String operationId, | ||
| @Nullable String runId, | ||
| Class<R> resultClass, | ||
| @Nullable java.lang.reflect.Type resultType) { | ||
| return NexusOperationHandle.fromUntyped(getHandle(operationId, runId), resultClass, resultType); | ||
| } | ||
|
|
||
| @Override | ||
| public <T> NexusServiceClient<T> newNexusServiceClient(Class<T> service, String endpoint) { | ||
| enforceNonWorkflowThread(); | ||
| return WorkflowThreadMarker.protectFromWorkflowThread( | ||
|
maciejdudko marked this conversation as resolved.
|
||
| new NexusServiceClientImpl<>(nexusClientCallsInvoker, service, endpoint, options), | ||
| NexusServiceClient.class); | ||
| } | ||
|
|
||
| @Override | ||
| public UntypedNexusServiceClient newUntypedNexusServiceClient( | ||
| String endpoint, String serviceName) { | ||
| enforceNonWorkflowThread(); | ||
| return WorkflowThreadMarker.protectFromWorkflowThread( | ||
| new UntypedNexusServiceClientImpl(nexusClientCallsInvoker, endpoint, serviceName, options), | ||
| UntypedNexusServiceClient.class); | ||
| } | ||
|
|
||
| @Override | ||
| public Stream<NexusOperationExecutionMetadata> listNexusOperationExecutions( | ||
| @Nullable String query) { | ||
| ListNexusOperationExecutionsOutput out = | ||
| nexusClientCallsInvoker.listNexusOperationExecutions( | ||
| new ListNexusOperationExecutionsInput(query)); | ||
| return out.getOperations(); | ||
| } | ||
|
|
||
| @Override | ||
| public NexusOperationExecutionCount countNexusOperationExecutions(@Nullable String query) { | ||
| CountNexusOperationExecutionsOutput out = | ||
| nexusClientCallsInvoker.countNexusOperationExecutions( | ||
| new CountNexusOperationExecutionsInput(query)); | ||
| return out.getCount(); | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.