8000 Alternative API Security sampling algorithm by smola · Pull Request #8961 · DataDog/dd-trace-java · GitHub
[go: up one dir, main page]

Skip to content

Alternative API Security sampling algorithm #8961

8000
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
wip
  • Loading branch information
smola committed Jun 11, 2025
commit ce506c980d829e30f7ca955a8f80cd7f27e9e79b
Original file line number Diff line number Diff line change
Expand Up @@ -686,12 +686,14 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {

Collection<AppSecEvent> collectedEvents = ctx.transferCollectedEvents();

final Object route = tags.get(Tags.HTTP_ROUTE);
if (route != null) {
ctx.setRoute(route.toString());
if (apiSecurityPostProcessor != null) {
final Object route = tags.get(Tags.HTTP_ROUTE);
if (route != null) {
ctx.setRoute(route.toString());
}
// TODO: Move this to traceSegmentPostProcessors
apiSecurityPostProcessor.processTraceSegment(traceSeg, ctx, null);
}
// TODO: Move this to traceSegmentPostProcessors
apiSecurityPostProcessor.processTraceSegment(traceSeg, ctx, null);

for (TraceSegmentPostProcessor pp : this.traceSegmentPostProcessors) {
pp.processTraceSegment(traceSeg, ctx, collectedEvents);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import com.datadog.appsec.event.EventProducerService
import com.datadog.appsec.event.ExpiredSubscriberInfoException
import com.datadog.appsec.event.data.KnownAddresses
import com.datadog.appsec.gateway.AppSecRequestContext
import datadog.trace.api.gateway.RequestContext
import datadog.trace.api.ProductTraceSource
import datadog.trace.api.config.AppSecConfig
import datadog.trace.api.config.GeneralConfig
import datadog.trace.api.internal.TraceSegment
import datadog.trace.bootstrap.instrumentation.api.AgentSpan
import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.test.util.DDSpecification

class ApiSecurityProcessorTest extends DDSpecification {
Expand All @@ -29,18 +31,14 @@ class ApiSecurityProcessorTest extends DDSpecification {
1 * producer.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR) >> subInfo
1 * subInfo.isEmpty() >> false
1 * producer.publishDataEvent(_, ctx, _, _)
1 * ctx.commitDerivatives(traceSegment)
1 * ctx.closeWafContext()
1 * ctx.close()
1 * traceSegment.setTagTop('asm.keep', true)
0 * _
}

void 'no schema extracted if sampling is false'() {
given:
def sampler = Mock(ApiSecuritySampler)
def producer = Mock(EventProducerService)
def span = Mock(AgentSpan)
def reqCtx = Mock(RequestContext)
def ctx = Mock(AppSecRequestContext)
def traceSegment = Mock(TraceSegment)
def processor = new ApiSecurityProcessor(sampler, producer)
Expand All @@ -50,45 +48,30 @@ class ApiSecurityProcessorTest extends DDSpecification {

then:
noExceptionThrown()
1 * span.getRequestContext() >> reqCtx
1 * reqCtx.getData(_) >> ctx
1 * sampler.sample(ctx) >> false
1 * ctx.closeWafContext()
1 * ctx.close()
0 * _
}

void 'permit is released even if request context close throws'() {
void 'process null appsec request context does nothing'() {
given:
def sampler = Mock(ApiSecuritySampler)
def producer = Mock(EventProducerService)
def span = Mock(AgentSpan)
def reqCtx = Mock(RequestContext)
def traceSegment = Mock(TraceSegment)
def ctx = Mock(AppSecRequestContext)
def processor = new ApiSecurityProcessor(sampler, producer)

when:
processor.processTraceSegment(traceSegment, ctx, null)
processor.processTraceSegment(traceSegment, null, null)

then:
noExceptionThrown()
1 * span.getRequestContext() >> reqCtx
1 * reqCtx.getData(_) >> ctx
1 * sampler.sample(ctx) >> true
1 * reqCtx.getTraceSegment() >> traceSegment
1 * producer.getDataSubscribers(_) >> null
1 * ctx.closeWafContext()
1 * ctx.close() >> { throw new RuntimeException() }
0 * _
}

void 'context is cleaned up on timeout'() {
void 'empty event subscription does not break the process'() {
given:
def sampler = Mock(ApiSecuritySampler)
def producer = Mock(EventProducerService)
def span = Mock(AgentSpan)
def reqCtx = Mock(RequestContext)
def subInfo = Mock(EventProducerService.DataSubscriberInfo)
def traceSegment = Mock(TraceSegment)
def ctx = Mock(AppSecRequestContext)
def processor = new ApiSecurityProcessor(sampler, producer)
Expand All @@ -98,39 +81,17 @@ class ApiSecurityProcessorTest extends DDSpecification {

then:
noExceptionThrown()
1 * span.getRequestContext() >> reqCtx
1 * reqCtx.getData(_) >> ctx
1 * ctx.closeWafContext()
1 * ctx.close()
0 * _
}

void 'process null appsec request context does nothing'() {
given:
def sampler = Mock(ApiSecuritySampler)
def producer = Mock(EventProducerService)
def span = Mock(AgentSpan)
def traceSegment = Mock(TraceSegment)
def reqCtx = Mock(RequestContext)
def processor = new ApiSecurityProcessor(sampler, producer)

when:
processor.processTraceSegment(traceSegment, null, null)

then:
noExceptionThrown()
1 * span.getRequestContext() >> reqCtx
1 * reqCtx.getData(_) >> null
1 * sampler.sample(ctx) >> true
1 * producer.getDataSubscribers(_) >> subInfo
1 * subInfo.isEmpty() >> true
0 * _
}

void 'empty event subscription does not break the process'() {
void 'expired event subscription does not break the process'() {
given:
def sampler = Mock(ApiSecuritySampler)
def producer = Mock(EventProducerService)
def subInfo = Mock(EventProducerService.DataSubscriberInfo)
def span = Mock(AgentSpan)
def reqCtx = Mock(RequestContext)
def traceSegment = Mock(TraceSegment)
def ctx = Mock(AppSecRequestContext)
def processor = new ApiSecurityProcessor(sampler, producer)
Expand All @@ -140,42 +101,34 @@ class ApiSecurityProcessorTest extends DDSpecification {

then:
noExceptionThrown()
1 * span.getRequestContext() >> reqCtx
1 * reqCtx.getData(_) >> ctx
1 * sampler.sample(ctx) >> true
1 * reqCtx.getTraceSegment() >> traceSegment
1 * producer.getDataSubscribers(_) >> subInfo
1 * subInfo.isEmpty() >> true
1 * ctx.closeWafContext()
1 * ctx.close()
1 * subInfo.isEmpty() >> false
1 * producer.publishDataEvent(_, ctx, _, _) >> { throw new ExpiredSubscriberInfoException() }
0 * _
}

void 'expired event subscription does not break the process'() {
void 'test api security sampling with tracing disabled'() {
given:
injectSysConfig(GeneralConfig.APM_TRACING_ENABLED, "false")
injectSysConfig(AppSecConfig.API_SECURITY_ENABLED, "true")
def sampler = Mock(ApiSecuritySampler)
def producer = Mock(EventProducerService)
def subInfo = Mock(EventProducerService.DataSubscriberInfo)
def span = Mock(AgentSpan)
def reqCtx = Mock(RequestContext)
def producer = Mock(EventProducerService)
def traceSegment = Mock(TraceSegment)
def ctx = Mock(AppSecRequestContext)
def processor = new ApiSecurityProcessor(sampler, producer)
def ctx = Mock(AppSecRequestContext)

when:
processor.processTraceSegment(traceSegment, ctx, null)

then:
noExceptionThrown()
1 * span.getRequestContext() >> reqCtx
1 * reqCtx.getData(_) >> ctx
1 * sampler.sample(ctx) >> true
1 * reqCtx.getTraceSegment() >> traceSegment
1 * producer.getDataSubscribers(_) >> subInfo
1 * producer.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR) >> subInfo
1 * subInfo.isEmpty() >> false
1 * producer.publishDataEvent(_, ctx, _, _) >> { throw new ExpiredSubscriberInfoException() }
1 * ctx.closeWafContext()
1 * ctx.close()
1 * producer.publishDataEvent(_, ctx, _, _)
1 * traceSegment.setTagTop('asm.keep', true)
1 * traceSegment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM)
0 * _
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import com.datadog.appsec.event.data.KnownAddresses
import com.datadog.appsec.report.AppSecEvent
import com.datadog.appsec.report.AppSecEventWrapper
import datadog.trace.api.ProductTraceSource
import datadog.trace.api.config.GeneralConfig
import datadog.trace.api.function.TriConsumer
import datadog.trace.api.function.TriFunction
import datadog.trace.api.gateway.BlockResponseFunction
Expand Down Expand Up @@ -1202,26 +1201,6 @@ class GatewayBridgeSpecification extends DDSpecification {
0 * traceSegment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM)
}

void 'test api security sampling with tracing disabled'() {
given:
injectSysConfig(GeneralConfig.APM_TRACING_ENABLED, "false")
AppSecRequestContext mockAppSecCtx = Mock(AppSecRequestContext)
RequestContext mockCtx = Stub(RequestContext) {
getData(RequestContextSlot.APPSEC) >> mockAppSecCtx
getTraceSegment() >> traceSegment
}
IGSpanInfo spanInfo = Mock(AgentSpan)
when:
def flow = requestEndedCB.apply(mockCtx, spanInfo)
then:
1 * mockAppSecCtx.transferCollectedEvents() >> []
1 * spanInfo.getTags() >> ['http.route': 'route']
1 * apiSecurityProcessor.processTraceSegment(_, _, _ )
1 * traceSegment.setTagTop(Tags.ASM_KEEP, true)
1 * traceSegment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM)
}


void 'test default writeRequestHeaders'(){
given:
def allowedHeaders = ['x-allowed-header', 'x-multiple-allowed-header', 'x-always-included'] as Set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ import datadog.trace.common.sampling.SingleSpanSampler
import datadog.trace.common.writer.ddagent.PrioritizationStrategy.PublishResult
import datadog.trace.core.CoreSpan
import datadog.trace.core.DDSpan
import datadog.trace.core.DDSpanContext
import datadog.trace.core.PendingTrace
import datadog.trace.core.monitor.HealthMetrics
import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor
import datadog.trace.test.util.DDSpecification
import spock.util.concurrent.PollingConditions

Expand Down Expand Up @@ -152,56 +149,6 @@ class TraceProcessingWorkerTest extends DDSpecification {
priority << [SAMPLER_DROP, USER_DROP, SAMPLER_KEEP, USER_KEEP, UNSET]
}

def "trace should be post-processed"() {
setup:
AtomicInteger acceptedCount = new AtomicInteger()
PayloadDispatcherImpl countingDispatcher = Mock(PayloadDispatcherImpl)
countingDispatcher.addTrace(_) >> {
acceptedCount.getAndIncrement()
}
HealthMetrics healthMetrics = Mock(HealthMetrics)

def span1 = DDSpan.create("test", 0, Mock(DDSpanContext) {
getTraceCollector() >> Mock(PendingTrace) {
getCurrentTimeNano() >> 0
}
}, [])
def processedSpan1 = false

// Span 2 - should NOT be post-processed
def span2 = DDSpan.create("test", 0, Mock(DDSpanContext) {
getTraceCollector() >> Mock(PendingTrace) {
getCurrentTimeNano() >> 0
}
}, [])
def processedSpan2 = false

SpanPostProcessor.Holder.INSTANCE = Mock(SpanPostProcessor) {
process(span1, _) >> { processedSpan1 = true }
process(span2, _) >> { processedSpan2 = true }
}

TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics,
countingDispatcher, {
false
}, FAST_LANE, 100, TimeUnit.SECONDS, null)
worker.start()

when: "traces are submitted"
worker.publish(span1, SAMPLER_KEEP, [span1, span2])
worker.publish(span2, SAMPLER_KEEP, [span1, span2])

then: "traces are passed through unless rejected on submission"
conditions.eventually {
assert processedSpan1
assert processedSpan2
}

cleanup:
SpanPostProcessor.Holder.INSTANCE = SpanPostProcessor.Holder.NOOP
worker.close()
}

def "traces should be processed"() {
setup:
AtomicInteger acceptedCount = new AtomicInteger()
Expand Down
Loading
0