8000 Alternative API Security sampling algorithm by smola · Pull Request #8961 · DataDog/dd-trace-java · GitHub
[go: up one dir, main page]

Skip to content

Alternative API Security sampling algorithm #8961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Alternative API Security sampling algorithm
  • Loading branch information
smola committed Jun 11, 2025
commit 826630fad05e41f7de0cd0403a0efbd73adb807a
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.datadog.appsec.blocking.BlockingServiceImpl;
import com.datadog.appsec.config.AppSecConfigService;
import com.datadog.appsec.config.AppSecConfigServiceImpl;
import com.datadog.appsec.config.TraceSegmentPostProcessor;
import com.datadog.appsec.ddwaf.WAFModule;
import com.datadog.appsec.event.EventDispatcher;< 8000 /span>
import com.datadog.appsec.event.ReplaceableEventProducerService;
Expand All @@ -22,7 +23,6 @@
import datadog.trace.api.telemetry.ProductChange;
import datadog.trace.api.telemetry.ProductChangeCollector;
import datadog.trace.bootstrap.ActiveSubsystems;
import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -68,18 +68,6 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s
EventDispatcher eventDispatcher = new EventDispatcher();
REPLACEABLE_EVENT_PRODUCER.replaceEventProducerService(eventDispatcher);

ApiSecuritySampler requestSampler;
if (Config.get().isApiSecurityEnabled()) {
requestSampler = new ApiSecuritySamplerImpl();
// When DD_API_SECURITY_ENABLED=true, ths post-processor is set even when AppSec is inactive.
// This should be low overhead since the post-processor exits early if there's no AppSec
// context.
SpanPostProcessor.Holder.INSTANCE =
new ApiSecurityProcessor(requestSampler, REPLACEABLE_EVENT_PRODUCER);
} else {
requestSampler = new ApiSecuritySampler.NoOp();
}

ConfigurationPoller configurationPoller = sco.configurationPoller(config);
// may throw and abort startup
APP_SEC_CONFIG_SERVICE =
Expand All @@ -89,11 +77,15 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s

sco.createRemaining(config);

TraceSegmentPostProcessor apiSecurityPostProcessor =
Config.get().isApiSecurityEnabled()
? new ApiSecurityProcessor(new ApiSecuritySampler(), REPLACEABLE_EVENT_PRODUCER)
: null;
GatewayBridge gatewayBridge =
new GatewayBridge(
gw,
REPLACEABLE_EVENT_PRODUCER,
requestSampler,
apiSecurityPostProcessor,
APP_SEC_CONFIG_SERVICE.getTraceSegmentPostProcessors());

loadModules(eventDispatcher, sco.monitoring);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
package com.datadog.appsec.api.security;

import com.datadog.appsec.config.TraceSegmentPostProcessor;
import com.datadog.appsec.event.EventProducerService;
import com.datadog.appsec.event.ExpiredSubscriberInfoException;
import com.datadog.appsec.event.data.DataBundle;
import com.datadog.appsec.event.data.KnownAddresses;
import com.datadog.appsec.event.data.SingletonDataBundle;
import com.datadog.appsec.gateway.AppSecRequestContext;
import com.datadog.appsec.gateway.GatewayContext;
import datadog.trace.api.gateway.RequestContext;
import datadog.trace.api.gateway.RequestContextSlot;
import com.datadog.appsec.report.AppSecEvent;
import datadog.trace.api.Config;
import datadog.trace.api.ProductTraceSource;
import datadog.trace.api.internal.TraceSegment;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;

import datadog.trace.bootstrap.instrumentation.api.Tags;
import java.util.Collection;
import java.util.Collections;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApiSecurityProcessor {
public class ApiSecurityProcessor implements TraceSegmentPostProcessor {

private static final Logger log = LoggerFactory.getLogger(ApiSecurityProcessor.class);
private final ApiSecuritySampler sampler;
Expand All @@ -28,39 +30,22 @@ public ApiSecurityProcessor(ApiSecuritySampler sampler, EventProducerService pro
this.producerService = producerService;
}

public void process(@Nonnull AgentSpan span) {
final RequestContext ctx_ = span.getRequestContext();
if (ctx_ == null) {
@Override
public void processTraceSegment(
TraceSegment segment, AppSecRequestContext ctx, Collection<AppSecEvent> collectedEvents) {
if (segment == null || ctx == null) {
return;
}
final AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null) {
if (!sampler.sample(ctx)) {
log.debug("Request not sampled, skipping API security post-processing");
return;
}

try {
if (!sampler.sample(ctx)) {
log.debug("Request not sampled, skipping API security post-processing");
return;
}
log.debug("Request sampled, processing API security post-processing");
extractSchemas(ctx, ctx_.getTraceSegment());
} finally {
ctx.setKeepOpenForApiSecurityPostProcessing(false);
try {
// XXX: Close the additive first. This is not strictly needed, but it'll prevent getting it
// detected as a
// missed request-ended event.
ctx.closeWafContext();
ctx.close();
} catch (Exception e) {
log.debug("Error closing AppSecRequestContext", e);
}
sampler.releaseOne();
}
log.debug("Request sampled, processing API security post-processing");
extractSchemas(ctx, segment);
}

private void extractSchemas(final AppSecRequestContext ctx, final TraceSegment traceSegment) {
private void extractSchemas(
final @Nonnull AppSecRequestContext ctx, final @Nonnull TraceSegment traceSegment) {
final EventProducerService.DataSubscriberInfo sub =
producerService.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR);
if (sub == null || sub.isEmpty()) {
Expand All @@ -74,7 +59,12 @@ private void extractSchemas(final AppSecRequestContext ctx, final TraceSegment t
try {
GatewayContext gwCtx = new GatewayContext(false);
producerService.publishDataEvent(sub, ctx, bundle, gwCtx);
ctx.commitDerivatives(traceSegment);
// TODO: Perhaps do this if schemas have actually been extracted (check when committing
// derivatives)
traceSegment.setTagTop(Tags.ASM_KEEP, true);
if (!Config.get().isApmTracingEnabled()) {
traceSegment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM);
}
} catch (ExpiredSubscriberInfoException e) {
log.debug("Subscriber info expired", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.datadog.appsec.gateway.AppSecRequestContext;
import datadog.trace.util.AgentTaskScheduler;

import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -11,10 +10,10 @@
import java.util.concurrent.atomic.AtomicReference;

/**
* Internal map for API Security sampling.
* See "[RFC-1021] API Security Sampling Algorithm for thread-based concurrency".
* Internal map for API Security sampling. See "[RFC-1021] API Security Sampling Algorithm for
* thread-based concurrency".
*/
final public class ApiSecuritySampler {
public class ApiSecuritySampler {

private static final int DEFAULT_MAX_ITEM_COUNT = 4096;
private static final int DEFAULT_INTERVAL_SECONDS = 30;
Expand All @@ -28,10 +27,20 @@ final public class ApiSecuritySampler {
private final long maxItemCount;

public ApiSecuritySampler() {
this(DEFAULT_MAX_ITEM_COUNT, DEFAULT_INTERVAL_SECONDS, new Random().nextLong(), new DefaultMonotonicClock(), AgentTaskScheduler.INSTANCE);
this(
DEFAULT_MAX_ITEM_COUNT,
DEFAULT_INTERVAL_SECONDS,
new Random().nextLong(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Vulnerability

Use of insecure random values (...read more)

Functions as Math.random() and objects like java.util.Random() do not provide strong enough randomness. Consider using java.security.SecureRandom() instead.

View in Datadog  Leave us feedback  Documentation

new DefaultMonotonicClock(),
AgentTaskScheduler.INSTANCE);
}

public ApiSecuritySampler(final int maxItemCount, final int intervalSeconds, final long zero, final MonotonicClock clock, Executor executor) {
public ApiSecuritySampler(
final int maxItemCount,
final int intervalSeconds,
final long zero,
final MonotonicClock clock,
Executor executor) {
table = new AtomicReference<>(new Table(maxItemCount));
this.maxItemCount = maxItemCount;
this.intervalSeconds = intervalSeconds;
Expand Down Expand Up @@ -154,7 +163,7 @@ public FindSlotResult findSlot(final long key) {
index = 0;
}
} while (index != startIndex);
return new FindSlotResult(table[(int)(maxItemCount * 2)], false);
return new FindSlotResult(table[(int) (maxItemCount * 2)], false);
}

static class FindSlotResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,6 @@ public class AppSecRequestContext implements DataBundle, Closeable {
// Used to detect missing request-end event at close.
private volatile boolean requestEndCalled;

private volatile boolean keepOpenForApiSecurityPostProcessing;
private volatile Long apiSecurityEndpointHash;

private static final AtomicIntegerFieldUpdater<AppSecRequestContext> WAF_TIMEOUTS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AppSecRequestContext.class, "wafTimeouts");
private static final AtomicIntegerFieldUpdater<AppSecRequestContext> RASP_TIMEOUTS_UPDATER =
Expand Down Expand Up @@ -343,22 +340,6 @@ public void setRoute(String route) {
this.route = route;
}

public void setKeepOpenForApiSecurityPostProcessing(final boolean flag) {
this.keepOpenForApiSecurityPostProcessing = flag;
}

public boolean isKeepOpenForApiSecurityPostProcessing() {
return this.keepOpenForApiSecurityPostProcessing;
}

public void setApiSecurityEndpointHash(long hash) {
this.apiSecurityEndpointHash = hash;
}

public Long getApiSecurityEndpointHash() {
return this.apiSecurityEndpointHash;
}

void addRequestHeader(String name, String value) {
if (finishedRequestHeaders) {
throw new IllegalStateException("Request headers were said to be finished before");
Expand Down Expand Up @@ -554,23 +535,18 @@ public void close() {
if (!requestEndCalled) {
log.debug(SEND_TELEMETRY, "Request end event was not called before close");
}
// For API Security, we sometimes keep contexts open for late processing. In that case, this
// flag needs to be
// later reset by the API Security post-processor and close must be called again.
if (!keepOpenForApiSecurityPostProcessing) {
if (wafContext != null) {
log.debug(
SEND_TELEMETRY, "WAF object had not been closed (probably missed request-end event)");
closeWafContext();
}
collectedCookies = null;
requestHeaders.clear();
responseHeaders.clear();
persistentData.clear();
if (derivatives != null) {
derivatives.clear();
derivatives = null;
}
if (wafContext != null) {
log.debug(
SEND_TELEMETRY, "WAF object had not been closed (probably missed request-end event)");
closeWafContext();
}
collectedCookies = null;
requestHeaders.clear();
responseHeaders.clear();
persistentData.clear();
if (derivatives != null) {
derivatives.clear();
derivatives = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class GatewayBridge {

private final SubscriptionService subscriptionService;
private final EventProducerService producerService;
private final ApiSecuritySampler requestSampler;
private final TraceSegmentPostProcessor apiSecurityPostProcessor;
private final List<TraceSegmentPostProcessor> traceSegmentPostProcessors;

// subscriber cache
Expand All @@ -114,11 +114,11 @@ public class GatewayBridge {
public GatewayBridge(
SubscriptionService subscriptionService,
EventProducerService producerService,
ApiSecuritySampler requestSampler,
TraceSegmentPostProcessor apiSecurityPostProcessor,
List<TraceSegmentPostProcessor> traceSegmentPostProcessors) {
this.subscriptionService = subscriptionService;
this.producerService = producerService;
this.requestSampler = requestSampler;
this.apiSecurityPostProcessor = apiSecurityPostProcessor;
this.traceSegmentPostProcessors = traceSegmentPostProcessors;
}

Expand Down Expand Up @@ -679,22 +679,20 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {
TraceSegment traceSeg = ctx_.getTraceSegment();
Map<String, Object> tags = spanInfo.getTags();

if (maybeSampleForApiSecurity(ctx, spanInfo, tags)) {
if (!Config.get().isApmTracingEnabled()) {
traceSeg.setTagTop(Tags.ASM_KEEP, true);
traceSeg.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM);
}
} else {
ctx.closeWafContext();
}

// AppSec report metric and events for web span only
if (traceSeg != null) {
traceSeg.setTagTop("_dd.appsec.enabled", 1);
traceSeg.setTagTop("_dd.runtime_family", "jvm");

Collection<AppSecEvent> collectedEvents = ctx.transferCollectedEvents();

final Object route = tags.get(Tags.HTTP_ROUTE);
if (route != null) {
ctx.setRoute(route.toString());
}
// TODO: Move this to traceSegmentPostProcessors
apiSecurityPostProcessor.processTraceSegment(traceSeg, ctx, null);

for (TraceSegmentPostProcessor pp : this.traceSegmentPostProcessors) {
pp.processTraceSegment(traceSeg, ctx, collectedEvents);
}
Expand Down Expand Up @@ -748,6 +746,7 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {
writeRequestHeaders(
traceSeg, DEFAULT_REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders(), false);
}

// If extracted any derivatives - commit them
if (!ctx.commitDerivatives(traceSeg)) {
log.debug("Unable to commit, derivatives will be skipped {}", ctx.getDerivativeKeys());
Expand All @@ -765,21 +764,11 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {
);
}

ctx.closeWafContext();
ctx.close();
return NoopFlow.INSTANCE;
}

private boolean maybeSampleForApiSecurity(
AppSecRequestContext ctx, IGSpanInfo spanInfo, Map<String, Object> tags) {
log.debug("Checking API Security for end of request handler on span: {}", spanInfo.getSpanId());
// API Security sampling requires http.route tag.
final Object route = tags.get(Tags.HTTP_ROUTE);
if (route != null) {
ctx.setRoute(route.toString());
}
return requestSampler.preSampleRequest(ctx);
}

private Flow<Void> onRequestHeadersDone(RequestContext ctx_) {
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null || ctx.isReqDataPublished()) {
Expand Down
Loading
Loading
0