WIP: Use java thread id for thread filter by zhengyu123 · Pull Request #415 · DataDog/java-profiler · GitHub
[go: up one dir, main page]

Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddprof-lib/src/main/cpp/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "threadState.h"
#include "tsc.h"
#include "vmStructs.h"
#include "wallclock/wallClock.h"
#include <arpa/inet.h>
#include <cxxabi.h>
#include <errno.h>
Expand Down
19 changes: 6 additions & 13 deletions ddprof-lib/src/main/cpp/javaApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "tsc.h"
#include "vmEntry.h"
#include "vmStructs.h"
#include "wallClock.h"
#include <errno.h>
#include <fstream>
#include <sstream>
Expand Down Expand Up @@ -133,15 +132,11 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env,
// We direct corresponding JNI calls to JavaCritical to make sure the parameters/return value
// still compatible in the event of signature changes in the future.
extern "C" DLLEXPORT void JNICALL
JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0() {
JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(jlong java_tid) {
ProfiledThread *current = ProfiledThread::current();
if (unlikely(current == nullptr)) {
return;
}
int tid = current->tid();
if (unlikely(tid < 0)) {
return;
}
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
if (unlikely(!thread_filter->enabled())) {
return;
Expand All @@ -158,7 +153,7 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0() {
if (unlikely(slot_id == -1)) {
return; // Failed to register thread
}
thread_filter->add(tid, slot_id);
thread_filter->add(java_tid, current->tid(), slot_id);
}

extern "C" DLLEXPORT void JNICALL
Expand All @@ -167,10 +162,6 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0() {
if (unlikely(current == nullptr)) {
return;
}
int tid = current->tid();
if (unlikely(tid < 0)) {
return;
}
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
if (unlikely(!thread_filter->enabled())) {
return;
Expand All @@ -181,14 +172,16 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0() {
// Thread doesn't have a slot ID yet - nothing to remove
return;
}
current->setFilterSlotId(-1);
thread_filter->remove(slot_id);
}


// Regular JNI entry point for JavaProfiler.filterThreadAdd0(long).
//
// env, unused: standard JNI arguments; neither is read here.
// java_tid:    thread id supplied by the Java caller; passed through unchanged.
//
// All of the work is delegated to the JavaCritical_ variant so that both
// entry points share a single implementation.
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env,
                                                          jclass unused,
                                                          jlong java_tid) {
  JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(java_tid);
}

extern "C" DLLEXPORT void JNICALL
Expand Down
17 changes: 9 additions & 8 deletions ddprof-lib/src/main/cpp/profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "flightRecorder.h"
#include "itimer.h"
#include "j9Ext.h"
#include "j9WallClock.h"
#include "wallclock/j9WallClock.h"
#include "libraryPatcher.h"
#include "objectSampler.h"
#include "os.h"
Expand All @@ -27,7 +27,8 @@
#include "thread.h"
#include "tsc.h"
#include "vmStructs.h"
#include "wallClock.h"
#include "wallclock/wallClockASGCT.h"
#include "wallclock/wallClockJVMTI.h"
#include <algorithm>
#include <dlfcn.h>
#include <fstream>
Expand Down Expand Up @@ -1187,10 +1188,10 @@ Engine *Profiler::selectCpuEngine(Arguments &args) {
}
}

Engine *Profiler::selectWallEngine(Arguments &args) {
BaseWallClock *Profiler::selectWallEngine(Arguments &args) {
if (args._wall < 0 &&
(args._event == NULL || strcmp(args._event, EVENT_WALL) != 0)) {
return &noop_engine;
return nullptr;
}
if (VM::isOpenJ9()) {
if (args._wallclock_sampler == JVMTI || !J9Ext::shouldUseAsgct() || !J9Ext::can_use_ASGCT()) {
Expand All @@ -1201,18 +1202,18 @@ Engine *Profiler::selectWallEngine(Arguments &args) {
}
j9_engine.sampleIdleThreads();
TEST_LOG("J9[wall]=jvmti");
return (Engine *)&j9_engine;
return (BaseWallClock *)&j9_engine;
} else {
TEST_LOG("J9[wall]=asgct");
return (Engine *)&wall_asgct_engine;
return &wall_asgct_engine;
}
}
switch (args._wallclock_sampler) {
case JVMTI:
return (Engine*)&wall_jvmti_engine;
return &wall_jvmti_engine;
case ASGCT:
default:
return (Engine*)&wall_asgct_engine;
return &wall_asgct_engine;
}
}

Expand Down
7 changes: 4 additions & 3 deletions ddprof-lib/src/main/cpp/profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class FrameName;
class VMNMethod;
class StackContext;
class VM;
class BaseWallClock;

// Unscoped state enumeration; the enumerators keep their implicit values 0..3.
// NOTE(review): presumably the profiler's lifecycle states - confirm with the owner.
enum State {
  NEW,
  IDLE,
  RUNNING,
  TERMINATED
};

Expand Down Expand Up @@ -124,7 +125,7 @@ class alignas(alignof(SpinLock)) Profiler {
CallTraceStorage _call_trace_storage;
FlightRecorder _jfr;
Engine *_cpu_engine;
Engine *_wall_engine = NULL;
BaseWallClock *_wall_engine = NULL;
Engine *_alloc_engine;
int _event_mask;

Expand Down Expand Up @@ -186,7 +187,7 @@ class alignas(alignof(SpinLock)) Profiler {
void mangle(const char *name, char *buf, size_t size);

Engine *selectCpuEngine(Arguments &args);
Engine *selectWallEngine(Arguments &args);
BaseWallClock *selectWallEngine(Arguments &args);
Engine *selectAllocEngine(Arguments &args);
Error checkJvmCapabilities();

Expand Down Expand Up @@ -240,7 +241,7 @@ class alignas(alignof(SpinLock)) Profiler {
// Returns the current value of the _max_stack_depth member.
int max_stack_depth() {
  return _max_stack_depth;
}
// Returns the difference between the current time() value and _start_time.
// NOTE(review): presumably _start_time is captured when profiling starts - confirm.
time_t uptime() {
  const time_t now = time(NULL);
  return now - _start_time;
}
// Returns the _cpu_engine pointer as stored; no ownership is transferred.
Engine *cpuEngine() {
  return _cpu_engine;
}
// Accessors for the _wall_engine pointer.
// NOTE(review): there are two overloads with the same name - a non-const one
// returning Engine* and a const one returning BaseWallClock*. They look like
// the before/after forms of a single change; confirm whether both are intended.
Engine *wallEngine() { return _wall_engine; }
BaseWallClock *wallEngine() const { return _wall_engine; }

// Returns the address of the _class_map member.
Dictionary *classMap() {
  return &_class_map;
}
// Returns the address of the _string_label_map member.
Dictionary *stringLabelMap() {
  return &_string_label_map;
}
Expand Down
69 changes: 54 additions & 15 deletions ddprof-lib/src/main/cpp/threadFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "threadFilter.h"
#include "arch.h"
#include "os.h"
#include "profiler.h"
#include "wallclock/wallClock.h"
#include <cassert>
#include <cstdlib>
#include <cstdio>
Expand Down Expand Up @@ -77,7 +79,7 @@ void ThreadFilter::initializeChunk(int chunk_idx) {
// Allocate and initialize new chunk completely before swapping
ChunkStorage* new_chunk = new ChunkStorage();
for (auto& slot : new_chunk->slots) {
slot.value.store(-1, std::memory_order_relaxed);
slot.value.java_tid.store(-1, std::memory_order_relaxed);
}

// Try to install it atomically
Expand Down Expand Up @@ -145,6 +147,11 @@ void ThreadFilter::initFreeList() {
}
}

bool ThreadFilter::filter_by_os_tid() {
BaseWallClock* wall_clock = Profiler::instance()->wallEngine();
return wall_clock == nullptr || wall_clock->mode() == BaseWallClock::Mode::ASGCT;
}

bool ThreadFilter::accept(SlotID slot_id) const {
// Fast path: if disabled, accept everything (relaxed to avoid fences on hot path)
if (unlikely(!_enabled.load(std::memory_order_relaxed))) {
Expand All @@ -158,12 +165,16 @@ bool ThreadFilter::accept(SlotID slot_id) const {
// This is not a fast path like the add operation.
ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire);
if (likely(chunk != nullptr)) {
return chunk->slots[slot_idx].value.load(std::memory_order_relaxed) != -1;
if (filter_by_os_tid()) {
return chunk->slots[slot_idx].value.os_tid.load(std::memory_order_relaxed) != -1;
} else {
return chunk->slots[slot_idx].value.java_tid.load(std::memory_order_relaxed) != -1;
}
}
return false;
}

void ThreadFilter::add(int tid, SlotID slot_id) {
void ThreadFilter::add(long java_tid, int os_tid, SlotID slot_id) {
// PRECONDITION: slot_id must be from registerThread() or negative
// Undefined behavior for invalid positive slot_ids (performance optimization)
if (slot_id < 0) return;
Expand All @@ -174,7 +185,11 @@ void ThreadFilter::add(int tid, SlotID slot_id) {
// Fast path: assume valid slot_id from registerThread()
ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire);
if (likely(chunk != nullptr)) {
chunk->slots[slot_idx].value.store(tid, std::memory_order_release);
if (filter_by_os_tid()) {
chunk->slots[slot_idx].value.os_tid.store(os_tid, std::memory_order_release);
} else {
chunk->slots[slot_idx].value.java_tid.store(java_tid, std::memory_order_release);
}
}
}

Expand All @@ -195,8 +210,11 @@ void ThreadFilter::remove(SlotID slot_id) {
if (unlikely(chunk == nullptr)) {
return;
}

chunk->slots[slot_idx].value.store(-1, std::memory_order_release);
if (filter_by_os_tid()) {
chunk->slots[slot_idx].value.java_tid.store(-1, std::memory_order_release);
} else {
chunk->slots[slot_idx].value.os_tid.store(-1, std::memory_order_release);
}
}

void ThreadFilter::unregisterThread(SlotID slot_id) {
Expand Down Expand Up @@ -255,12 +273,13 @@ ThreadFilter::SlotID ThreadFilter::popFromFreeList() {
return -1; // Empty list
}

void ThreadFilter::collect(std::vector<int>& tids) const {
tids.clear();
void ThreadFilter::collect_java_tids(std::vector<long>& java_tids) const {
assert(!filter_by_os_tid());
java_tids.clear();

// Reserve space for efficiency
// The eventual resize is not the bottleneck, so we reserve a reasonable size
tids.reserve(512);
java_tids.reserve(512);

// Scan only initialized chunks
int num_chunks = _num_chunks.load(std::memory_order_relaxed);
Expand All @@ -271,16 +290,36 @@ void ThreadFilter::collect(std::vector<int>& tids) const {
}

for (const auto& slot : chunk->slots) {
int slot_tid = slot.value.load(std::memory_order_relaxed);
if (slot_tid != -1) {
tids.push_back(slot_tid);
long java_tid = slot.value.java_tid.load(std::memory_order_relaxed);
if (java_tid != -1) {
java_tids.push_back(java_tid);
}
}
}
}

// Gathers the OS thread ids currently stored in the filter's slots.
//
// os_tids: output vector. It is cleared first and then receives every slot
//          value that differs from the -1 "empty" sentinel.
//
// Must only be called while the filter is keyed by OS thread id; otherwise the
// slots are written through the java_tid member and os_tid is not meaningful.
void ThreadFilter::collect_os_tids(std::vector<int>& os_tids) const {
  // This is the OS-tid collector, so OS-tid mode has to be active. (The
  // Java-tid collector asserts the opposite condition.)
  assert(filter_by_os_tid());
  os_tids.clear();

  // Reserve space for efficiency
  // The eventual resize is not the bottleneck, so we reserve a reasonable size
  os_tids.reserve(512);

  // Scan only initialized chunks
  const int num_chunks = _num_chunks.load(std::memory_order_relaxed);
  for (int chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) {
    ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire);
    if (chunk == nullptr) {
      continue;  // Skip unallocated chunks
    }

    for (const auto& slot : chunk->slots) {
      const int os_tid = slot.value.os_tid.load(std::memory_order_relaxed);
      if (os_tid != -1) {
        os_tids.push_back(os_tid);
      }
    }
  }
}

Expand Down
14 changes: 11 additions & 3 deletions ddprof-lib/src/main/cpp/threadFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,25 @@ class ThreadFilter {
bool enabled() const;
// Hot path methods - slot_id MUST be from registerThread(), undefined behavior otherwise
bool accept(SlotID slot_id) const;
void add(int tid, SlotID slot_id);
void add(long java_tid, int os_tid, SlotID slot_id);
void remove(SlotID slot_id);
void collect(std::vector<int>& tids) const;

void collect_java_tids(std::vector<long>& java_tids) const;
void collect_os_tids(std::vector<int>& os_tids) const;

SlotID registerThread();
void unregisterThread(SlotID slot_id);

private:
// Use os tid for filtering vs java tid
static bool filter_by_os_tid();

// One filter slot, aligned and padded to a full cache line to avoid false
// sharing between neighbouring slots.
//
// `value` holds either a Java thread id or an OS thread id in the same storage;
// which member is meaningful depends on the filter mode. -1 marks an empty slot.
// NOTE(review): the slot starts out as java_tid == -1, and code that reads
// os_tid relies on that all-ones bit pattern also reading back as -1 through
// the other member - confirm this aliasing is acceptable on all targets.
struct alignas(DEFAULT_CACHE_LINE_SIZE) Slot {
  union Value {
    std::atomic<long> java_tid;
    std::atomic<int> os_tid;
    // Explicit constructor: keeps the union default-constructible regardless of
    // whether std::atomic's default constructor is trivial, and restores the
    // -1 "empty" default by initializing the widest member.
    Value() noexcept : java_tid(-1) {}
  } value;
  char padding[DEFAULT_CACHE_LINE_SIZE - sizeof(value)];  // Pad to cache line size
};
static_assert(sizeof(Slot) == DEFAULT_CACHE_LINE_SIZE, "Slot must be exactly one cache line");
Expand Down
Loading
Loading
0