8000 Implemented WebSocket support for Netty 4.1 · DataDog/dd-trace-java@e619f2a · GitHub
[go: up one dir, main page]

Skip to content

Commit e619f2a

Browse files
Implemented WebSocket support for Netty 4.1
1 parent 52b943d commit e619f2a

File tree

8 files changed

+377
-6
lines changed

8 files changed

+377
-6
lines changed

dd-java-agent/instrumentation/netty-4.1-shared/src/main/java/datadog/trace/instrumentation/netty41/AttributeKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import datadog.trace.api.GenericClassValue;
66
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
77
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
8+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
89
import io.netty.handler.codec.http.HttpHeaders;
910
import io.netty.util.AttributeKey;
1011
import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +34,9 @@ public final class AttributeKeys {
3334
public static final AttributeKey<Boolean> BLOCKED_RESPONSE_KEY =
3435
attributeKey("datadog.server.blocked_response");
3536

37+
public static final AttributeKey<HandlerContext.Sender> WEBSOCKET_HANDLER_CONTEXT =
38+
attributeKey("datadog.server.websocket.handler_context");
39+
3640
/**
3741
* Generate an attribute key or reuse the one existing in the global app map. This implementation
3842
* creates attributes only once even if the current class is loaded by several class loaders and

dd-java-agent/instrumentation/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/NettyChannelPipelineInstrumentation.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import datadog.trace.instrumentation.netty41.server.HttpServerResponseTracingHandler;
2323
import datadog.trace.instrumentation.netty41.server.HttpServerTracingHandler;
2424
import datadog.trace.instrumentation.netty41.server.MaybeBlockResponseHandler;
25+
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketProtocolHandshakeHandler;
26+
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerTracingHandler;
2527
import io.netty.channel.ChannelHandler;
2628
import io.netty.channel.ChannelHandlerContext;
2729
import io.netty.channel.ChannelPipeline;
@@ -31,6 +33,7 @@
3133
import io.netty.handler.codec.http.HttpResponseDecoder;
3234
import io.netty.handler.codec.http.HttpResponseEncoder;
3335
import io.netty.handler.codec.http.HttpServerCodec;
36+
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
3437
import io.netty.util.Attribute;
3538
import net.bytebuddy.asm.Advice;
3639
import net.bytebuddy.description.type.TypeDescription;
@@ -77,6 +80,10 @@ public String[] helperClassNames() {
7780
packageName + ".server.HttpServerResponseTracingHandler",
7881
packageName + ".server.HttpServerTracingHandler",
7982
packageName + ".server.MaybeBlockResponseHandler",
83+
packageName + ".server.websocket.WebSocketProtocolHandshakeHandler",
84+
packageName + ".server.websocket.WebSocketServerTracingHandler",
85+
packageName + ".server.websocket.WebSocketServerResponseTracingHandler",
86+
packageName + ".server.websocket.WebSocketServerRequestTracingHandler",
8087
packageName + ".NettyHttp2Helper",
8188
};
8289
}
< A935 /td>
@@ -139,15 +146,19 @@ public static void addHandler(
139146
try {
140147
ChannelHandler toAdd = null;
141148
ChannelHandler toAdd2 = null;
149+
ChannelHandler toAdd3 = null;
142150
// Server pipeline handlers
143151
if (handler instanceof HttpServerCodec) {
144152
toAdd = new HttpServerTracingHandler();
145153
toAdd2 = MaybeBlockResponseHandler.INSTANCE;
154+
toAdd3 = new WebSocketServerTracingHandler();
146155
} else if (handler instanceof HttpRequestDecoder) {
147156
toAdd = HttpServerRequestTracingHandler.INSTANCE;
148157
} else if (handler instanceof HttpResponseEncoder) {
149158
toAdd = HttpServerResponseTracingHandler.INSTANCE;
150159
toAdd2 = MaybeBlockResponseHandler.INSTANCE;
160+
} else if (handler instanceof WebSocketServerProtocolHandler) {
161+
toAdd = WebSocketProtocolHandshakeHandler.INSTANCE;
151162
} else
152163
// Client pipeline handlers
153164
if (handler instanceof HttpClientCodec) {
@@ -180,6 +191,13 @@ public static void addHandler(
180191
pipeline.remove(existing2);
181192
}
182193
pipeline.addAfter(pipeline.context(toAdd).name(), null, toAdd2);
194+
if (toAdd3 != null) {
195+
ChannelHandler existing3 = pipeline.get(toAdd3.getClass());
196+
if (existing3 != null) {
197+
pipeline.remove(existing3);
198+
}
199+
pipeline.addAfter(pipeline.context(toAdd2).name(), null, toAdd3);
200+
}
183201
}
184202
}
185203
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package datadog.trace.instrumentation.netty41.server.websocket;
2+
3+
import static datadog.trace.instrumentation.netty41.AttributeKeys.WEBSOCKET_HANDLER_CONTEXT;
4+
5+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
6+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
7+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
8+
import io.netty.channel.ChannelHandler;
9+
import io.netty.channel.ChannelHandlerContext;
10+
import io.netty.channel.ChannelInboundHandlerAdapter;
11+
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
12+
13+
@ChannelHandler.Sharable
14+
public class WebSocketProtocolHandshakeHandler extends ChannelInboundHandlerAdapter {
15+
public static WebSocketProtocolHandshakeHandler INSTANCE =
16+
new WebSocketProtocolHandshakeHandler();
17+
18+
@Override
19+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
20+
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
21+
// WebSocket Handshake Completed
22+
final AgentSpan current = AgentTracer.get().activeSpan();
23+
if (current != null) {
24+
ctx.channel()
25+
.attr(WEBSOCKET_HANDLER_CONTEXT)
26+
.set(
27+
new HandlerContext.Sender(
28+
current.getLocalRootSpan(), ctx.channel().id().asShortText()));
29+
}
30+
}
31+
super.userEventTriggered(ctx, evt);
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package datadog.trace.instrumentation.netty41.server.websocket;
2+
3+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
4+
import static datadog.trace.bootstrap.instrumentation.decorator.WebsocketDecorator.DECORATE;
5+
import static datadog.trace.instrumentation.netty41.AttributeKeys.WEBSOCKET_HANDLER_CONTEXT;
6+
7+
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
8+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
9+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
10+
import io.netty.channel.*;
11+
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
12+
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
13+
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
14+
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
15+
16+
@ChannelHandler.Sharable
17+
public class WebSocketServerRequestTracingHandler extends ChannelInboundHandlerAdapter {
18+
public static WebSocketServerRequestTracingHandler INSTANCE =
19+
new WebSocketServerRequestTracingHandler();
20+
21+
@Override
22+
public void channelRead(ChannelHandlerContext ctx, Object frame) {
23+
24+
if (!(frame instanceof WebSocketFrame)) {
25+
ctx.fireChannelRead(frame);
26+
return;
27+
}
28+
29+
Channel channel = ctx.channel();
30+
final HandlerContext.Sender sessionState = channel.attr(WEBSOCKET_HANDLER_CONTEXT).get();
31+
if (sessionState == null) {
32+
return;
33+
}
34+
35+
final HandlerContext.Receiver handlerContext =
36+
new HandlerContext.Receiver(
37+
sessionState.getHandshakeSpan(), ctx.channel().id().asShortText());
38+
39+
if (frame instanceof TextWebSocketFrame) {
40+
// WebSocket Read Text Start
41+
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
42+
43+
final AgentSpan span =
44+
DECORATE.onReceiveFrameStart(
45+
handlerContext, textFrame.text(), textFrame.isFinalFragment());
46+
if (span == null) {
47+
ctx.fireChannelRead(textFrame);
48+
} else {
49+
try (final AgentScope scope = activateSpan(span)) {
50+
ctx.fireChannelRead(textFrame);
51+
52+
// WebSocket Read Text Start
53+
if (textFrame.isFinalFragment()) {
54+
DECORATE.onFrameEnd(handlerContext);
55+
}
56+
}
57+
}
58+
return;
59+
}
60+
61+
if (frame instanceof BinaryWebSocketFrame) {
62+
// WebSocket Read Binary Start
63+
BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
64+
final AgentSpan span =
65+
DECORATE.onReceiveFrameStart(
66+
handlerContext, binaryFrame.content().nioBuffer(), binaryFrame.isFinalFragment());
67+
if (span == null) {
68+
ctx.fireChannelRead(binaryFrame);
69+
} else {
70+
try (final AgentScope scope = activateSpan(span)) {
71+
ctx.fireChannelRead(binaryFrame);
72+
73+
// WebSocket Read Binary End
74+
if (binaryFrame.isFinalFragment()) {
75+
DECORATE.onFrameEnd(handlerContext);
76+
}
77+
}
78+
}
79+
return;
80+
}
81+
82+
if (frame instanceof CloseWebSocketFrame) {
83+
// WebSocket Closed by client
84+
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
85+
int statusCode = closeFrame.statusCode();
86+
String reasonText = closeFrame.reasonText();
87+
channel.attr(WEBSOCKET_HANDLER_CONTEXT).remove();
88+
final AgentSpan span =
89+
DECORATE.onSessionCloseReceived(handlerContext, reasonText, statusCode);
90+
if (span == null) {
91+
ctx.fireChannelRead(closeFrame);
92+
} else {
93+
try (final AgentScope scope = activateSpan(span)) {
94+
ctx.fireChannelRead(closeFrame);
95+
96+
if (closeFrame.isFinalFragment()) {
97+
DECORATE.onFrameEnd(handlerContext);
98+
}
99+
}
100+
}
101+
}
102+
}
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package datadog.trace.instrumentation.netty41.server.websocket;
2+
3+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
4+
import static datadog.trace.bootstrap.instrumentation.decorator.WebsocketDecorator.DECORATE;
5+
import static datadog.trace.bootstrap.instrumentation.websocket.HandlersExtractor.MESSAGE_TYPE_BINARY;
6+
import static datadog.trace.bootstrap.instrumentation.websocket.HandlersExtractor.MESSAGE_TYPE_TEXT;
7+
import static datadog.trace.instrumentation.netty41.AttributeKeys.WEBSOCKET_HANDLER_CONTEXT;
8+
9+
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
10+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
11+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
12+
import io.netty.channel.*;
13+
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
14+
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
15+
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
16+
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
17+
18+
@ChannelHandler.Sharable
19+
public class WebSocketServerResponseTracingHandler extends ChannelOutboundHandlerAdapter {
20+
public static WebSocketServerResponseTracingHandler INSTANCE =
21+
new WebSocketServerResponseTracingHandler();
22+
23+
@Override
24+
public void write(ChannelHandlerContext ctx, Object frame, ChannelPromise promise)
25+
throws Exception {
26+
if (!(frame instanceof WebSocketFrame)) {
27+
ctx.write(frame, promise);
28+
return;
29+
}
30+
31+
Channel channel = ctx.channel();
32+
HandlerContext.Sender handlerContext = channel.attr(WEBSOCKET_HANDLER_CONTEXT).get();
33+
if (handlerContext == null) {
34+
ctx.write(frame, promise);
35+
return;
36+
}
37+
38+
if (frame instanceof TextWebSocketFrame) {
39+
// WebSocket Write Text Start
40+
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
41+
final AgentSpan span =
42+
DECORATE.onSendFrameStart(handlerContext, MESSAGE_TYPE_TEXT, textFrame.text().length());
43+
if (span == null) {
44+
ctx.write(frame, promise);
45+
} else {
46+
try (final AgentScope scope = activateSpan(span)) {
47+
ctx.write(frame, promise);
48+
49+
// WebSocket Write Text End
50+
if (textFrame.isFinalFragment()) {
51+
DECORATE.onFrameEnd(handlerContext);
52+
}
53+
}
54+
}
55+
return;
56+
}
57+
58+
if (frame instanceof BinaryWebSocketFrame) {
59+
// WebSocket Write Binary Start
60+
BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
61+
final AgentSpan span =
62+
DECORATE.onSendFrameStart(
63+
handlerContext, MESSAGE_TYPE_BINARY, binaryFrame.content().readableBytes());
64+
if (span == null) {
65+
ctx.write(frame, promise);
66+
} else {
67+
try (final AgentScope scope = activateSpan(span)) {
68+
ctx.write(frame, promise);
69+
70+
// WebSocket Write Binary End
71+
if (binaryFrame.isFinalFragment()) {
72+
DECORATE.onFrameEnd(handlerContext);
73+
}
74+
}
75+
}
76+
return;
77+
}
78+
79+
if (frame instanceof CloseWebSocketFrame) {
80+
// WebSocket Closed by Server
81+
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
82+
int statusCode = closeFrame.statusCode();
83+
String reasonText = closeFrame.reasonText();
84+
channel.attr(WEBSOCKET_HANDLER_CONTEXT).remove();
85+
final AgentSpan span = DECORATE.onSessionCloseIssued(handlerContext, reasonText, statusCode);
86+
if (span == null) {
87+
ctx.write(frame, promise);
88+
} else {
89+
try (final AgentScope scope = activateSpan(span)) {
90+
ctx.write(frame, promise);
91+
if (closeFrame.isFinalFragment()) {
92+
DECORATE.onFrameEnd(handlerContext);
93+
}
94+
}
95+
}
96+
}
97+
}
98+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package datadog.trace.instrumentation.netty41.server.websocket;
2+
3+
import io.netty.channel.CombinedChannelDuplexHandler;
4+
5+
public class WebSocketServerTracingHandler
6+
extends CombinedChannelDuplexHandler<
7+
WebSocketServerRequestTracingHandler, WebSocketServerResponseTracingHandler> {
8+
9+
public WebSocketServerTracingHandler() {
10+
super(
11+
WebSocketServerRequestTracingHandler.INSTANCE,
12+
WebSocketServerResponseTracingHandler.INSTANCE);
13+
}
14+
}

0 commit comments

Comments
 (0)
0