8000 working writer · DataDog/dd-trace-java@0b4f8a8 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0b4f8a8

Browse files
committed
working writer
1 parent 2f1f735 commit 0b4f8a8

File tree

16 files changed

+538
-20
lines changed

16 files changed

+538
-20
lines changed

communication/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ dependencies {
44
implementation libs.slf4j
55

66
api project(':remote-config:remote-config-api')
7+
implementation project(':components:json')
78
implementation project(':remote-config:remote-config-core')
89
implementation project(':internal-api')
910
implementation project(':utils:container-utils')

dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import java.util.List;
1010
import java.util.Map;
1111
import javax.annotation.Nonnull;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
1214

1315
public class DDLLMObsSpan implements LLMObsSpan {
1416

@@ -29,6 +31,8 @@ public class DDLLMObsSpan implements LLMObsSpan {
2931

3032
private boolean finished = false;
3133

34+
private static final Logger LOGGER = LoggerFactory.getLogger(DDLLMObsSpan.class);
35+
3236
public DDLLMObsSpan(
3337
@Nonnull String kind,
3438
String spanName,
@@ -77,6 +81,7 @@ public void annotateIO(
7781
if (finished) {
7882
return;
7983
}
84+
LOGGER.warn("ANNOTATE IN {} OUT {}", inputData, outputData);
8085
if (inputData != null && !inputData.isEmpty()) {
8186
this.span.setTag(INPUT, inputData);
8287
}

dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsConstants.java

Lines changed: 0 additions & 5 deletions
This file was deleted.

dd-trace-core/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ dependencies {
6868
implementation project(':components:json')
6969
implementation project(':utils:container-utils')
7070
implementation project(':utils:socket-utils')
71+
72+
implementation group: 'org.msgpack', name: 'msgpack-core', version: '0.8.10'
73+
implementation group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.10'
74+
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.10.0'
75+
7176
// for span exception debugging
7277
compileOnly project(':dd-java-agent:agent-debugger:debugger-bootstrap')
7378

dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import java.util.concurrent.TimeUnit;
1818
import okhttp3.HttpUrl;
1919
import okhttp3.OkHttpClient;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022

2123
public class DDAgentWriter extends RemoteWriter {
2224

@@ -28,6 +30,8 @@ public static DDAgentWriterBuilder builder() {
2830

2931
public static class DDAgentWriterBuilder {
3032

33+
private static final Logger LOGGER = LoggerFactory.getLogger(DDAgentWriterBuilder.class);
34+
3135
String agentHost = DEFAULT_AGENT_HOST;
3236
int traceAgentPort = DEFAULT_TRACE_AGENT_PORT;
3337
String unixDomainSocket = null;
@@ -151,6 +155,8 @@ public DDAgentWriter build() {
151155
}
152156

153157
final DDAgentMapperDiscovery mapperDiscovery = new DDAgentMapperDiscovery(featureDiscovery);
158+
LOGGER.warn("ADDING MAPPER IN AGENT WRITER {} ", mapperDiscovery);
159+
154160
final PayloadDispatcher dispatcher =
155161
new PayloadDispatcherImpl(mapperDiscovery, agentApi, healthMetrics, monitoring);
156162
final TraceProcessingWorker traceProcessingWorker =

dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import java.util.EnumMap;
1313
import java.util.Map;
1414
import java.util.concurrent.TimeUnit;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
1517

1618
public class DDIntakeWriter extends RemoteWriter {
1719

@@ -40,6 +42,8 @@ public static class DDIntakeWriterBuilder {
4042

4143
private SingleSpanSampler singleSpanSampler;
4244

45+
private static final Logger log = LoggerFactory.getLogger(DDIntakeWriterBuilder.class);
46+
4347
public DDIntakeWriterBuilder addTrack(final TrackType trackType, final RemoteApi intakeApi) {
4448
tracks.put(trackType, intakeApi);
4549
return this;
@@ -98,6 +102,7 @@ public DDIntakeWriterBuilder singleSpanSampler(SingleSpanSampler singleSpanSampl
98102
}
99103

100104
public DDIntakeWriter build() {
105+
log.debug("DDINTAKEWRITER TRACKS {}", tracks);
101106
if (tracks.isEmpty()) {
102107
throw new IllegalArgumentException("At least one track needs to be configured");
103108
}
@@ -111,7 +116,11 @@ public DDIntakeWriter build() {
111116
.map(this::createDispatcher)
112117
.toArray(PayloadDispatcher[]::new);
113118
dispatcher = new CompositePayloadDispatcher(dispatchers);
119+
for (PayloadDispatcher dispatcher2 : dispatchers) {
120+
log.debug("COMP DISPATCHER {}", dispatcher2);
121+
}
114122
}
123+
log.debug("DISPATCHER {}", dispatcher);
115124

116125
final TraceProcessingWorker traceProcessingWorker =
117126
new TraceProcessingWorker(

dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,10 @@ public static Writer createWriter(
3939
final Sampler sampler,
4040
final SingleSpanSampler singleSpanSampler,
4141
final HealthMetrics healthMetrics) {
42-
return createWriter(
43-
config, commObjects, sampler, singleSpanSampler, healthMetrics, config.getWriterType());
42+
Writer w =
43+
createWriter(
44+
config, commObjects, sampler, singleSpanSampler, healthMetrics, config.getWriterType());
45+
return w;
4446
}
4547

4648
public static Writer createWriter(
@@ -51,14 +53,20 @@ public static Writer createWriter(
5153
final HealthMetrics healthMetrics,
5254
String configuredType) {
5355

56+
log.debug("START CREATE WRITER {}", configuredType);
57+
5458
if (LOGGING_WRITER_TYPE.equals(configuredType)) {
59+
log.debug("STARTED WRITER LOGGING");
5560
return new LoggingWriter();
5661
} else if (PRINTING_WRITER_TYPE.equals(configuredType)) {
62+
log.debug("STARTED WRITER PRINTING");
5763
return new PrintingWriter(System.out, true);
5864
} else if (configuredType.startsWith(TRACE_STRUCTURE_WRITER_TYPE)) {
65+
log.debug("STARTED WRITER TRACE STRCT");
5966
return new TraceStructureWriter(
6067
Strings.replace(configuredType, TRACE_STRUCTURE_WRITER_TYPE, ""));
6168
} else if (configuredType.startsWith(MULTI_WRITER_TYPE)) {
69+
log.debug("STARTED WRITER MULTI");
6270
return new MultiWriter(
6371
config, commObjects, sampler, singleSpanSampler, healthMetrics, configuredType);
6472
}
@@ -82,7 +90,8 @@ public static Writer createWriter(
8290

8391
// The AgentWriter doesn't support the CI Visibility protocol. If CI Visibility is
8492
// enabled, check if we can use the IntakeWriter instead.
85-
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && config.isCiVisibilityEnabled()) {
93+
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isCiVisibilityEnabled())) {
94+
log.info("SUPPORTS EVP PROXY {}", featuresDiscovery.supportsEvpProxy());
8695
if (featuresDiscovery.supportsEvpProxy() || config.isCiVisibilityAgentlessEnabled()) {
8796
configuredType = DD_INTAKE_WRITER_TYPE;
8897
} else {
@@ -116,6 +125,10 @@ public static Writer createWriter(
116125
builder.addTrack(TrackType.CITESTCOV, coverageApi);
117126
}
118127

128+
final RemoteApi llmobsApi =
129+
createDDIntakeRemoteApi(config, commObjects, featuresDiscovery, TrackType.LLMOBS);
130+
builder.addTrack(TrackType.LLMOBS, llmobsApi);
131+
119132
remoteWriter = builder.build();
120133

121134
} else { // configuredType == DDAgentWriter
@@ -171,26 +184,40 @@ private static RemoteApi createDDIntakeRemoteApi(
171184
SharedCommunicationObjects commObjects,
172185
DDAgentFeaturesDiscovery featuresDiscovery,
173186
TrackType trackType) {
174-
if (featuresDiscovery.supportsEvpProxy() && !config.isCiVisibilityAgentlessEnabled()) {
187+
// TODO make it so that it is agentless for the requested product and not both
188+
if (featuresDiscovery.supportsEvpProxy()
189+
&& !config.isCiVisibilityAgentlessEnabled()
190+
&& !config.isLlmObsAgentlessEnabled()) {
175191
return DDEvpProxyApi.builder()
176192
.httpClient(commObjects.okHttpClient)
177193
.agentUrl(commObjects.agentUrl)
178194
.evpProxyEndpoint(featuresDiscovery.getEvpProxyEndpoint())
179195
.trackType(trackType)
180196
.compressionEnabled(featuresDiscovery.supportsContentEncodingHeadersWithEvpProxy())
181197
.build();
182-
183198
} else {
184199
HttpUrl hostUrl = null;
200+
String llmObsAgentlessUrl = config.getLlMObsAgentlessUrl();
201+
log.debug("LLMOBS URL {}", llmObsAgentlessUrl);
202+
185203
if (config.getCiVisibilityAgentlessUrl() != null) {
186204
hostUrl = HttpUrl.get(config.getCiVisibilityAgentlessUrl());
187205
log.info("Using host URL '{}' to report CI Visibility traces in Agentless mode.", hostUrl);
206+
} else if (config.isLlmObsEnabled()
207+
&& config.isLlmObsAgentlessEnabled()
208+
&& llmObsAgentlessUrl != null
209+
&& !llmObsAgentlessUrl.isEmpty()) {
210+
hostUrl = HttpUrl.get(llmObsAgentlessUrl);
211+
log.info("Using host URL '{}' to report LLM Obs traces in Agentless mode.", hostUrl);
188212
}
189-
return DDIntakeApi.builder()
190-
.hostUrl(hostUrl)
191-
.apiKey(config.getApiKey())
192-
.trackType(trackType)
193-
.build();
213+
RemoteApi ddintake =
214+
DDIntakeApi.builder()
215+
.hostUrl(hostUrl)
216+
.apiKey(config.getApiKey())
217+
.trackType(trackType)
218+
.build();
219+
log.debug("CREATED DD INTAKE for track {} {}", trackType.name(), ddintake);
220+
return ddintake;
194221
}
195222
}
196223

dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ public void addResponseListener(final RemoteResponseListener listener) {
9090
public Response sendSerializedTraces(final Payload payload) {
9191
final int sizeInBytes = payload.sizeInBytes();
9292
String tracesEndpoint = featuresDiscovery.getTraceEndpoint();
93+
log.warn("SENDING AGENT PL {} {}", payload, tracesEndpoint);
94+
9395
if (null == tracesEndpoint) {
9496
featuresDiscovery.discoverIfOutdated();
9597
tracesEndpoint = featuresDiscovery.getTraceEndpoint();
@@ -124,9 +126,11 @@ public Response sendSerializedTraces(final Payload payload) {
124126
handleAgentChange(response.header(DATADOG_AGENT_STATE));
125127
if (response.code() != 200) {
126128
agentErrorCounter.incrementErrorCount(response.message(), payload.traceCount());
129+
log.error("FAILED TO SEND AGENT PL {}", response.message());
127130
countAndLogFailedSend(payload.traceCount(), sizeInBytes, response, null);
128131
return Response.failed(response.code());
129132
}
133+
log.info("SUCCESS SEND AGENT PL {}", response);
130134
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
131135
String responseString = null;
132136
try {
@@ -146,6 +150,7 @@ public Response sendSerializedTraces(final Payload payload) {
146150
}
147151
}
148152
} catch (final IOException e) {
153+
log.error("FAILED TO SEND AGENT PL", e);
149154
countAndLogFailedSend(payload.traceCount(), sizeInBytes, null, e);
150155
return Response.failed(e);
151156
}

dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ private DDEvpProxyApi(
128128
public Response sendSerializedTraces(Payload payload) {
129129
final int sizeInBytes = payload.sizeInBytes();
130130

131+
log.debug("SENDING PL {} TO TRACK {}", payload, trackType);
131132
Request.Builder builder =
132133
new Request.Builder()
133134
.url(proxiedApiUrl)

dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.net.ConnectException;
1717
import java.util.Locale;
1818
import java.util.concurrent.TimeUnit;
19+
import okhttp3.Headers;
1920
import okhttp3.HttpUrl;
2021
import okhttp3.OkHttpClient;
2122
import okhttp3.Request;
@@ -121,16 +122,33 @@ private DDIntakeApi(
121122

122123
@Override
123124
public Response sendSerializedTraces(Payload payload) {
125+
log.debug("DDINTAKE SENDING {} for track {}", payload, trackType);
124126
final int sizeInBytes = payload.sizeInBytes();
125127

126-
final Request request =
128+
Request.Builder builder =
127129
new Request.Builder()
128130
.url(intakeUrl)
129131
.addHeader(DD_API_KEY_HEADER, apiKey)
130-
.addHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_TYPE)
131132
.post(payload.toRequest())
132-
.tag(OkHttpUtils.CustomListener.class, telemetryListener)
133-
.build();
133+
.tag(OkHttpUtils.CustomListener.class, telemetryListener);
134+
135+
// if (!trackType.equals(TrackType.LLMOBS)) {
136+
// builder.addHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_TYPE);
137+
// }
138+
builder.addHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_TYPE);
139+
builder.addHeader("content-type", payload.toRequest().contentType().toString());
140+
141+
final Request request = builder.build();
142+
Headers headers = request.headers();
143+
log.warn("HEADER SIZE {}", headers.size());
144+
for (int i = 0; i < headers.size(); i++) {
145+
String name = headers.name(i);
146+
String value = headers.value(i);
147+
if (name != null && !name.equals(DD_API_KEY_HEADER)) {
148+
log.warn("HEADER {} KEY {} VAL {}", i, name, value);
149+
}
150+
}
151+
134152
totalTraces += payload.traceCount();
135153
receivedTraces += payload.traceCount();
136154

@@ -143,6 +161,7 @@ public Response sendSerializedTraces(Payload payload) {
143161
InstrumentationBridge.getMetricCollector()
144162
.add(CiVisibilityCountMetric.ENDPOINT_PAYLOAD_DROPPED, 1, trackType.endpoint);
145163
countAndLogFailedSend(payload.traceCount(), sizeInBytes, response, null);
164+
log.error("FAILED TO SEND FOR TRACK {}", trackType);
146165
return Response.failed(response.code());
147166
}
148167

0 commit comments

Comments
 (0)
0