8000 Avoid disconnected traces when using Kotlin flowOn (#8651) · DataDog/dd-trace-java@32046a3 · GitHub
[go: up one dir, main page]

Skip to content

Commit 32046a3

Browse files
authored
Avoid disconnected traces when using Kotlin flowOn (#8651)
* Restore parent scope stack when completing coroutines
1 parent a1a175c commit 32046a3

File tree

6 files changed

+79
-14
lines changed

6 files changed

+79
-14
lines changed

dd-java-agent/instrumentation/kotlin-coroutines/coroutines-1.5/src/test/groovy/KotlinCoroutineInstrumentationTest.groovy

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import datadog.trace.core.DDSpan
22
import datadog.trace.instrumentation.kotlin.coroutines.AbstractKotlinCoroutineInstrumentationTest
33
import kotlinx.coroutines.CoroutineDispatcher
4-
import spock.lang.Ignore
54

65
class KotlinCoroutineInstrumentationTest extends AbstractKotlinCoroutineInstrumentationTest<KotlinCoroutineTests> {
76

@@ -44,7 +43,6 @@ class KotlinCoroutineInstrumentationTest extends AbstractKotlinCoroutineInstrume
4443
[dispatcherName, dispatcher] << dispatchersToTest
4544
}
4645

47-
@Ignore("Not working: disconnected trace")
4846
def "kotlin trace consistent after flow"() {
4947
setup:
5048
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)

dd-java-agent/instrumentation/kotlin-coroutines/coroutines-1.5/src/test/kotlin/KotlinCoroutineTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class KotlinCoroutineTests(dispatcher: CoroutineDispatcher) : CoreKotlinCoroutin
4646
emit(1)
4747
}.flowOn(Dispatchers.IO)
4848
val ff = f.single()
49-
// FIXME: This span is detached
49+
5050
childSpan("outside-flow").activateAndUse {
5151
println("hello $ff")
5252
}

dd-java-agent/instrumentation/kotlin-coroutines/src/main/java/datadog/trace/instrumentation/kotlin/coroutines/CoroutineContextHelper.java

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,24 @@
11
package datadog.trace.instrumentation.kotlin.coroutines;
22

3+
import java.lang.invoke.MethodHandle;
4+
import java.lang.invoke.MethodHandles;
5+
import java.lang.invoke.MethodType;
36
import kotlin.coroutines.CoroutineContext;
47
import kotlinx.coroutines.AbstractCoroutine;
8+
import kotlinx.coroutines.ChildHandle;
59
import kotlinx.coroutines.Job;
10+
import kotlinx.coroutines.JobNode;
11+
import kotlinx.coroutines.JobSupport;
612
import org.jetbrains.annotations.Nullable;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
715

816
public class CoroutineContextHelper {
17+
private static final Logger log = LoggerFactory.getLogger(CoroutineContextHelper.class);
18+
919
/*
1020
IntelliJ shows a warning here for Job being out of bounds, but that's not true, the class compiles.
1121
*/
12-
1322
@Nullable
1423
@SuppressWarnings("unchecked")
1524
public static Job getJob(final CoroutineContext context) {
@@ -40,7 +49,57 @@ public static void closeScopeStateContext(final AbstractCoroutine<?> coroutine)
4049
final ScopeStateCoroutineContext scopeStackContext =
4150
getScopeStateContext(coroutine.getContext());
4251
if (scopeStackContext != null) {
43-
scopeStackContext.maybeCloseScopeAndCancelContinuation(coroutine);
52+
scopeStackContext.maybeCloseScopeAndCancelContinuation(coroutine, getParentJob(coroutine));
53+
}
54+
}
55+
56+
private static final MethodHandle PARENT_HANDLE_METHOD;
57+
private static final MethodHandle PARENT_HANDLE_FIELD;
58+
private static final MethodHandle JOB_FIELD;
59+
60+
static {
61+
MethodH 6D40 andle parentHandleMethod = null;
62+
MethodHandle parentHandleField = null;
63+
MethodHandle jobField = null;
64+
65+
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
66+
try {
67+
// Kotlin coroutines 1.5+
68+
parentHandleMethod =
69+
lookup.findVirtual(
70+
JobSupport.class,
71+
"getParentHandle$kotlinx_coroutines_core",
72+
MethodType.methodType(ChildHandle.class));
73+
jobField = lookup.findGetter(JobNode.class, "job", JobSupport.class);
74+
} catch (Throwable ignore) {
75+
try {
76+
// Kotlin coroutines 1.3
77+
parentHandleField = lookup.findGetter(JobSupport.class, "parentHandle", ChildHandle.class);
78+
jobField = lookup.findGetter(JobNode.class, "job", Job.class);
79+
} catch (Throwable e) {
80+
log.debug("Unable to access parent handle", e);
81+
}
82+
}
83+
84+
PARENT_HANDLE_METHOD = parentHandleMethod;
85+
PARENT_HANDLE_FIELD = parentHandleField;
86+
JOB_FIELD = jobField;
87+
}
88+
89+
private static Job getParentJob(JobSupport coroutine) {
90+
try {
91+
Object parentHandle = null;
92+
if (null != PARENT_HANDLE_METHOD) {
93+
parentHandle = PARENT_HANDLE_METHOD.invoke(coroutine);
94+
} else if (null != PARENT_HANDLE_FIELD) {
95+
parentHandle = PARENT_HANDLE_FIELD.invoke(coroutine);
96+
}
97+
if (parentHandle instanceof JobNode) {
98+
return (Job) JOB_FIELD.invoke((JobNode) parentHandle);
99+
}
100+
} catch (Throwable e) {
101+
log.debug("Unable to extract parent job", e);
44102
}
103+
return null;
45104
}
46105
}

dd-java-agent/instrumentation/kotlin-coroutines/src/main/java/datadog/trace/instrumentation/kotlin/coroutines/ScopeStateCoroutineContext.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,20 @@ public ScopeState updateThreadContext(@NotNull final CoroutineContext coroutineC
5252
}
5353

5454
/** If there's a context item for the coroutine then try to close it */
55-
public void maybeCloseScopeAndCancelContinuation(final Job coroutine) {
55+
public void maybeCloseScopeAndCancelContinuation(final Job coroutine, final Job parent) {
5656
final ScopeStateCoroutineContextItem contextItem = contextItemPerCoroutine.get(coroutine);
5757
if (contextItem != null) {
58-
final ScopeState currentThreadScopeState = AgentTracer.get().newScopeState();
59-
currentThreadScopeState.fetchFromActive();
58+
ScopeState currentThreadScopeState = null;
59+
if (parent != null) {
60+
final ScopeStateCoroutineContextItem parentItem = contextItemPerCoroutine.get(parent);
61+
if (parentItem != null) {
62+
currentThreadScopeState = parentItem.getScopeState();
63+
}
64+
}
65+
if (currentThreadScopeState == null) {
66+
currentThreadScopeState = AgentTracer.get().newScopeState();
67+
currentThreadScopeState.fetchFromActive();
68+
}
6069

6170
contextItem.maybeCloseScopeAndCancelContinuation();
6271
contextItemPerCoroutine.remove(coroutine);
@@ -107,6 +116,10 @@ public ScopeStateCoroutineContextItem() {
107116
coroutineScopeState = AgentTracer.get().newScopeState();
108117
}
109118

119+
public ScopeState getScopeState() {
120+
return coroutineScopeState;
121+
}
122+
110123
public void activate() {
111124
coroutineScopeState.activate();
112125

dd-java-agent/instrumentation/kotlin-coroutines/src/testFixtures/groovy/datadog/trace/instrumentation/kotlin/coroutines/AbstractKotlinCoroutineInstrumentationTest.groovy

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import datadog.trace.core.DDSpan
55
import kotlinx.coroutines.CoroutineDispatcher
66
import kotlinx.coroutines.Dispatchers
77
import kotlinx.coroutines.ThreadPoolDispatcherKt
8-
import spock.lang.Ignore
98
import spock.lang.Shared
109

1110
abstract class AbstractKotlinCoroutineInstrumentationTest<T extends CoreKotlinCoroutineTests> extends AgentTestRunner {
@@ -366,7 +365,6 @@ abstract class AbstractKotlinCoroutineInstrumentationTest<T extends CoreKotlinCo
366365
[dispatcherName, dispatcher] << dispatchersToTest
367366
}
368367

369-
@Ignore("Not working: disconnected trace")
370368
def "kotlin trace consistent with timeout"() {
371369
setup:
372370
CoreKotlinCoroutineTests kotlinTest = getCoreKotlinCoroutineTestsInstance(dispatcher)
@@ -389,7 +387,7 @@ abstract class AbstractKotlinCoroutineInstrumentationTest<T extends CoreKotlinCo
389387
}
390388
span(2) {
391389
operationName "3-after-timeout"
392-
childOf span(5)
390+
childOf span(1)
393391
}
394392
span(3) {
395393
operationName "4-after-timeout-2"
@@ -406,7 +404,6 @@ abstract class AbstractKotlinCoroutineInstrumentationTest<T extends CoreKotlinCo
406404
[dispatcherName, dispatcher] << dispatchersToTest
407405
}
408406

409-
@Ignore("Not working: disconnected trace")
410407
def "kotlin trace consistent after delay"() {
411408
setup:
412409
CoreKotlinCoroutineTests kotlinTest = getCoreKotlinCoroutineTestsInstance(dispatcher)

dd-java-agent/instrumentation/kotlin-coroutines/src/testFixtures/kotlin/datadog/trace/instrumentation/kotlin/coroutines/CoreKotlinCoroutineTests.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,6 @@ abstract class CoreKotlinCoroutineTests(private val dispatcher: CoroutineDispatc
320320
delay(10)
321321
}
322322
}
323-
// FIXME: This span is detached
324323
childSpan("3-after-timeout").activateAndUse {
325324
delay(10)
326325
}
@@ -351,7 +350,6 @@ abstract class CoreKotlinCoroutineTests(private val dispatcher: CoroutineDispatc
351350
}
352351
}
353352

354-
// FIXME: This span is detached
355353
tracedChild("after-process")
356354

357355
6

0 commit comments

Comments
 (0)
0