diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/README.md b/core/src/main/java/io/temporal/samples/updatewithstart/README.md new file mode 100644 index 00000000..f0c48660 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/README.md @@ -0,0 +1,24 @@ +### Early-Return Sample + +This sample demonstrates an early-return from a workflow. + +By utilizing Update-with-Start, a client can start a new workflow and synchronously receive +a response mid-workflow, while the workflow continues to run to completion. + +To run the sample, start the worker: +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.earlyreturn.EarlyReturnWorker +``` + +Then, start the client: + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.earlyreturn.EarlyReturnClient +``` + +* The client will start a workflow using Update-With-Start. +* Update-With-Start will trigger an initialization step. +* If the initialization step succeeds (default), intialization will return to the client with a transaction ID and the workflow will continue. The workflow will then complete and return the final result. +* If the intitialization step fails (amount <= 0), the workflow will return to the client with an error message and the workflow will run an activity to cancel the transaction. + +To trigger a failed initialization, set the amount to <= 0 in the `EarlyReturnClient` class's `runWorkflowWithUpdateWithStart` method and re-run the client. \ No newline at end of file diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/StartWorkflowRequest.java b/core/src/main/java/io/temporal/samples/updatewithstart/StartWorkflowRequest.java new file mode 100644 index 00000000..36ca9f42 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/StartWorkflowRequest.java @@ -0,0 +1,13 @@ +package io.temporal.samples.updatewithstart; + +public class StartWorkflowRequest { + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + private String value; +} diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartClient.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartClient.java new file mode 100644 index 00000000..f05d8a25 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartClient.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.updatewithstart; + +import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; +import io.temporal.api.enums.v1.WorkflowIdReusePolicy; +import io.temporal.client.*; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.UUID; + +public class UpdateWithStartClient { + private static final String TASK_QUEUE = "UpdateWithStartTQ"; + private static final String WORKFLOW_ID_PREFIX = "update-with-start-"; + + public static void main(String[] args) { + WorkflowClient client = setupWorkflowClient(); + var opts = buildWorkflowOptions(); + runWorkflowWithUpdateWithStart(client, opts); + runWorkflowWithUpdateWithStart(client, opts); + } + + // Set up the WorkflowClient + public static WorkflowClient setupWorkflowClient() { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + return WorkflowClient.newInstance(service); + } + + // Run workflow using 'updateWithStart' + private static void runWorkflowWithUpdateWithStart( + WorkflowClient client, WorkflowOptions options) { + + var args = new StartWorkflowRequest(); + args.setValue(UUID.randomUUID().toString()); + + UpdateWithStartWorkflow workflow = + client.newWorkflowStub(UpdateWithStartWorkflow.class, options); + + try { + // // First I tried this to updateWithStart + // var result = + // WorkflowClient.executeUpdateWithStart( + // workflow::putApplication, + // args, + // UpdateOptions.newBuilder().build(), + // new WithStartWorkflowOperation<>(workflow::execute, args)); + + var handle = + WorkflowClient.startUpdateWithStart( + workflow::putApplication, + args, + UpdateOptions.newBuilder() + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(), + new WithStartWorkflowOperation<>(workflow::execute, args)); + var result = handle.getResult(); + + System.out.println( + "Workflow UwS with value: " + + result.getInitArgs().getValue() + + ", with updates count:" + + result.getUpdates().size()); + + System.out.println( + "Workflow QUERY with initArgs: " + + workflow.getState().getInitArgs().getValue() + + "\n with updates: " + + workflow.getState().getUpdates().size() + + "\nwith execute args " + + workflow.getState().getExecuteArgs().getValue()); + + } catch (WorkflowExecutionAlreadyStarted e) { + System.err.println("WorkflowAlreadyStarted" + e); + } catch (WorkflowServiceException e) { + System.err.println("WorkflowServiceException" + e.getCause()); + } catch (Exception e) { + System.err.println( + "UpdateWithStart failed: " + e.getMessage() + "/" + e.getClass().getCanonicalName()); + } + } + + // https://docs.temporal.io/develop/java/message-passing + // Build WorkflowOptions with task queue and unique ID + private static WorkflowOptions buildWorkflowOptions() { + return WorkflowOptions.newBuilder() + .setTaskQueue(TASK_QUEUE) + .setWorkflowIdReusePolicy( + WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY) + .setWorkflowIdConflictPolicy( + WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) + .setWorkflowId(WORKFLOW_ID_PREFIX + System.currentTimeMillis()) + .build(); + } +} diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorker.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorker.java new file mode 100644 index 00000000..3bf6819c --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorker.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.updatewithstart; + +import io.temporal.client.WorkflowClient; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; + +public class UpdateWithStartWorker { + private static final String TASK_QUEUE = "UpdateWithStartTQ"; + + public static void main(String[] args) { + WorkflowClient client = UpdateWithStartClient.setupWorkflowClient(); + startWorker(client); + } + + private static void startWorker(WorkflowClient client) { + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker(TASK_QUEUE); + + worker.registerWorkflowImplementationTypes(UpdateWithStartWorkflowImpl.class); + + factory.start(); + System.out.println("Worker started"); + } +} diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflow.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflow.java new file mode 100644 index 00000000..f9086a91 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflow.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.updatewithstart; + +import io.temporal.workflow.QueryMethod; +import io.temporal.workflow.UpdateMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface UpdateWithStartWorkflow { + @WorkflowMethod + void execute(StartWorkflowRequest args); + + @UpdateMethod + UpdateWithStartWorkflowState putApplication(StartWorkflowRequest args); + + @QueryMethod + UpdateWithStartWorkflowState getState(); +} diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowImpl.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowImpl.java new file mode 100644 index 00000000..14cc2218 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowImpl.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.updatewithstart; + +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpdateWithStartWorkflowImpl implements UpdateWithStartWorkflow { + private static final Logger log = LoggerFactory.getLogger(UpdateWithStartWorkflowImpl.class); + private UpdateWithStartWorkflowState state; + + @WorkflowInit + public UpdateWithStartWorkflowImpl(StartWorkflowRequest args) { + this.state = new UpdateWithStartWorkflowState(); + this.state.setInitArgs(args); + System.out.println("WorkflowInit args = " + args); + } + + @Override + public void execute(StartWorkflowRequest args) { + log.info("Workflow started {}", args); + + this.state.setExecuteArgs(args); + System.out.println("execute called"); + + Workflow.await(() -> this.state.getUpdates().size() == 2); + } + + @Override + public UpdateWithStartWorkflowState putApplication(StartWorkflowRequest args) { + + this.state.getUpdates().add(args); + System.out.println("put application called " + this.state.getUpdates().size()); + return this.state; + } + + @Override + public UpdateWithStartWorkflowState getState() { + System.out.println("getState called " + this.state.getInitArgs().getValue()); + return this.state; + } +} diff --git a/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowState.java b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowState.java new file mode 100644 index 00000000..1c4aba04 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/updatewithstart/UpdateWithStartWorkflowState.java @@ -0,0 +1,36 @@ +package io.temporal.samples.updatewithstart; + +import java.util.ArrayList; +import java.util.List; + +public class UpdateWithStartWorkflowState { + private StartWorkflowRequest initArgs; + private StartWorkflowRequest executeArgs; + private List updates = new ArrayList<>(); + + public UpdateWithStartWorkflowState() {} + + public StartWorkflowRequest getInitArgs() { + return initArgs; + } + + public void setInitArgs(StartWorkflowRequest initArgs) { + this.initArgs = initArgs; + } + + public List getUpdates() { + return updates; + } + + public void setUpdates(List updates) { + this.updates = updates; + } + + public StartWorkflowRequest getExecuteArgs() { + return executeArgs; + } + + public void setExecuteArgs(StartWorkflowRequest executeArgs) { + this.executeArgs = executeArgs; + } +}