8000 Added @ActivityInterface by mfateev · Pull Request #56 · temporalio/sdk-java · GitHub
[go: up one dir, main page]

Skip to content

Added @ActivityInterface #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 4, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added @ActivityInterface implementation to POJOActivityTaskHandler
  • Loading branch information
mfateev committed Apr 2, 2020
commit be87de4f69e65f3b5a31cd4de30201d8ac3d164a
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ public final class InternalUtils {
* @return "Simple class name"_"methodName"
*/
public static String getSimpleName(Method method) {
return method.getDeclaringClass().getSimpleName() + "_" + method.getName();
return getSimpleName(method.getDeclaringClass(), method);
}

public static String getSimpleName(Class<?> type, Method method) {
return type.getSimpleName() + "_" + method.getName();
}

public static String getWorkflowType(Method method, WorkflowMethod workflowMethod) {
Expand Down
151 changes: 129 additions & 22 deletions src/main/java/io/temporal/internal/sync/POJOActivityTaskHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
package io.temporal.internal.sync;

import com.google.common.base.Joiner;
import com.google.common.reflect.TypeToken;
import com.google.common.base.Objects;
import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.client.ActivityCancelledException;
import io.temporal.common.MethodRetry;
Expand All @@ -36,11 +37,14 @@
import io.temporal.proto.workflowservice.RespondActivityTaskFailedRequest;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testing.SimulatedTimeoutException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -85,30 +89,29 @@ private void addActivityImplementation(
+ "\" This annotation can be used only on the interface method it implements.");
}
}
TypeToken<?>.TypeSet interfaces = TypeToken.of(cls).getTypes().interfaces();
if (interfaces.isEmpty()) {
throw new IllegalArgumentException("Activity must implement at least one interface");
Set<MethodInterfacePair> activityMethods =
getAnnotatedInterfaceMethods(cls, ActivityInterface.class);
if (activityMethods.isEmpty()) {
throw new IllegalArgumentException(
"Class doesn't implement any non empty interface annotated with @ActivityInterface: "
+ cls.getName());
}
for (TypeToken<?> i : interfaces) {
if (i.getType().getTypeName().startsWith("org.mockito")) {
continue;
for (MethodInterfacePair pair : activityMethods) {
Method method = pair.getMethod();
ActivityMethod annotation = method.getAnnotation(ActivityMethod.class);
String activityType;
if (annotation != null && !annotation.name().isEmpty()) {
activityType = annotation.name();
} else {
activityType = InternalUtils.getSimpleName(pair.getType(), method);
}
for (Method method : i.getRawType().getMethods()) {
ActivityMethod annotation = method.getAnnotation(ActivityMethod.class);
String activityType;
if (annotation != null && !annotation.name().isEmpty()) {
activityType = annotation.name();
} else {
activityType = InternalUtils.getSimpleName(method);
}
if (activities.containsKey(activityType)) {
throw new IllegalStateException(
activityType + " activity type is already registered with the worker");
}

ActivityTaskExecutor implementation = newTaskExecutor.apply(method, activity);
activities.put(activityType, implementation);
if (activities.containsKey(activityType)) {
throw new IllegalStateException(
activityType + " activity type is already registered with the worker");
}

ActivityTaskExecutor implementation = newTaskExecutor.apply(method, activity);
activities.put(activityType, implementation);
}
}

Expand Down Expand Up @@ -270,4 +273,108 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc
}
}
}

static class MethodInterfacePair {
private final Method method;
private final Class<?> type;

MethodInterfacePair(Method method, Class<?> type) {
this.method = method;
this.type = type;
}

public Method getMethod() {
return method;
}

public Class<?> getType() {
return type;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MethodInterfacePair that = (MethodInterfacePair) o;
return Objects.equal(method, that.method) && Objects.equal(type, that.type);
}

@Override
public int hashCode() {
return Objects.hashCode(method, type);
}

@Override
public String toString() {
return "MethodInterfacePair{" + "method=" + method + ", type=" + type + '}';
}
}

/** Used to override equals and hashCode of Method to ensure deduping by method name in a set. */
static class MethodWrapper {
private final Method method;

MethodWrapper(Method method) {
this.method = method;
}

public Method getMethod() {
return method;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MethodWrapper that = (MethodWrapper) o;
return Objects.equal(method.getName(), that.method.getName());
}

@Override
public int hashCode() {
return Objects.hashCode(method.getName());
}
}

Set<MethodInterfacePair> getAnnotatedInterfaceMethods(
Class<?> implementationClass, Class<? extends Annotation> annotationClass) {
if (implementationClass.isInterface()) {
throw new IllegalArgumentException(
"Concrete class expected. Found interface: " + implementationClass.getSimpleName());
}
Set<MethodInterfacePair> pairs = new HashSet<>();
// Methods inherited from interfaces that are not annotated with @ActivityInterface
Set<MethodWrapper> ignored = new HashSet<>();
getAnnotatedInterfaceMethods(implementationClass, annotationClass, ignored, pairs);
return pairs;
}

private void getAnnotatedInterfaceMethods(
Class<?> current,
Class<? extends Annotation> annotationClass,
Set<MethodWrapper> methods,
Set<MethodInterfacePair> result) {
// Using set to dedupe methods which are defined in both non activity parent and current
Set<MethodWrapper> ourMethods = new HashSet<>();
if (current.isInterface()) {
Method[] declaredMethods = current.getDeclaredMethods();
for (int i = 0; i < declaredMethods.length; i++) {
Method declaredMethod = declaredMethods[i];
ourMethods.add(new MethodWrapper(declaredMethod));
}
}
Class<?>[] interfaces = current.getInterfaces();
for (int i = 0; i < interfaces.length; i++) {
Class<?> anInterface = interfaces[i];
getAnnotatedInterfaceMethods(anInterface, annotationClass, ourMethods, result);
}
Annotation annotation = current.getAnnotation(annotationClass);
if (annotation == null) {
methods.addAll(ourMethods);
return;
}
for (MethodWrapper method : ourMethods) {
result.add(new MethodInterfacePair(method.getMethod(), current));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.grpc.Status;
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityInterface;
import io.temporal.client.ActivityCancelledException;
import io.temporal.testing.TestActivityEnvironment;
import io.temporal.workflow.ActivityFailureException;
Expand All @@ -43,6 +44,7 @@ public void setUp() {
testEnvironment = TestActivityEnvironment.newInstance();
}

@ActivityInterface
public interface TestActivity {

String activity1(String input);
Expand Down Expand Up @@ -106,6 +108,7 @@ public void testHeartbeat() {
assertEquals("details1", details.get());
}

@ActivityInterface
public interface InterruptibleTestActivity {
void activity1() throws InterruptedException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.mockito.Mockito.when;

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
Expand Down Expand Up @@ -165,6 +166,7 @@ public void testFailure() {
}
}

@ActivityInterface
public interface TestActivity {
String activity1(String input);
}
Expand Down Expand Up @@ -490,6 +492,7 @@ public void testConcurrentDecision() throws ExecutionException, InterruptedExcep
log.info(testEnvironment.getDiagnostics());
}

@ActivityInterface
public interface TestCancellationActivity {
String activity1(String input);
}
Expand Down
Loading
0