-
Notifications
You must be signed in to change notification settings - Fork 310
Inject trace context into AWS Step Functions input #7585
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
7f8721d
a974597
7ecfa8c
fa36832
22c652b
eb642b0
0778f3d
f9d0eea
421856b
895341a
9d76dec
aba1119
e56f2d3
3039780
370df6d
a644a8b
fc6e570
ee83940
973cd79
96f7f39
fdb01a2
8417f6e
0aac239
2992508
da7af0b
ca13195
2d4507d
29a651c
a434846
1197c1d
0f304c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
muzzle { | ||
pass { | ||
group = "software.amazon.awssdk" | ||
module = "sfn" | ||
versions = "[2.15.35,)" | ||
assertInverse = true | ||
} | ||
} | ||
|
||
apply from: "$rootDir/gradle/java.gradle" | ||
|
||
addTestSuiteForDir('latestDepTest', 'test') | ||
addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test') | ||
|
||
dependencies { | ||
compileOnly group: 'software.amazon.awssdk', name: 'sfn', version: '2.15.35' | ||
|
||
// Include httpclient instrumentation for testing because it is a dependency for aws-sdk. | ||
testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4') | ||
testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-2.2') | ||
testImplementation 'software.amazon.awssdk:sfn:2.27.2' | ||
testImplementation 'org.testcontainers:localstack:1.19.7' | ||
DylanLovesCoffee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sfn', version: '+' | ||
} | ||
|
||
tasks.withType(Test).configureEach { | ||
usesService(testcontainersLimit) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package datadog.trace.instrumentation.aws.v2.sfn; | ||
|
||
import datadog.trace.bootstrap.instrumentation.api.AgentSpan; | ||
|
||
public class InputAttributeInjector { | ||
public static String buildTraceContext(AgentSpan span) { | ||
// Extract span tags | ||
StringBuilder spanTagsJSON = new StringBuilder(); | ||
nhulston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
spanTagsJSON.append('{'); | ||
span.getTags() | ||
.forEach( | ||
(tagKey, tagValue) -> | ||
spanTagsJSON | ||
.append("\"&qu 10000 ot;) | ||
DylanLovesCoffee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.append(tagKey) | ||
.append("\":\"") | ||
.append(tagValue) | ||
.append("\",")); | ||
spanTagsJSON.setLength(spanTagsJSON.length() - 1); // remove trailing comma | ||
spanTagsJSON.append('}'); | ||
|
||
// Build DD trace context object | ||
String ddTraceContextJSON = | ||
String.format( | ||
"\"_datadog\": { \"x-datadog-trace-id\": \"%s\",\"x-datadog-parent-id\":\"%s\", \"x-datadog-tags\": %s }", | ||
span.getTraceId().toString(), span.getSpanId(), spanTagsJSON); | ||
|
||
return ddTraceContextJSON; | ||
} | ||
|
||
public static StringBuilder getModifiedInput(String request, String ddTraceContextJSON) { | ||
DylanLovesCoffee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
StringBuilder modifiedInput = new StringBuilder(request); | ||
int startPos = modifiedInput.indexOf("{"); | ||
nhulston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
int endPos = modifiedInput.lastIndexOf("}"); | ||
String inputContent = modifiedInput.substring(startPos + 1, endPos); | ||
if (inputContent.isEmpty()) { | ||
modifiedInput.insert(endPos, ddTraceContextJSON); | ||
} else { | ||
modifiedInput.insert( | ||
endPos, String.format(", %s", ddTraceContextJSON)); // prepend comma to existing input | ||
DylanLovesCoffee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return modifiedInput; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package datadog.trace.instrumentation.aws.v2.sfn; | ||
|
||
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; | ||
import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
|
||
import com.google.auto.service.AutoService; | ||
import datadog.trace.agent.tooling.Instrumenter; | ||
import datadog.trace.agent.tooling.InstrumenterModule; | ||
import java.util.List; | ||
import net.bytebuddy.asm.Advice; | ||
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; | ||
|
||
/** AWS SDK v2 Step Function instrumentation */ | ||
@AutoService(InstrumenterModule.class) | ||
public final class SfnClientInstrumentation extends InstrumenterModule.Tracing | ||
implements Instrumenter.ForSingleType { | ||
|
||
public SfnClientInstrumentation() { | ||
super("sfn", "aws-sdk"); | ||
} | ||
|
||
@Override | ||
public String instrumentedType() { | ||
return "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder"; | ||
} | ||
|
||
@Override | ||
public void methodAdvice(MethodTransformer transformer) { | ||
transformer.applyAdvice( | ||
isMethod().and(named("resolveExecutionInterceptors")), | ||
SfnClientInstrumentation.class.getName() + "$AwsSfnBuilderAdvice"); | ||
} | ||
|
||
@Override | ||
public String[] helperClassNames() { | ||
return new String[] {packageName + ".SfnInterceptor", packageName + ".InputAttributeInjector"}; | ||
} | ||
|
||
public static class AwsSfnBuilderAdvice { | ||
@Advice.OnMethodExit(suppress = Throwable.class) | ||
public static void addHandler(@Advice.Return final List<ExecutionInterceptor> interceptors) { | ||
for (ExecutionInterceptor interceptor : interceptors) { | ||
if (interceptor instanceof SfnInterceptor) { | ||
return; | ||
} | ||
} | ||
interceptors.add(new SfnInterceptor()); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package datadog.trace.instrumentation.aws.v2.sfn; | ||
|
||
import datadog.trace.bootstrap.InstanceStore; | ||
import datadog.trace.bootstrap.instrumentation.api.AgentSpan; | ||
import software.amazon.awssdk.core.SdkRequest; | ||
import software.amazon.awssdk.core.interceptor.Context; | ||
import software.amazon.awssdk.core.interceptor.ExecutionAttribute; | ||
import software.amazon.awssdk.core.interceptor.ExecutionAttributes; | ||
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; | ||
import software.amazon.awssdk.services.sfn.model.StartExecutionRequest; | ||
import software.amazon.awssdk.services.sfn.model.StartSyncExecutionRequest; | ||
|
||
public class SfnInterceptor implements ExecutionInterceptor { | ||
|
||
public static final ExecutionAttribute<AgentSpan> SPAN_ATTRIBUTE = | ||
InstanceStore.of(ExecutionAttribute.class) | ||
.putIfAbsent("DatadogSpan", () -> new ExecutionAttribute<>("DatadogSpan")); | ||
|
||
public SfnInterceptor() {} | ||
|
||
@Override | ||
public SdkRequest modifyRequest( | ||
nhulston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Context.ModifyRequest context, ExecutionAttributes executionAttributes) { | ||
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); | ||
// StartExecutionRequest | ||
if (context.request() instanceof StartExecutionRequest) { | ||
StartExecutionRequest request = (StartExecutionRequest) context.request(); | ||
if (request.input() == null) { | ||
return request; | ||
} | ||
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span); | ||
// Inject the trace context into the Step Function input | ||
StringBuilder modifiedInput = | ||
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON); | ||
|
||
return request.toBuilder().input(modifiedInput.toString()).build(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can be deduplicated using a dedicated method: SdkRequest injectTraceContext(request, span) {
String traceContext = InputAttributeInjector.buildTraceContext(span);
// Inject the trace context into the Step Function input
String modifiedInput = InputAttributeInjector.getModifiedInput(request.input(), traceContext);
return request.toBuilder().input(modifiedInput).build()
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since I had to handle both |
||
} | ||
|
||
// StartSyncExecutionRequest | ||
if (context.request() instanceof StartSyncExecutionRequest) { | ||
StartSyncExecutionRequest request = (StartSyncExecutionRequest) context.request(); | ||
if (request.input() == null) { | ||
return request; | ||
} | ||
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span); | ||
// Inject the trace context into the Step Function input | ||
StringBuilder modifiedInput = | ||
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON); | ||
|
||
return request.toBuilder().input(modifiedInput.toString()).build(); | ||
} | ||
|
||
return context.request(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import datadog.trace.agent.test.naming.VersionedNamingTestBase | ||
import datadog.trace.agent.test.utils.TraceUtils | ||
import datadog.trace.api.DDSpanTypes | ||
import datadog.trace.bootstrap.instrumentation.api.Tags | ||
import groovy.json.JsonSlurper | ||
import org.testcontainers.containers.GenericContainer | ||
import org.testcontainers.utility.DockerImageName | ||
import software.amazon.awssdk.services.sfn.SfnClient | ||
import software.amazon.awssdk.services.sfn.model.StartExecutionResponse | ||
import software.amazon.awssdk.regions.Region | ||
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider | ||
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials | ||
import spock.lang.Shared | ||
|
||
import java.time.Duration | ||
|
||
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan | ||
|
||
|
||
abstract class SfnClientTest extends VersionedNamingTestBase { | ||
static final LOCALSTACK = new GenericContainer(DockerImageName.parse("localstack/localstack")) | ||
DylanLovesCoffee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.withExposedPorts(4566) | ||
.withEnv("SERVICES", "stepfunctions") | ||
.withReuse(true) | ||
.withStartupTimeout(Duration.ofSeconds(120)) | ||
|
||
@Shared SfnClient sfnClient | ||
DylanLovesCoffee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@Shared String testStateMachineARN | ||
|
||
def setupSpec() { | ||
LOCALSTACK.start() | ||
def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566) | ||
sfnClient = SfnClient.builder() | ||
.endpointOverride(URI.create(endPoint)) | ||
.region(Region.US_EAST_1) | ||
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))) | ||
.build() | ||
|
||
def response = sfnClient.createStateMachine { builder -> | ||
builder.name("testStateMachine") | ||
.definition("{\"StartAt\": \"HelloWorld\", \"States\": {\"HelloWorld\": {\"Type\": \"Pass\", \"End\": true}}}") | ||
.build() | ||
} | ||
testStateMachineARN = response.stateMachineArn() | ||
} | ||
|
||
def cleanupSpec() { | ||
LOCALSTACK.stop() | ||
} | ||
|
||
def "Step Functions span is created"() { | ||
when: | ||
StartExecutionResponse response | ||
TraceUtils.runUnderTrace('parent', { | ||
response = sfnClient.startExecution { builder -> | ||
builder.stateMachineArn(testStateMachineARN) | ||
.input("{\"key\": \"value\"}") | ||
.build() | ||
} | ||
}) | ||
|
||
def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566) | ||
nhulston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
then: | ||
assertTraces(1) { | ||
trace(2) { | ||
basicSpan(it, "parent") | ||
span { | ||
serviceName "java-aws-sdk" | ||
operationName "aws.http" | ||
DylanLovesCoffee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
resourceName "Sfn.StartExecution" | ||
spanType DDSpanTypes.HTTP_CLIENT | ||
errored false | ||
measured true | ||
childOf(span(0)) | ||
tags { | ||
"$Tags.COMPONENT" "java-aws-sdk" | ||
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT | ||
"$Tags.HTTP_URL" endPoint+'/' | ||
"$Tags.HTTP_METHOD" "POST" | ||
"$Tags.HTTP_STATUS" 200 | ||
"$Tags.PEER_PORT" LOCALSTACK.getMappedPort(4566) | ||
"$Tags.PEER_HOSTNAME" LOCALSTACK.getHost() | ||
"aws.service" "Sfn" | ||
"aws.operation" "StartExecution" | ||
"aws.agent" "java-aws-sdk" | ||
"aws.requestId" response.responseMetadata().requestId() | ||
"aws_service" "Sfn" | ||
defaultTags() | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
def "Trace context is injected to Step Functions input"() { | ||
when: | ||
StartExecutionResponse response | ||
TraceUtils.runUnderTrace('parent', { | ||
response = sfnClient.startExecution { builder -> | ||
builder.stateMachineArn(testStateMachineARN) | ||
.input("{\"key\": \"value\"}") | ||
.build() | ||
} | ||
}) | ||
|
||
then: | ||
def execution = sfnClient.describeExecution { builder -> | ||
builder.executionArn(response.executionArn()) | ||
.build() | ||
} | ||
def input = new JsonSlurper().parseText(execution.input()) | ||
input["key"] == "value" | ||
input["_datadog"]["x-datadog-trace-id"] != null | ||
input["_datadog"]["x-datadog-parent-id"] != null | ||
input["_datadog"]["x-datadog-tags"] != null | ||
} | ||
} | ||
|
||
class SfnClientV0Test extends SfnClientTest { | ||
@Override | ||
int version() { | ||
0 | ||
} | ||
|
||
@Override | ||
String service() { | ||
return null | ||
} | ||
|
||
@Override | ||
String operation() { | ||
return null | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.