@@ -95,7 +95,9 @@ public class GatewayBridge {
95
95
// subscriber cache
96
96
private volatile DataSubscriberInfo initialReqDataSubInfo ;
97
97
private volatile DataSubscriberInfo rawRequestBodySubInfo ;
98
+ private volatile DataSubscriberInfo rawResponseBodySubInfo ;
98
99
private volatile DataSubscriberInfo requestBodySubInfo ;
100
+ private volatile DataSubscriberInfo responseBodySubInfo ;
99
101
private volatile DataSubscriberInfo pathParamsSubInfo ;
100
102
private volatile DataSubscriberInfo respDataSubInfo ;
101
103
private volatile DataSubscriberInfo grpcServerMethodSubInfo ;
@@ -142,6 +144,8 @@ public void init() {
142
144
subscriptionService .registerCallback (EVENTS .responseStarted (), this ::onResponseStarted );
143
145
subscriptionService .registerCallback (EVENTS .responseHeader (), this ::onResponseHeader );
144
146
subscriptionService .registerCallback (EVENTS .responseHeaderDone (), this ::onResponseHeaderDone );
147
+ subscriptionService .registerCallback (EVENTS .responseBodyStart (), this ::onResponseBodyStart );
148
+ subscriptionService .registerCallback (EVENTS .responseBodyDone (), this ::onResponseBodyDone );
145
149
subscriptionService .registerCallback (EVENTS .grpcServerMethod (), this ::onGrpcServerMethod );
146
150
subscriptionService .registerCallback (
147
151
EVENTS .grpcServerRequestMessage (), this ::onGrpcServerRequestMessage );
@@ -164,6 +168,10 @@ public void init() {
164
168
subscriptionService .registerCallback (
165
169
EVENTS .requestBodyProcessed (), this ::onRequestBodyProcessed );
166
170
}
171
+ if (additionalIGEvents .contains (EVENTS .responseBodyProcessed ())) {
172
+ subscriptionService .registerCallback (
173
+ EVENTS .responseBodyProcessed (), this ::onResponseBodyProcessed );
174
+ }
167
175
}
168
176
169
177
/**
@@ -173,7 +181,9 @@ public void init() {
173
181
public void reset () {
174
182
initialReqDataSubInfo = null ;
175
183
rawRequestBodySubInfo = null ;
184
+ rawResponseBodySubInfo = null ;
176
185
requestBodySubInfo = null ;
186
+ responseBodySubInfo = null ;
177
187
pathParamsSubInfo = null ;
178
188
respDataSubInfo = null ;
179
189
grpcServerMethodSubInfo = null ;
@@ -596,6 +606,40 @@ private Flow<Void> onRequestBodyProcessed(RequestContext ctx_, Object obj) {
596
606
}
597
607
}
598
608
609
+ private Flow <Void > onResponseBodyProcessed (RequestContext ctx_ , Object obj ) {
610
+ AppSecRequestContext ctx = ctx_ .getData (RequestContextSlot .APPSEC );
611
+ if (ctx == null ) {
612
+ return NoopFlow .INSTANCE ;
613
+ }
614
+
615
+ if (ctx .isConvertedResBodyPublished ()) {
616
+ log .debug (
617
+ "Response body already published; will ignore new value of type {}" , obj .getClass ());
618
+ return NoopFlow .INSTANCE ;
619
+ }
620
+ ctx .setConvertedResBodyPublished (true );
621
+
622
+ while (true ) {
623
+ DataSubscriberInfo subInfo = responseBodySubInfo ;
624
+ if (subInfo == null ) {
625
+ subInfo = producerService .getDataSubscribers (KnownAddresses .RESPONSE_BODY_OBJECT );
626
+ responseBodySubInfo = subInfo ;
627
+ }
628
+ if (subInfo == null || subInfo .isEmpty ()) {
629
+ return NoopFlow .INSTANCE ;
630
+ }
631
+ DataBundle bundle =
632
+ new SingletonDataBundle <>(
633
+ KnownAddresses .RESPONSE_BODY_OBJECT , ObjectIntrospection .convert (obj , ctx ));
634
+ try {
635
+ GatewayContext gwCtx = new GatewayContext (false );
636
+ return producerService .publishDataEvent (subInfo , ctx , bundle , gwCtx );
637
+ } catch (ExpiredSubscriberInfoException e ) {
638
+ responseBodySubInfo = null ;
639
+ }
640
+ }
641
+ }
642
+
599
643
private Flow <Void > onRequestBodyDone (RequestContext ctx_ , StoredBodySupplier supplier ) {
600
644
AppSecRequestContext ctx = ctx_ .getData (RequestContextSlot .APPSEC );
601
645
if (ctx == null || ctx .isRawReqBodyPublished ()) {
@@ -614,7 +658,7 @@ private Flow<Void> onRequestBodyDone(RequestContext ctx_, StoredBodySupplier sup
614
658
}
615
659
616
660
CharSequence bodyContent = supplier .get ();
617
- if (bodyContent == null || bodyContent .length () == 0 ) {
661
+ if (bodyContent .length () == 0 ) {
618
662
return NoopFlow .INSTANCE ;
619
663
}
620
664
DataBundle bundle = new SingletonDataBundle <>(KnownAddresses .REQUEST_BODY_RAW , bodyContent );
@@ -627,6 +671,38 @@ private Flow<Void> onRequestBodyDone(RequestContext ctx_, StoredBodySupplier sup
627
671
}
628
672
}
629
673
674
+ private Flow <Void > onResponseBodyDone (RequestContext ctx_ , StoredBodySupplier supplier ) {
675
+ AppSecRequestContext ctx = ctx_ .getData (RequestContextSlot .APPSEC );
676
+ if (ctx == null || ctx .isRawResBodyPublished ()) {
677
+ return NoopFlow .INSTANCE ;
678
+ }
679
+ ctx .setRawResBodyPublished (true );
680
+
681
+ while (true ) {
682
+ DataSubscriberInfo subInfo = responseBodySubInfo ;
683
+ if (subInfo == null ) {
684
+ subInfo = producerService .getDataSubscribers (KnownAddresses .RESPONSE_BODY_OBJECT );
685
+ responseBodySubInfo = subInfo ;
686
+ }
687
+ if (subInfo == null || subInfo .isEmpty ()) {
688
+ return NoopFlow .INSTANCE ;
689
+ }
690
+
691
+ CharSequence bodyContent = supplier .toString ();
692
+ if (bodyContent .length () == 0 ) {
693
+ return NoopFlow .INSTANCE ;
694
+ }
695
+ DataBundle bundle =
696
+ new SingletonDataBundle <>(KnownAddresses .RESPONSE_BODY_OBJECT , bodyContent );
697
+ try {
698
+ GatewayContext gwCtx = new GatewayContext (false );
699
+ return producerService .publishDataEvent (subInfo , ctx , bundle , gwCtx );
700
+ } catch (ExpiredSubscriberInfoException e ) {
701
+ responseBodySubInfo = null ;
702
+ }
703
+ }
704
+ }
705
+
630
706
private Flow <Void > onRequestPathParams (RequestContext ctx_ , Map <String , ?> data ) {
631
707
AppSecRequestContext ctx = ctx_ .getData (RequestContextSlot .APPSEC );
632
708
if (ctx == null || ctx .isPathParamsPublished ()) {
@@ -663,6 +739,16 @@ private Void onRequestBodyStart(RequestContext ctx_, StoredBodySupplier supplier
663
739
return null ;
664
740
}
665
741
742
+ private Void onResponseBodyStart (RequestContext ctx_ , StoredBodySupplier supplier ) {
743
+ AppSecRequestContext ctx = ctx_ .getData (RequestContextSlot .APPSEC );
744
+ if (ctx == null ) {
745
+ return null ;
746
+ }
747
+
748
+ ctx .setStoredResponseBodySupplier (supplier );
749
+ return null ;
750
+ }
751
+
666
752
private Flow <AppSecRequestContext > onRequestStarted () {
667
753
if (!AppSecSystem .isActive ()) {
668
754
return RequestContextSupplier .EMPTY ;
@@ -1032,8 +1118,10 @@ private Flow<Void> maybePublishResponseData(AppSecRequestContext ctx) {
1032
1118
1033
1119
MapDataBundle bundle =
1034
1120
MapDataBundle .of (
1035
- KnownAddresses .RESPONSE_STATUS , String .valueOf (ctx .getResponseStatus ()),
1036
- KnownAddresses .RESPONSE_HEADERS_NO_COOKIES , ctx .getResponseHeaders ());
1121
+ KnownAddresses .RESPONSE_STATUS ,
1122
+ String .valueOf (ctx .getResponseStatus ()),
1123
+ KnownAddresses .RESPONSE_HEADERS_NO_COOKIES ,
1124
+ ctx .getResponseHeaders ());
1037
1125
1038
1126
while (true ) {
1039
1127
DataSubscriberInfo subInfo = respDataSubInfo ;
@@ -1128,6 +1216,10 @@ private static class IGAppSecEventDependencies {
1128
1216
KnownAddresses .REQUEST_BODY_RAW , l (EVENTS .requestBodyStart (), EVENTS .requestBodyDone ()));
1129
1217
DATA_DEPENDENCIES .put (KnownAddresses .REQUEST_PATH_PARAMS , l (EVENTS .requestPathParams ()));
1130
1218
DATA_DEPENDENCIES .put (KnownAddresses .REQUEST_BODY_OBJECT , l (EVENTS .requestBodyProcessed ()));
1219
+ DATA_DEPENDENCIES .put (
1220
+ KnownAddresses .RESPONSE_BODY_RAW ,
1221
+ l (EVENTS .responseBodyStart (), EVENTS .responseBodyDone ()));
1222
+ DATA_DEPENDENCIES .put (KnownAddresses .RESPONSE_BODY_OBJECT , l (EVENTS .responseBodyProcessed ()));
1131
1223
}
1132
1224
1133
1225
private static Collection <datadog .trace .api .gateway .EventType <?>> l (
0 commit comments