-
Notifications
You must be signed in to change notification settings - Fork 183
Sample code for Nexus messaging #776
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
87c691b
Status checkin
Evanthx 12ae22f
Updating
Evanthx 85bec3e
Moving packages
Evanthx f6f20ab
Remove yarn.lock and node_modules/.yarn-integrity from tracking
Evanthx 47a43e2
Reviewing
Evanthx b6c14f4
Some tweaks
Evanthx 17adffb
Splitting test cases apart
Evanthx 4bfdba9
Tinkering
Evanthx 050f0c4
Reviewing and some updates
Evanthx 5ca7674
Updating a readme
Evanthx 7facf39
Updated for code review comments
Evanthx 039abcf
Fixes from code review
Evanthx 91ebd19
Some changes from code reviews
Evanthx dc62262
Fixed package name in a readme
Evanthx 1835613
Fixed typo
Evanthx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
16 changes: 16 additions & 0 deletions
16
core/src/main/java/io/temporal/samples/nexusmessaging/README.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| This sample shows how to expose a long-running Workflow's queries, updates, and signals as Nexus | ||
| operations. There are two self-contained examples, each in its own directory: | ||
|
|
||
| | | `callerpattern/` | `ondemandpattern/` | | ||
| |---|---|--------------------------------------------------------------| | ||
| | **Pattern** | Signal an existing Workflow | Create and run Workflows on demand, and send signals to them | | ||
| | **Who creates the Workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation | | ||
| | **Who knows the Workflow ID?** | Only the handler | The caller chooses and passes it in every operation | | ||
| | **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` | | ||
|
|
||
| Each directory is fully self-contained for clarity. The | ||
| `GreetingWorkflow`, `GreetingWorkflowImpl`, `GreetingActivity` and `GreetingActivityImpl` classes are pretty much the same between the two — only the | ||
| Nexus service interface and its implementation differ. This highlights that the same Workflow can be | ||
| exposed through Nexus in different ways depending on whether the caller needs lifecycle control. | ||
|
|
||
| See each directory's README for running instructions. | ||
65 changes: 65 additions & 0 deletions
65
core/src/main/java/io/temporal/samples/nexusmessaging/callerpattern/README.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| ## Caller pattern | ||
|
|
||
| The handler worker starts a `GreetingWorkflow` for a User ID. | ||
| `NexusGreetingServiceImpl` holds that ID and routes every Nexus operation to it. | ||
| The caller's input does not have that Workflow ID as the caller doesn't know it - but the caller sends in the User ID, | ||
| and `NexusGreetingServiceImpl` knows how to get the desired Workflow ID from that User ID (see the getWorkflowId call). | ||
|
|
||
| HandlerWorker is using the same getWorkflowId call to generate a Workflow ID from a User ID when it launches the Workflow. | ||
|
|
||
| The caller Workflow: | ||
| 1. Queries for supported languages (`getLanguages` — backed by a `@QueryMethod`) | ||
| 2. Changes the language to Arabic (`setLanguage` — backed by an `@UpdateMethod` that calls an activity) | ||
| 3. Confirms the change via a second query (`getLanguage`) | ||
| 4. Approves the Workflow (`approve` — backed by a `@SignalMethod`) | ||
|
|
||
| ### Running | ||
|
|
||
| Start a Temporal server: | ||
|
|
||
| ```bash | ||
| temporal server start-dev | ||
| ``` | ||
|
|
||
| Create the namespaces and Nexus endpoint: | ||
|
|
||
| ```bash | ||
| temporal operator namespace create --namespace nexus-messaging-handler-namespace | ||
| temporal operator namespace create --namespace nexus-messaging-caller-namespace | ||
|
|
||
| temporal operator nexus endpoint create \ | ||
| --name nexus-messaging-nexus-endpoint \ | ||
| --target-namespace nexus-messaging-handler-namespace \ | ||
| --target-task-queue nexus-messaging-handler-task-queue | ||
| ``` | ||
|
|
||
| This sample loads connection settings from `ClientConfigProfile`. The | ||
| `nexus-messaging-handler` and `nexus-messaging-caller` profiles are defined in | ||
| `core/src/main/resources/config.toml`. You can override settings with environment | ||
| variables or by editing the TOML file (see the `envconfig` sample for details). | ||
|
|
||
| In one terminal, start the handler worker: | ||
|
|
||
| ```bash | ||
| ./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexusmessaging.callerpattern.handler.HandlerWorker | ||
| ``` | ||
|
|
||
| In a second terminal, start the caller worker: | ||
|
|
||
| ```bash | ||
| ./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexusmessaging.callerpattern.caller.CallerWorker | ||
| ``` | ||
|
|
||
| In a third terminal, run the following command to start the example: | ||
|
|
||
| ```bash | ||
| ./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexusmessaging.callerpattern.caller.CallerStarter | ||
| ``` | ||
|
|
||
| Expected output: | ||
|
|
||
| ``` | ||
| Supported languages: [CHINESE, ENGLISH] | ||
| Language changed: ENGLISH -> ARABIC | ||
| Workflow approved | ||
| ``` |
46 changes: 46 additions & 0 deletions
46
.../src/main/java/io/temporal/samples/nexusmessaging/callerpattern/caller/CallerStarter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| package io.temporal.samples.nexusmessaging.callerpattern.caller; | ||
|
|
||
| import io.temporal.client.WorkflowClient; | ||
| import io.temporal.client.WorkflowOptions; | ||
| import io.temporal.envconfig.ClientConfigProfile; | ||
| import io.temporal.envconfig.LoadClientConfigProfileOptions; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import java.nio.file.Paths; | ||
| import java.util.List; | ||
| import java.util.UUID; | ||
|
|
||
| public class CallerStarter { | ||
|
|
||
| public static void main(String[] args) { | ||
| ClientConfigProfile profile; | ||
| try { | ||
| String configFilePath = | ||
| Paths.get(CallerStarter.class.getResource("/config.toml").toURI()).toString(); | ||
| profile = | ||
| ClientConfigProfile.load( | ||
| LoadClientConfigProfileOptions.newBuilder() | ||
| .setConfigFilePath(configFilePath) | ||
| .setConfigFileProfile(CallerWorker.CONFIG_PROFILE) | ||
| .build()); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException("Failed to load client configuration", e); | ||
| } | ||
|
|
||
| WorkflowServiceStubs service = | ||
| WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions()); | ||
| WorkflowClient client = WorkflowClient.newInstance(service, profile.toWorkflowClientOptions()); | ||
|
|
||
| CallerWorkflow workflow = | ||
| client.newWorkflowStub( | ||
| CallerWorkflow.class, | ||
| WorkflowOptions.newBuilder() | ||
| .setWorkflowId("nexus-messaging-caller-" + UUID.randomUUID()) | ||
| .setTaskQueue(CallerWorker.TASK_QUEUE) | ||
| .build()); | ||
|
|
||
| // Launch the worker, passing in an identifier which the Nexus service will use | ||
| // to find the matching workflow (See NexusGreetingServiceImpl::getWorkflowId) | ||
| List<String> log = workflow.run("user-1"); | ||
| log.forEach(System.out::println); | ||
| } | ||
| } |
58 changes: 58 additions & 0 deletions
58
core/src/main/java/io/temporal/samples/nexusmessaging/callerpattern/caller/CallerWorker.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| package io.temporal.samples.nexusmessaging.callerpattern.caller; | ||
|
|
||
| import io.temporal.client.WorkflowClient; | ||
| import io.temporal.envconfig.ClientConfigProfile; | ||
| import io.temporal.envconfig.LoadClientConfigProfileOptions; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import io.temporal.worker.Worker; | ||
| import io.temporal.worker.WorkerFactory; | ||
| import io.temporal.worker.WorkflowImplementationOptions; | ||
| import io.temporal.workflow.NexusServiceOptions; | ||
| import java.nio.file.Paths; | ||
| import java.util.Collections; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| public class CallerWorker { | ||
| private static final Logger logger = LoggerFactory.getLogger(CallerWorker.class); | ||
|
|
||
| static final String CONFIG_PROFILE = "nexus-messaging-caller"; | ||
| public static final String TASK_QUEUE = "nexus-messaging-caller-task-queue"; | ||
| static final String NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint"; | ||
|
|
||
| public static void main(String[] args) throws InterruptedException { | ||
| ClientConfigProfile profile; | ||
| try { | ||
| String configFilePath = | ||
| Paths.get(CallerWorker.class.getResource("/config.toml").toURI()).toString(); | ||
| profile = | ||
| ClientConfigProfile.load( | ||
| LoadClientConfigProfileOptions.newBuilder() | ||
| .setConfigFilePath(configFilePath) | ||
| .setConfigFileProfile(CONFIG_PROFILE) | ||
| .build()); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException("Failed to load client configuration", e); | ||
| } | ||
|
|
||
| WorkflowServiceStubs service = | ||
| WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions()); | ||
| WorkflowClient client = WorkflowClient.newInstance(service, profile.toWorkflowClientOptions()); | ||
|
|
||
| WorkerFactory factory = WorkerFactory.newInstance(client); | ||
| Worker worker = factory.newWorker(TASK_QUEUE); | ||
| worker.registerWorkflowImplementationTypes( | ||
| WorkflowImplementationOptions.newBuilder() | ||
| .setNexusServiceOptions( | ||
| // The key must match the @Service-annotated interface name. | ||
| Collections.singletonMap( | ||
| "NexusGreetingService", | ||
| NexusServiceOptions.newBuilder().setEndpoint(NEXUS_ENDPOINT).build())) | ||
| .build(), | ||
| CallerWorkflowImpl.class); | ||
|
|
||
| factory.start(); | ||
| logger.info("Caller worker started, ctrl+c to exit"); | ||
| Thread.currentThread().join(); | ||
| } | ||
| } |
11 changes: 11 additions & 0 deletions
11
...src/main/java/io/temporal/samples/nexusmessaging/callerpattern/caller/CallerWorkflow.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| package io.temporal.samples.nexusmessaging.callerpattern.caller; | ||
|
|
||
| import io.temporal.workflow.WorkflowInterface; | ||
| import io.temporal.workflow.WorkflowMethod; | ||
| import java.util.List; | ||
|
|
||
| @WorkflowInterface | ||
| public interface CallerWorkflow { | ||
| @WorkflowMethod | ||
| List<String> run(String userId); | ||
| } |
73 changes: 73 additions & 0 deletions
73
...main/java/io/temporal/samples/nexusmessaging/callerpattern/caller/CallerWorkflowImpl.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| package io.temporal.samples.nexusmessaging.callerpattern.caller; | ||
|
|
||
| import io.temporal.failure.ApplicationFailure; | ||
| import io.temporal.samples.nexusmessaging.callerpattern.service.Language; | ||
| import io.temporal.samples.nexusmessaging.callerpattern.service.NexusGreetingService; | ||
| import io.temporal.workflow.NexusOperationOptions; | ||
| import io.temporal.workflow.NexusServiceOptions; | ||
| import io.temporal.workflow.Workflow; | ||
| import java.time.Duration; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import org.slf4j.Logger; | ||
|
|
||
| public class CallerWorkflowImpl implements CallerWorkflow { | ||
|
|
||
| private static final Logger logger = Workflow.getLogger(CallerWorkflowImpl.class); | ||
|
|
||
| // The endpoint is configured at the worker level in CallerWorker; only operation options are | ||
| // set here. | ||
| NexusGreetingService greetingService = | ||
| Workflow.newNexusServiceStub( | ||
| NexusGreetingService.class, | ||
| NexusServiceOptions.newBuilder() | ||
| .setOperationOptions( | ||
| NexusOperationOptions.newBuilder() | ||
| .setScheduleToCloseTimeout(Duration.ofSeconds(10)) | ||
| .build()) | ||
| .build()); | ||
|
|
||
| @Override | ||
| public List<String> run(String userId) { | ||
|
|
||
| // Messages in the log array are passed back to the caller who will then log them to report what | ||
| // is happening. | ||
| // The same message is also logged for demo purposes, so that things are visible in the caller | ||
| // workflow output. | ||
| List<String> log = new ArrayList<>(); | ||
|
|
||
| // Call a Nexus operation backed by a query against the entity workflow. | ||
| // The workflow must already be running on the handler, otherwise you will | ||
| // get an error saying the workflow has already terminated. | ||
| NexusGreetingService.GetLanguagesOutput languagesOutput = | ||
| greetingService.getLanguages(new NexusGreetingService.GetLanguagesInput(false, userId)); | ||
| log.add("Supported languages: " + languagesOutput.getLanguages()); | ||
| logger.info("Supported languages: {}", languagesOutput.getLanguages()); | ||
|
|
||
| // Following are examples for each of the three messaging types - | ||
| // update, query, then signal. | ||
|
|
||
| // Call a Nexus operation backed by an update against the entity workflow. | ||
| Language previousLanguage = | ||
| greetingService.setLanguage( | ||
| new NexusGreetingService.SetLanguageInput(Language.ARABIC, userId)); | ||
|
|
||
| // Call a Nexus operation backed by a query to confirm the language change. | ||
| Language currentLanguage = | ||
| greetingService.getLanguage(new NexusGreetingService.GetLanguageInput(userId)); | ||
| if (currentLanguage != Language.ARABIC) { | ||
| throw ApplicationFailure.newFailure( | ||
| "Expected language ARABIC, got " + currentLanguage, "AssertionError"); | ||
| } | ||
|
|
||
| log.add("Language changed: " + previousLanguage.name() + " -> " + Language.ARABIC.name()); | ||
| logger.info("Language changed from {} to {}", previousLanguage, Language.ARABIC); | ||
|
|
||
| // Call a Nexus operation backed by a signal against the entity workflow. | ||
| greetingService.approve(new NexusGreetingService.ApproveInput("caller", userId)); | ||
| log.add("Workflow approved"); | ||
| logger.info("Workflow approved"); | ||
|
|
||
| return log; | ||
| } | ||
| } |
12 changes: 12 additions & 0 deletions
12
.../main/java/io/temporal/samples/nexusmessaging/callerpattern/handler/GreetingActivity.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| package io.temporal.samples.nexusmessaging.callerpattern.handler; | ||
|
|
||
| import io.temporal.activity.ActivityInterface; | ||
| import io.temporal.activity.ActivityMethod; | ||
| import io.temporal.samples.nexusmessaging.callerpattern.service.Language; | ||
|
|
||
| @ActivityInterface | ||
| public interface GreetingActivity { | ||
| // Simulates a call to a remote greeting service. Returns null if the language is not supported. | ||
| @ActivityMethod | ||
| String callGreetingService(Language language); | ||
| } |
25 changes: 25 additions & 0 deletions
25
...n/java/io/temporal/samples/nexusmessaging/callerpattern/handler/GreetingActivityImpl.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| package io.temporal.samples.nexusmessaging.callerpattern.handler; | ||
|
|
||
| import io.temporal.samples.nexusmessaging.callerpattern.service.Language; | ||
| import java.util.EnumMap; | ||
| import java.util.Map; | ||
|
|
||
| public class GreetingActivityImpl implements GreetingActivity { | ||
|
|
||
| private static final Map<Language, String> GREETINGS = new EnumMap<>(Language.class); | ||
|
|
||
| static { | ||
| GREETINGS.put(Language.ARABIC, "مرحبا بالعالم"); | ||
| GREETINGS.put(Language.CHINESE, "你好,世界"); | ||
| GREETINGS.put(Language.ENGLISH, "Hello, world"); | ||
| GREETINGS.put(Language.FRENCH, "Bonjour, monde"); | ||
| GREETINGS.put(Language.HINDI, "नमस्ते दुनिया"); | ||
| GREETINGS.put(Language.PORTUGUESE, "Olá mundo"); | ||
| GREETINGS.put(Language.SPANISH, "Hola mundo"); | ||
| } | ||
|
|
||
| @Override | ||
| public String callGreetingService(Language language) { | ||
| return GREETINGS.get(language); | ||
| } | ||
| } |
47 changes: 47 additions & 0 deletions
47
.../main/java/io/temporal/samples/nexusmessaging/callerpattern/handler/GreetingWorkflow.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| package io.temporal.samples.nexusmessaging.callerpattern.handler; | ||
|
|
||
| import io.temporal.samples.nexusmessaging.callerpattern.service.Language; | ||
| import io.temporal.samples.nexusmessaging.callerpattern.service.NexusGreetingService; | ||
| import io.temporal.workflow.QueryMethod; | ||
| import io.temporal.workflow.SignalMethod; | ||
| import io.temporal.workflow.UpdateMethod; | ||
| import io.temporal.workflow.UpdateValidatorMethod; | ||
| import io.temporal.workflow.WorkflowInterface; | ||
| import io.temporal.workflow.WorkflowMethod; | ||
|
|
||
| /** | ||
| * A long-running "entity" workflow that backs the NexusGreetingService Nexus operations. The | ||
| * workflow exposes queries, an update, and a signal. These are private implementation details of | ||
| * the Nexus service: the caller only interacts via Nexus operations. | ||
| */ | ||
| @WorkflowInterface | ||
| public interface GreetingWorkflow { | ||
|
|
||
| @WorkflowMethod | ||
| String run(); | ||
|
|
||
| // Returns the languages currently supported by the workflow. | ||
| @QueryMethod | ||
| NexusGreetingService.GetLanguagesOutput getLanguages( | ||
| NexusGreetingService.GetLanguagesInput input); | ||
|
|
||
| // Returns the currently active language. | ||
| @QueryMethod | ||
| Language getLanguage(); | ||
|
|
||
| // Approves the workflow, allowing it to complete. | ||
| @SignalMethod | ||
| void approve(NexusGreetingService.ApproveInput input); | ||
|
|
||
| // Changes the active language synchronously (only supports languages already in the greetings | ||
| // map). | ||
| @UpdateMethod | ||
| Language setLanguage(NexusGreetingService.SetLanguageInput input); | ||
|
|
||
| @UpdateValidatorMethod(updateName = "setLanguage") | ||
| void validateSetLanguage(NexusGreetingService.SetLanguageInput input); | ||
|
|
||
| // Changes the active language, calling an activity to fetch a greeting for new languages. | ||
| @UpdateMethod | ||
| Language setLanguageUsingActivity(NexusGreetingService.SetLanguageInput input); | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.