|
6 | 6 |
|
7 | 7 | import datadog.communication.ddagent.SharedCommunicationObjects;
|
8 | 8 | import datadog.trace.api.Config;
|
| 9 | +import datadog.trace.api.flare.TracerFlare; |
9 | 10 | import datadog.trace.api.time.TimeSource;
|
10 | 11 | import datadog.trace.core.monitor.HealthMetrics;
|
| 12 | +import java.io.IOException; |
| 13 | +import java.util.ArrayList; |
11 | 14 | import java.util.concurrent.TimeUnit;
|
12 | 15 | import java.util.concurrent.atomic.AtomicInteger;
|
| 16 | +import java.util.zip.ZipOutputStream; |
13 | 17 | import org.jctools.queues.MessagePassingQueue;
|
14 | 18 | import org.jctools.queues.MpscBlockingConsumerArrayQueue;
|
15 | 19 | import org.slf4j.Logger;
|
@@ -44,6 +48,7 @@ public interface Element {
|
44 | 48 | }
|
45 | 49 |
|
46 | 50 | private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
|
| 51 | + private static final Logger log = LoggerFactory.getLogger(DelayingPendingTraceBuffer.class); |
47 | 52 | private static final long FORCE_SEND_DELAY_MS = TimeUnit.SECONDS.toMillis(5);
|
48 | 53 | private static final long SEND_DELAY_NS = TimeUnit.MILLISECONDS.toNanos(500);
|
49 | 54 | private static final long SLEEP_TIME_MS = 100;
|
@@ -78,6 +83,7 @@ public void enqueue(Element pendingTrace) {
|
78 | 83 |
|
79 | 84 | @Override
|
80 | 85 | public void start() {
|
| 86 | + TracerFlare.addReporter(new TracerDump(this)); |
81 | 87 | worker.start();
|
82 | 88 | }
|
83 | 89 |
|
@@ -235,6 +241,45 @@ public DelayingPendingTraceBuffer(
|
235 | 241 | config, bufferSize, sharedCommunicationObjects, healthMetrics)
|
236 | 242 | : null;
|
237 | 243 | }
|
| 244 | + |
| 245 | + static class TracerDump implements TracerFlare.Reporter { |
| 246 | + |
| 247 | + private final DelayingPendingTraceBuffer buffer; |
| 248 | + |
| 249 | + public TracerDump(DelayingPendingTraceBuffer buffer) { |
| 250 | + this.buffer = buffer; |
| 251 | + } |
| 252 | + |
| 253 | + @Override |
| 254 | + public void addReportToFlare(ZipOutputStream zip) throws IOException { |
| 255 | + TracerFlare.addText(zip, "trace_dump.txt", getDumpText()); |
| 256 | + } |
| 257 | + |
| 258 | + private String getDumpText() { |
| 259 | + StringBuilder dumpText = new StringBuilder(); |
| 260 | + try { |
| 261 | + Element head = buffer.queue.poll(1, TimeUnit.SECONDS); |
| 262 | + ArrayList<Element> dumpSpans = new ArrayList<>(); |
| 263 | + while (head != null && dumpSpans.size() < 100) { // temp arbitrary limit |
| 264 | + dumpSpans.add(head); |
| 265 | + head = buffer.queue.poll(); |
| 266 | + } |
| 267 | + for (Element e : dumpSpans) { |
| 268 | + buffer.queue.offer(e); |
| 269 | + if (e instanceof PendingTrace) { |
| 270 | + PendingTrace trace = (PendingTrace) e; |
| 271 | + for (DDSpan span : trace.getSpans()) { |
| 272 | + dumpText.append(span.toString()).append("\n"); |
| 273 | + } |
| 274 | + } |
| 275 | + } |
| 276 | + } catch (InterruptedException e) { |
| 277 | + log.error("Error with polling the buffer queue. Buffer: {}", buffer); |
| 278 | + Thread.currentThread().interrupt(); |
| 279 | + } |
| 280 | + return dumpText.toString(); |
| 281 | + } |
| 282 | + } |
238 | 283 | }
|
239 | 284 |
|
240 | 285 | static class DiscardingPendingTraceBuffer extends PendingTraceBuffer {
|
|
0 commit comments