diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index 74322bf9e..d1a745a8f 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -27,6 +27,7 @@ #include "threadState.h" #include "tsc.h" #include "vmStructs.h" +#include "wallclock/wallClock.h" #include #include #include diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 355fcd512..78c57779f 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -29,7 +29,6 @@ #include "tsc.h" #include "vmEntry.h" #include "vmStructs.h" -#include "wallClock.h" #include #include #include @@ -133,15 +132,11 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env, // We direct corresponding JNI calls to JavaCritical to make sure the parameters/return value // still compatible in the event of signature changes in the future. extern "C" DLLEXPORT void JNICALL -JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0() { +JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(jlong java_tid) { ProfiledThread *current = ProfiledThread::current(); if (unlikely(current == nullptr)) { return; } - int tid = current->tid(); - if (unlikely(tid < 0)) { - return; - } ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); if (unlikely(!thread_filter->enabled())) { return; @@ -158,7 +153,7 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0() { if (unlikely(slot_id == -1)) { return; // Failed to register thread } - thread_filter->add(tid, slot_id); + thread_filter->add(java_tid, current->tid(), slot_id); } extern "C" DLLEXPORT void JNICALL @@ -167,10 +162,6 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0() { if (unlikely(current == nullptr)) { return; } - int tid = current->tid(); - if (unlikely(tid < 0)) { - return; - } ThreadFilter *thread_filter = 
Profiler::instance()->threadFilter(); if (unlikely(!thread_filter->enabled())) { return; @@ -181,14 +172,16 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0() { // Thread doesn't have a slot ID yet - nothing to remove return; } + current->setFilterSlotId(-1); thread_filter->remove(slot_id); } extern "C" DLLEXPORT void JNICALL Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env, - jclass unused) { - JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(); + jclass unused, + jlong java_tid) { + JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(java_tid); } extern "C" DLLEXPORT void JNICALL diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index 67e8fec8f..1292971b1 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -15,7 +15,7 @@ #include "flightRecorder.h" #include "itimer.h" #include "j9Ext.h" -#include "j9WallClock.h" +#include "wallclock/j9WallClock.h" #include "libraryPatcher.h" #include "objectSampler.h" #include "os.h" @@ -27,7 +27,8 @@ #include "thread.h" #include "tsc.h" #include "vmStructs.h" -#include "wallClock.h" +#include "wallclock/wallClockASGCT.h" +#include "wallclock/wallClockJVMTI.h" #include #include #include @@ -1187,10 +1188,10 @@ Engine *Profiler::selectCpuEngine(Arguments &args) { } } -Engine *Profiler::selectWallEngine(Arguments &args) { +BaseWallClock *Profiler::selectWallEngine(Arguments &args) { if (args._wall < 0 && (args._event == NULL || strcmp(args._event, EVENT_WALL) != 0)) { - return &noop_engine; + return nullptr; } if (VM::isOpenJ9()) { if (args._wallclock_sampler == JVMTI || !J9Ext::shouldUseAsgct() || !J9Ext::can_use_ASGCT()) { @@ -1201,18 +1202,18 @@ Engine *Profiler::selectWallEngine(Arguments &args) { } j9_engine.sampleIdleThreads(); TEST_LOG("J9[wall]=jvmti"); - return (Engine *)&j9_engine; + return (BaseWallClock *)&j9_engine; } else { TEST_LOG("J9[wall]=asgct"); - return 
(Engine *)&wall_asgct_engine; + return &wall_asgct_engine; } } switch (args._wallclock_sampler) { case JVMTI: - return (Engine*)&wall_jvmti_engine; + return &wall_jvmti_engine; case ASGCT: default: - return (Engine*)&wall_asgct_engine; + return &wall_asgct_engine; } } diff --git a/ddprof-lib/src/main/cpp/profiler.h b/ddprof-lib/src/main/cpp/profiler.h index 285c64805..78578e0d1 100644 --- a/ddprof-lib/src/main/cpp/profiler.h +++ b/ddprof-lib/src/main/cpp/profiler.h @@ -55,6 +55,7 @@ class FrameName; class VMNMethod; class StackContext; class VM; +class BaseWallClock; enum State { NEW, IDLE, RUNNING, TERMINATED }; @@ -124,7 +125,7 @@ class alignas(alignof(SpinLock)) Profiler { CallTraceStorage _call_trace_storage; FlightRecorder _jfr; Engine *_cpu_engine; - Engine *_wall_engine = NULL; + BaseWallClock *_wall_engine = NULL; Engine *_alloc_engine; int _event_mask; @@ -186,7 +187,7 @@ class alignas(alignof(SpinLock)) Profiler { void mangle(const char *name, char *buf, size_t size); Engine *selectCpuEngine(Arguments &args); - Engine *selectWallEngine(Arguments &args); + BaseWallClock *selectWallEngine(Arguments &args); Engine *selectAllocEngine(Arguments &args); Error checkJvmCapabilities(); @@ -240,7 +241,7 @@ class alignas(alignof(SpinLock)) Profiler { int max_stack_depth() { return _max_stack_depth; } time_t uptime() { return time(NULL) - _start_time; } Engine *cpuEngine() { return _cpu_engine; } - Engine *wallEngine() { return _wall_engine; } + BaseWallClock *wallEngine() const { return _wall_engine; } Dictionary *classMap() { return &_class_map; } Dictionary *stringLabelMap() { return &_string_label_map; } diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 77e6dfb53..90fded2d4 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -22,6 +22,8 @@ #include "threadFilter.h" #include "arch.h" #include "os.h" +#include "profiler.h" +#include "wallclock/wallClock.h" 
#include #include #include @@ -77,7 +79,7 @@ void ThreadFilter::initializeChunk(int chunk_idx) { // Allocate and initialize new chunk completely before swapping ChunkStorage* new_chunk = new ChunkStorage(); for (auto& slot : new_chunk->slots) { - slot.value.store(-1, std::memory_order_relaxed); + slot.value.java_tid.store(-1, std::memory_order_relaxed); } // Try to install it atomically @@ -145,6 +147,11 @@ void ThreadFilter::initFreeList() { } } +bool ThreadFilter::filter_by_os_tid() { + BaseWallClock* wall_clock = Profiler::instance()->wallEngine(); + return wall_clock == nullptr || wall_clock->mode() == BaseWallClock::Mode::ASGCT; +} + bool ThreadFilter::accept(SlotID slot_id) const { // Fast path: if disabled, accept everything (relaxed to avoid fences on hot path) if (unlikely(!_enabled.load(std::memory_order_relaxed))) { @@ -158,12 +165,16 @@ bool ThreadFilter::accept(SlotID slot_id) const { // This is not a fast path like the add operation. ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); if (likely(chunk != nullptr)) { - return chunk->slots[slot_idx].value.load(std::memory_order_relaxed) != -1; + if (filter_by_os_tid()) { + return chunk->slots[slot_idx].value.os_tid.load(std::memory_order_relaxed) != -1; + } else { + return chunk->slots[slot_idx].value.java_tid.load(std::memory_order_relaxed) != -1; + } } return false; } -void ThreadFilter::add(int tid, SlotID slot_id) { +void ThreadFilter::add(long java_tid, int os_tid, SlotID slot_id) { // PRECONDITION: slot_id must be from registerThread() or negative // Undefined behavior for invalid positive slot_ids (performance optimization) if (slot_id < 0) return; @@ -174,7 +185,11 @@ void ThreadFilter::add(int tid, SlotID slot_id) { // Fast path: assume valid slot_id from registerThread() ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); if (likely(chunk != nullptr)) { - chunk->slots[slot_idx].value.store(tid, std::memory_order_release); + if (filter_by_os_tid()) { 
+ chunk->slots[slot_idx].value.os_tid.store(os_tid, std::memory_order_release); + } else { + chunk->slots[slot_idx].value.java_tid.store(java_tid, std::memory_order_release); + } } } @@ -195,8 +210,11 @@ void ThreadFilter::remove(SlotID slot_id) { if (unlikely(chunk == nullptr)) { return; } - - chunk->slots[slot_idx].value.store(-1, std::memory_order_release); + if (filter_by_os_tid()) { + chunk->slots[slot_idx].value.os_tid.store(-1, std::memory_order_release); + } else { + chunk->slots[slot_idx].value.java_tid.store(-1, std::memory_order_release); + } } void ThreadFilter::unregisterThread(SlotID slot_id) { @@ -255,12 +273,13 @@ ThreadFilter::SlotID ThreadFilter::popFromFreeList() { return -1; // Empty list } -void ThreadFilter::collect(std::vector& tids) const { - tids.clear(); +void ThreadFilter::collect_java_tids(std::vector& java_tids) const { + assert(!filter_by_os_tid()); + java_tids.clear(); // Reserve space for efficiency // The eventual resize is not the bottleneck, so we reserve a reasonable size - tids.reserve(512); + java_tids.reserve(512); // Scan only initialized chunks int num_chunks = _num_chunks.load(std::memory_order_relaxed); for (int chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) { ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); if (chunk == nullptr) { continue; // Skip unallocated chunks } for (const auto& slot : chunk->slots) { - int slot_tid = slot.value.load(std::memory_order_relaxed); - if (slot_tid != -1) { - tids.push_back(slot_tid); + long java_tid = slot.value.java_tid.load(std::memory_order_relaxed); + if (java_tid != -1) { + java_tids.push_back(java_tid); } } } +} + +void ThreadFilter::collect_os_tids(std::vector& os_tids) const { + assert(filter_by_os_tid()); + os_tids.clear(); + + // Reserve space for efficiency + // The eventual resize is not the bottleneck, so we reserve a reasonable size + os_tids.reserve(512); - // Optional: shrink if we over-reserved significantly - if (tids.capacity() > tids.size() * 2) { - tids.shrink_to_fit(); + // Scan only initialized chunks + int num_chunks = 
_num_chunks.load(std::memory_order_relaxed); + for (int chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) { + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); + if (chunk == nullptr) { + continue; // Skip unallocated chunks + } + + for (const auto& slot : chunk->slots) { + int os_tid = slot.value.os_tid.load(std::memory_order_relaxed); + if (os_tid != -1) { + os_tids.push_back(os_tid); + } + } } } diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 2440d4351..b35dc1820 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -45,17 +45,25 @@ class ThreadFilter { bool enabled() const; // Hot path methods - slot_id MUST be from registerThread(), undefined behavior otherwise bool accept(SlotID slot_id) const; - void add(int tid, SlotID slot_id); + void add(long java_tid, int os_tid, SlotID slot_id); void remove(SlotID slot_id); - void collect(std::vector& tids) const; + + void collect_java_tids(std::vector& java_tids) const; + void collect_os_tids(std::vector& os_tids) const; SlotID registerThread(); void unregisterThread(SlotID slot_id); private: + // Use os tid for filtering vs java tid + static bool filter_by_os_tid(); + // Optimized slot structure with padding to avoid false sharing struct alignas(DEFAULT_CACHE_LINE_SIZE) Slot { - std::atomic value{-1}; + union { + std::atomic java_tid; + std::atomic os_tid; + } value; char padding[DEFAULT_CACHE_LINE_SIZE - sizeof(value)]; // Pad to cache line size }; static_assert(sizeof(Slot) == DEFAULT_CACHE_LINE_SIZE, "Slot must be exactly one cache line"); diff --git a/ddprof-lib/src/main/cpp/threadState.cpp b/ddprof-lib/src/main/cpp/threadState.cpp new file mode 100644 index 000000000..b6b89a88e --- /dev/null +++ b/ddprof-lib/src/main/cpp/threadState.cpp @@ -0,0 +1,104 @@ +/* + * Copyright 2026 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file 
except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "threadState.inline.h" +#include "vmStructs.h" + +ThreadStateResolver* ThreadStateResolver::INSTANCE = nullptr; + +void ThreadStateResolver::initialize() { + if (VM::isHotspot()) { + INSTANCE = new VMStructsThreadStateResolver(); + } else { + INSTANCE = new JNIThreadStateResolver(); + } +} + +VMStructsThreadStateResolver::VMStructsThreadStateResolver() { + assert(VM::isHotspot()); +} + +OSThreadState VMStructsThreadStateResolver::resolveThreadState(jthread thread) { + JNIEnv* jni = VM::jni(); + VMThread* vm_thread = VMThread::fromJavaThread(jni, thread); + // Thread no longer reachable + if (vm_thread == nullptr) { + return OSThreadState::TERMINATED; + } + + int raw_thread_state = vm_thread->state(); + bool is_initialized = raw_thread_state >= JVMJavaThreadState::_thread_in_native && + raw_thread_state < JVMJavaThreadState::_thread_max_state; + if (!is_initialized) { + return OSThreadState::UNKNOWN; + } + + OSThreadState state = OSThreadState::UNKNOWN; + OSThreadState os_state = vm_thread->osThreadState(); + if (os_state == OSThreadState::UNKNOWN) { + state = OSThreadState::RUNNABLE; + } else { + state = os_state; + } + return state; +} + +ExecutionMode VMStructsThreadStateResolver::resolveThreadExecutionMode(jthread thread) { + JNIEnv* jni = VM::jni(); + VMThread* vm_thread = VMThread::fromJavaThread(jni, thread); + return getThreadExecutionMode(vm_thread); +} + + +JNIThreadStateResolver::JNIThreadStateResolver() { + assert(VM::isOpenJ9() || VM::isZing()); + + JNIEnv* jni = 
VM::jni(); + jclass thread_class = jni->FindClass("java/lang/Thread"); + jclass thread_state_class = jni->FindClass("java/lang/Thread$State"); + assert(thread_class != nullptr); + assert(thread_state_class != nullptr); + + _thread_get_state_id = jni->GetMethodID(thread_class, "getState", "()Ljava/lang/Thread$State;"); + assert(_thread_get_state_id != nullptr); + _state_ordinal_id = jni->GetMethodID(thread_state_class, "ordinal", "()I"); + assert(_state_ordinal_id != nullptr); +} + +OSThreadState JNIThreadStateResolver::resolveThreadState(jthread thread) { + JNIEnv* jni = VM::jni(); + jobject state = jni->CallObjectMethod(thread, _thread_get_state_id); + int ordinal = jni->CallIntMethod(state, _state_ordinal_id); + return ordinal2ThreadState(ordinal); +} + +ExecutionMode JNIThreadStateResolver::resolveThreadExecutionMode(jthread thread) { + return ExecutionMode::JAVA; +} + +OSThreadState JNIThreadStateResolver::ordinal2ThreadState(int ordinal) { + switch(ordinal) { + case 0: return OSThreadState::NEW; + case 1: return OSThreadState::RUNNABLE; + case 2: return OSThreadState::MONITOR_WAIT; + case 3: + case 4: return OSThreadState::OBJECT_WAIT; + case 5: return OSThreadState::TERMINATED; + default: assert(false); + } + return OSThreadState::UNKNOWN; +} diff --git a/ddprof-lib/src/main/cpp/threadState.h b/ddprof-lib/src/main/cpp/threadState.h index 33553a267..b9688a04b 100644 --- a/ddprof-lib/src/main/cpp/threadState.h +++ b/ddprof-lib/src/main/cpp/threadState.h @@ -66,8 +66,46 @@ static ExecutionMode convertJvmExecutionState(int state) { * ExecutionMode::JVM for JVM internal threads, * or the appropriate execution mode for Java threads */ - class VMThread; +class VMThread; inline ExecutionMode getThreadExecutionMode(); +inline ExecutionMode getThreadExecutionMode(VMThread* thread); + +// The helper classes to resolve thread state. 
+class ThreadStateResolver { +private: + static ThreadStateResolver* INSTANCE; + +public: + // Must be called at initialization phase, in single-threaded mode, + // after resolving JVM type + static void initialize(); + + static ThreadStateResolver* getInstance() { return INSTANCE; } + + virtual OSThreadState resolveThreadState(jthread thread) = 0; + virtual ExecutionMode resolveThreadExecutionMode(jthread thread) = 0; +}; + +// Hotspot JVM uses VMStructs to resolve thread state and execution mode +class VMStructsThreadStateResolver : public ThreadStateResolver { +public: + VMStructsThreadStateResolver(); + OSThreadState resolveThreadState(jthread thread); + ExecutionMode resolveThreadExecutionMode(jthread thread); +}; + +// J9 and Zing use JNI to resolve thread state and execution mode +class JNIThreadStateResolver : public ThreadStateResolver { +private: + jmethodID _thread_get_state_id; + jmethodID _state_ordinal_id; +public: + JNIThreadStateResolver(); + OSThreadState resolveThreadState(jthread thread); + ExecutionMode resolveThreadExecutionMode(jthread thread); +private: + static OSThreadState ordinal2ThreadState(int ordinal); +}; #endif // JAVA_PROFILER_LIBRARY_THREAD_STATE_H diff --git a/ddprof-lib/src/main/cpp/threadState.inline.h b/ddprof-lib/src/main/cpp/threadState.inline.h index 0b4a560b6..06be4f692 100644 --- a/ddprof-lib/src/main/cpp/threadState.inline.h +++ b/ddprof-lib/src/main/cpp/threadState.inline.h @@ -7,14 +7,15 @@ #include "thread.h" #include -inline ExecutionMode getThreadExecutionMode() { - VMThread* vm_thread = VMThread::current(); +inline ExecutionMode getThreadExecutionMode(VMThread* vm_thread) { + // This is hotspot JVM specific implementation + assert(VM::isHotspot()); + // Not a JVM thread - native thread, e.g. 
thread launched by JNI code if (vm_thread == nullptr) { return ExecutionMode::NATIVE; } - ProfiledThread *prof_thread = ProfiledThread::currentSignalSafe(); bool is_java_thread = prof_thread != nullptr && prof_thread->isJavaThread(); @@ -29,10 +30,15 @@ inline ExecutionMode getThreadExecutionMode() { return is_java_thread ? convertJvmExecutionState(raw_thread_state) : ExecutionMode::JVM; } else { - // It is a JVM internal thread, may or may not be a Java thread, + // It is a JVM internal thread, may or may not be a Java thread, // e.g. Compiler thread or GC thread, etc return ExecutionMode::JVM; } } +inline ExecutionMode getThreadExecutionMode() { + VMThread* vm_thread = VMThread::current(); + return getThreadExecutionMode(vm_thread); +} + #endif // JAVA_PROFILER_LIBRARY_THREAD_STATE_INLINE_H \ No newline at end of file diff --git a/ddprof-lib/src/main/cpp/vmEntry.cpp b/ddprof-lib/src/main/cpp/vmEntry.cpp index 9a8d88fa0..f2066b558 100644 --- a/ddprof-lib/src/main/cpp/vmEntry.cpp +++ b/ddprof-lib/src/main/cpp/vmEntry.cpp @@ -15,6 +15,7 @@ #include "os.h" #include "profiler.h" #include "safeAccess.h" +#include "threadState.h" #include "vmStructs.h" #include #include @@ -375,6 +376,9 @@ bool VM::initProfilerBridge(JavaVM *vm, bool attach) { return false; } + // Initialize thread state resolver + ThreadStateResolver::initialize(); + CodeCache *lib = openJvmLibrary(); if (lib == nullptr) { return false; diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp deleted file mode 100644 index 60f44da41..000000000 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Copyright The async-profiler authors - * Copyright 2025, Datadog, Inc. 
- * SPDX-License-Identifier: Apache-2.0 - */ - -#include "wallClock.h" -#include "stackFrame.h" -#include "context.h" -#include "debugSupport.h" -#include "libraries.h" -#include "log.h" -#include "profiler.h" -#include "stackFrame.h" -#include "thread.h" -#include "threadState.inline.h" -#include "vmStructs.h" -#include "guards.h" -#include -#include -#include // For std::sort and std::binary_search - -std::atomic BaseWallClock::_enabled{false}; - -bool WallClockASGCT::inSyscall(void *ucontext) { - StackFrame frame(ucontext); - uintptr_t pc = frame.pc(); - - // Consider a thread sleeping, if it has been interrupted in the middle of - // syscall execution, either when PC points to the syscall instruction, or if - // syscall has just returned with EINTR - if (StackFrame::isSyscall((instruction_t *)pc)) { - return true; - } - - // Make sure the previous instruction address is readable - uintptr_t prev_pc = pc - SYSCALL_SIZE; - if ((pc & 0xfff) >= SYSCALL_SIZE || - Libraries::instance()->findLibraryByAddress((instruction_t *)prev_pc) != - NULL) { - if (StackFrame::isSyscall((instruction_t *)prev_pc) && - frame.checkInterruptedSyscall()) { - return true; - } - } - - return false; -} - -void WallClockASGCT::sharedSignalHandler(int signo, siginfo_t *siginfo, - void *ucontext) { - WallClockASGCT *engine = reinterpret_cast(Profiler::instance()->wallEngine()); - if (signo == SIGVTALRM) { - engine->signalHandler(signo, siginfo, ucontext, engine->_interval); - } -} - -void WallClockASGCT::signalHandler(int signo, siginfo_t *siginfo, void *ucontext, - u64 last_sample) { - // Atomically try to enter critical section - prevents all reentrancy races - CriticalSection cs; - if (!cs.entered()) { - return; // Another critical section is active, defer profiling - } - ProfiledThread *current = ProfiledThread::currentSignalSafe(); - int tid = current != NULL ? 
current->tid() : OS::threadId(); - Shims::instance().setSighandlerTid(tid); - u64 call_trace_id = 0; - if (current != NULL && _collapsing) { - StackFrame frame(ucontext); - Context &context = Contexts::get(); - call_trace_id = current->lookupWallclockCallTraceId( - (u64)frame.pc(), (u64)frame.sp(), - Profiler::instance()->recordingEpoch(), - context.spanId, context.rootSpanId); - if (call_trace_id != 0) { - Counters::increment(SKIPPED_WALLCLOCK_UNWINDS); - } - } - - ExecutionEvent event; - VMThread *vm_thread = VMThread::current(); - if (vm_thread != NULL && !vm_thread->isThreadAccessible()) { - vm_thread = NULL; - } - int raw_thread_state = vm_thread ? vm_thread->state() : 0; - bool is_java_thread = raw_thread_state >= 4 && raw_thread_state < 12; - bool is_initialized = is_java_thread; - OSThreadState state = OSThreadState::UNKNOWN; - ExecutionMode mode = ExecutionMode::UNKNOWN; - if (vm_thread && is_initialized) { - OSThreadState os_state = vm_thread->osThreadState(); - if (os_state != OSThreadState::UNKNOWN) { - state = os_state; - } - mode = getThreadExecutionMode(); - } - if (state == OSThreadState::UNKNOWN) { - if (inSyscall(ucontext)) { - state = OSThreadState::SYSCALL; - mode = ExecutionMode::SYSCALL; - } else { - state = OSThreadState::RUNNABLE; - } - } - event._thread_state = state; - event._execution_mode = mode; - event._weight = 1; - Profiler::instance()->recordSample(ucontext, last_sample, tid, BCI_WALL, - call_trace_id, &event); - Shims::instance().setSighandlerTid(-1); -} - -Error BaseWallClock::start(Arguments &args) { - int interval = args._event != NULL ? args._interval : args._wall; - if (interval < 0) { - return Error("interval must be positive"); - } - _interval = interval ? interval : DEFAULT_WALL_INTERVAL; - - _reservoir_size = - args._wall_threads_per_tick ? 
- args._wall_threads_per_tick - : DEFAULT_WALL_THREADS_PER_TICK; - - initialize(args); - - _running = true; - - if (pthread_create(&_thread, NULL, threadEntry, this) != 0) { - return Error("Unable to create timer thread"); - } - - return Error::OK; -} - -void BaseWallClock::stop() { - _running.store(false); - // the thread join ensures we wait for the thread to finish before returning - // (and possibly removing the object) - pthread_kill(_thread, WAKEUP_SIGNAL); - int res = pthread_join(_thread, NULL); - if (res != 0) { - Log::warn("Unable to join WallClock thread on stop %d", res); - } -} - -bool BaseWallClock::isEnabled() const { - return _enabled.load(std::memory_order_acquire); -} - -void WallClockASGCT::initialize(Arguments& args) { - _collapsing = args._wall_collapsing; - OS::installSignalHandler(SIGVTALRM, sharedSignalHandler); -} - -/* This method is extremely racy! - * Thread references, that are returned from JVMTI::GetAllThreads(), only guarantee thread objects - * are not collected by GCs, they don't prevent threads from exiting. - * We have to be extremely careful when accessing thread's data, so it may not be valid. - */ -void WallClockJVMTI::timerLoop() { - // Check for enablement before attaching/dettaching the current thread - if (!isEnabled()) { - return; - } - - jvmtiEnv* jvmti = VM::jvmti(); - if (jvmti == nullptr) { - return; - } - - // Notice: - // We want to cache threads that are captured by collectThread(), so that we can - // clean them up in cleanThreadRefs(). - // The approach is not ideal, but it is cleaner than cleaning individual thread - // during filtering phases. 
- jint threads_count = 0; - jthread* threads_ptr = nullptr; - - // Attach to JVM as the first step - VM::attachThread("Datadog Profiler Wallclock Sampler"); - auto collectThreads = [&](std::vector& threads) { - jvmtiEnv* jvmti = VM::jvmti(); - if (jvmti == nullptr) { - return; - } - - if (jvmti->GetAllThreads(&threads_count, &threads_ptr) != JVMTI_ERROR_NONE || - threads_count == 0) { - return; - } - - JNIEnv* jni = VM::jni(); - - ThreadFilter* threadFilter = Profiler::instance()->threadFilter(); - bool do_filter = threadFilter->enabled(); - int self = OS::threadId(); - - // If filtering is enabled, collect the filtered TIDs first - std::vector filtered_tids; - if (do_filter) { - Profiler::instance()->threadFilter()->collect(filtered_tids); - // Sort the TIDs for efficient lookup - std::sort(filtered_tids.begin(), filtered_tids.end()); - } - - for (int i = 0; i < threads_count; i++) { - jthread thread = threads_ptr[i]; - if (thread != nullptr) { - VMThread* nThread = VMThread::fromJavaThread(jni, thread); - if (nThread == nullptr) { - continue; - } - int tid = nThread->osThreadId(); - if (tid != self && (!do_filter || - // Use binary search to efficiently find if tid is in filtered_tids - std::binary_search(filtered_tids.begin(), filtered_tids.end(), tid))) { - threads.push_back({nThread, thread, tid}); - } - } - } - }; - - auto sampleThreads = [&](ThreadEntry& thread_entry, int& num_failures, int& threads_already_exited, int& permission_denied) { - static jint max_stack_depth = (jint)Profiler::instance()->max_stack_depth(); - - ExecutionEvent event; - VMThread* vm_thread = thread_entry.native; - int raw_thread_state = vm_thread->state(); - bool is_initialized = raw_thread_state >= JVMJavaThreadState::_thread_in_native && - raw_thread_state < JVMJavaThreadState::_thread_max_state; - OSThreadState state = OSThreadState::UNKNOWN; - ExecutionMode mode = ExecutionMode::UNKNOWN; - if (vm_thread == nullptr || !is_initialized) { - return false; - } - OSThreadState 
os_state = vm_thread->osThreadState(); - if (os_state == OSThreadState::TERMINATED) { - return false; - } else if (os_state == OSThreadState::UNKNOWN) { - state = OSThreadState::RUNNABLE; - } else { - state = os_state; - } - mode = getThreadExecutionMode(); - - event._thread_state = state; - event._execution_mode = mode; - event._weight = 1; - - Profiler::instance()->recordJVMTISample(1, thread_entry.tid, thread_entry.java, BCI_WALL, &event, false); - return true; - }; - - auto cleanThreadRefs = [&]() { - JNIEnv* jni = VM::jni(); - for (jint index = 0; index < threads_count; index++) { - jni->DeleteLocalRef(threads_ptr[index]); - } - jvmti->Deallocate((unsigned char*)threads_ptr); - threads_ptr = nullptr; - threads_count = 0; - }; - - timerLoopCommon(collectThreads, sampleThreads, cleanThreadRefs, _reservoir_size, _interval); - - - // Don't forget to detach the thread - VM::detachThread(); -} - -void WallClockASGCT::timerLoop() { - // todo: re-allocating the vector every time is not efficient - auto collectThreads = [&](std::vector& tids) { - // Get thread IDs from the filter if it's enabled - // Otherwise list all threads in the system - if (Profiler::instance()->threadFilter()->enabled()) { - Profiler::instance()->threadFilter()->collect(tids); - } else { - ThreadList *thread_list = OS::listThreads(); - while (thread_list->hasNext()) { - int tid = thread_list->next(); - // Don't include the current thread - if (tid != OS::threadId()) { - tids.push_back(tid); - } - tid = thread_list->next(); - } - delete thread_list; - } - }; - - auto sampleThreads = [&](int tid, int& num_failures, int& threads_already_exited, int& permission_denied) { - if (!OS::sendSignalToThread(tid, SIGVTALRM)) { - num_failures++; - if (errno != 0) { - if (errno == ESRCH) { - threads_already_exited++; - } else if (errno == EPERM) { - permission_denied++; - } else { - Log::debug("unexpected error %s", strerror(errno)); - } - } - return false; - } - return true; - }; - - auto doNothing = []() { 
- }; - - timerLoopCommon(collectThreads, sampleThreads, doNothing, _reservoir_size, _interval); -} diff --git a/ddprof-lib/src/main/cpp/wallClock.h b/ddprof-lib/src/main/cpp/wallClock.h deleted file mode 100644 index 2530c8b2d..000000000 --- a/ddprof-lib/src/main/cpp/wallClock.h +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Copyright The async-profiler authors - * Copyright 2025, Datadog, Inc. - * SPDX-License-Identifier: Apache-2.0 - */ - -#ifndef _WALLCLOCK_H -#define _WALLCLOCK_H - -#include "engine.h" -#include "os.h" -#include "profiler.h" -#include "reservoirSampler.h" -#include "thread.h" -#include "threadFilter.h" -#include "threadState.h" -#include "tsc.h" -#include "vmStructs.h" - -class BaseWallClock : public Engine { - private: - static std::atomic _enabled; - std::atomic _running; - protected: - long _interval; - // Maximum number of threads sampled in one iteration. This limit serves as a - // throttle when generating profiling signals. Otherwise applications with too - // many threads may suffer from a big profiling overhead. Also, keeping this - // limit low enough helps to avoid contention on a spin lock inside - // Profiler::recordSample(). 
- int _reservoir_size; - - pthread_t _thread; - virtual void timerLoop() = 0; - virtual void initialize(Arguments& args) {}; - - static void *threadEntry(void *wall_clock) { - ((BaseWallClock *)wall_clock)->timerLoop(); - return NULL; - } - - bool isEnabled() const; - - template - void timerLoopCommon(CollectThreadsFunc collectThreads, SampleThreadsFunc sampleThreads, CleanThreadFunc cleanThreads, int reservoirSize, u64 interval) { - if (!_enabled.load(std::memory_order_acquire)) { - return; - } - - // Dither the sampling interval to introduce some randomness and prevent step-locking - const double stddev = ((double)_interval) / 10.0; // 10% standard deviation - // Set up random engine and normal distribution - std::random_device rd; - std::mt19937 generator(rd()); - std::normal_distribution distribution(interval, stddev); - - std::vector threads; - threads.reserve(reservoirSize); - int self = OS::threadId(); - ThreadFilter* thread_filter = Profiler::instance()->threadFilter(); - - // We don't want to profile ourselves in wall time - ProfiledThread* current = ProfiledThread::current(); - if (current != nullptr) { - int slot_id = current->filterSlotId(); - if (slot_id != -1) { - thread_filter->remove(slot_id); - } - } - - u64 startTime = TSC::ticks(); - WallClockEpochEvent epoch(startTime); - - ReservoirSampler reservoir(reservoirSize); - - while (_running.load(std::memory_order_relaxed)) { - collectThreads(threads); - - int num_failures = 0; - int threads_already_exited = 0; - int permission_denied = 0; - std::vector sample = reservoir.sample(threads); - for (ThreadType thread : sample) { - if (!sampleThreads(thread, num_failures, threads_already_exited, permission_denied)) { - continue; - } - } - - epoch.updateNumSamplableThreads(threads.size()); - epoch.updateNumFailedSamples(num_failures); - epoch.updateNumSuccessfulSamples(sample.size() - num_failures); - epoch.updateNumExitedThreads(threads_already_exited); - epoch.updateNumPermissionDenied(permission_denied); 
- u64 endTime = TSC::ticks(); - u64 duration = TSC::ticks_to_millis(endTime - startTime); - if (epoch.hasChanged() || duration >= 1000) { - epoch.endEpoch(duration); - Profiler::instance()->recordWallClockEpoch(self, &epoch); - epoch.newEpoch(endTime); - startTime = endTime; - } else { - epoch.clean(); - } - - threads.clear(); - cleanThreads(); - - // Get a random sleep duration - // clamp the random interval to <1,2N-1> - // the probability of clamping is extremely small, close to zero - OS::sleep(std::min(std::max((long int)1, static_cast(distribution(generator))), ((_interval * 2) - 1))); - } - } - -public: - BaseWallClock() : - _interval(LONG_MAX), - _reservoir_size(0), - _running(false), - _thread(0) {} - virtual ~BaseWallClock() = default; - - const char* units() { - return "ns"; - } - - virtual const char* name() = 0; - - long interval() const { return _interval; } - - inline void enableEvents(bool enabled) { - _enabled.store(enabled, std::memory_order_release); - } - - Error start(Arguments& args); - void stop(); -}; - -class WallClockASGCT : public BaseWallClock { - private: - bool _collapsing; - - static bool inSyscall(void* ucontext); - - static void sharedSignalHandler(int signo, siginfo_t* siginfo, void* ucontext); - void signalHandler(int signo, siginfo_t* siginfo, void* ucontext, u64 last_sample); - - void initialize(Arguments& args) override; - void timerLoop() override; - - public: - WallClockASGCT() : BaseWallClock(), _collapsing(false) {} - const char* name() override { - return "WallClock (ASGCT)"; - } -}; - -class WallClockJVMTI : public BaseWallClock { - private: - void timerLoop() override; - public: - struct ThreadEntry { - VMThread* native; - jthread java; - int tid; - }; - WallClockJVMTI() : BaseWallClock() {} - const char* name() override { - return "WallClock (JVMTI)"; - } -}; - -#endif // _WALLCLOCK_H diff --git a/ddprof-lib/src/main/cpp/wallClock.inline.h b/ddprof-lib/src/main/cpp/wallClock.inline.h new file mode 100644 index 
000000000..35da809fe --- /dev/null +++ b/ddprof-lib/src/main/cpp/wallClock.inline.h @@ -0,0 +1,84 @@ +/* + * Copyright The async-profiler authors + * Copyright 2026, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _WALLCLOCK_INLINE_H +#define _WALLCLOCK_INLINE_H + +#include "wallclock.h" + +template +void BaseWallClock::timerLoopCommon(CollectThreadsFunc collectThreads, SampleThreadsFunc sampleThreads, CleanThreadFunc cleanThreads, int reservoirSize, u64 interval) { + if (!_enabled.load(std::memory_order_acquire)) { + return; + } + + // Dither the sampling interval to introduce some randomness and prevent step-locking + const double stddev = ((double)_interval) / 10.0; // 10% standard deviation + // Set up random engine and normal distribution + std::random_device rd; + std::mt19937 generator(rd()); + std::normal_distribution distribution(interval, stddev); + + std::vector threads; + threads.reserve(reservoirSize); + int self = OS::threadId(); + ThreadFilter* thread_filter = Profiler::instance()->threadFilter(); + + // We don't want to profile ourselves in wall time + ProfiledThread* current = ProfiledThread::current(); + if (current != nullptr) { + int slot_id = current->filterSlotId(); + if (slot_id != -1) { + thread_filter->remove(slot_id); + } + } + + u64 startTime = TSC::ticks(); + WallClockEpochEvent epoch(startTime); + + ReservoirSampler reservoir(reservoirSize); + + while (_running.load(std::memory_order_relaxed)) { + collectThreads(threads); + + int num_failures = 0; + int threads_already_exited = 0; + int permission_denied = 0; + std::vector sample = reservoir.sample(threads); + for (ThreadType thread : sample) { + if (!sampleThreads(thread, num_failures, threads_already_exited, permission_denied)) { + continue; + } + } + + epoch.updateNumSamplableThreads(threads.size()); + epoch.updateNumFailedSamples(num_failures); + epoch.updateNumSuccessfulSamples(sample.size() - num_failures); + 
epoch.updateNumExitedThreads(threads_already_exited); + epoch.updateNumPermissionDenied(permission_denied); + u64 endTime = TSC::ticks(); + u64 duration = TSC::ticks_to_millis(endTime - startTime); + if (epoch.hasChanged() || duration >= 1000) { + epoch.endEpoch(duration); + Profiler::instance()->recordWallClockEpoch(self, &epoch); + epoch.newEpoch(endTime); + startTime = endTime; + } else { + epoch.clean(); + } + + threads.clear(); + cleanThreads(); + + // Get a random sleep duration + // clamp the random interval to <1,2N-1> + // the probability of clamping is extremely small, close to zero + OS::sleep(std::min(std::max((long int)1, static_cast(distribution(generator))), ((_interval * 2) - 1))); + } +} + + +#endif // _WALLCLOCK_INLINE_H diff --git a/ddprof-lib/src/main/cpp/j9WallClock.cpp b/ddprof-lib/src/main/cpp/wallclock/j9WallClock.cpp similarity index 97% rename from ddprof-lib/src/main/cpp/j9WallClock.cpp rename to ddprof-lib/src/main/cpp/wallclock/j9WallClock.cpp index 994cfa3f7..b693b0e75 100644 --- a/ddprof-lib/src/main/cpp/j9WallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallclock/j9WallClock.cpp @@ -20,12 +20,8 @@ #include "threadState.h" #include -volatile bool J9WallClock::_enabled = false; - -long J9WallClock::_interval; - Error J9WallClock::start(Arguments &args) { - if (_running) { + if (_running.load()) { // only one instance should be running return Error::OK; } diff --git a/ddprof-lib/src/main/cpp/j9WallClock.h b/ddprof-lib/src/main/cpp/wallclock/j9WallClock.h similarity index 69% rename from ddprof-lib/src/main/cpp/j9WallClock.h rename to ddprof-lib/src/main/cpp/wallclock/j9WallClock.h index d900159aa..f980cded9 100644 --- a/ddprof-lib/src/main/cpp/j9WallClock.h +++ b/ddprof-lib/src/main/cpp/wallclock/j9WallClock.h @@ -17,41 +17,30 @@ #ifndef _J9WALLCLOCK_H #define _J9WALLCLOCK_H -#include "engine.h" -#include +#include "wallclock/wallClock.h" -class J9WallClock : public Engine { +class J9WallClock : public BaseWallClock { private: - static volatile
bool _enabled; - static long _interval; - bool _sample_idle_threads; int _max_stack_depth; - volatile bool _running; - pthread_t _thread; static void *threadEntry(void *wall_clock) { ((J9WallClock *)wall_clock)->timerLoop(); return NULL; } - void timerLoop(); + void timerLoop() override; public: - const char *units() { return "ns"; } - - const char *name() { + const char *name() const override { return _sample_idle_threads ? "J9WallClock" : "J9Execution"; } - - virtual long interval() const { return _interval; } + Mode mode() const override { return Mode::ASGCT; } inline void sampleIdleThreads() { _sample_idle_threads = true; } - Error start(Arguments &args); - void stop(); - - inline void enableEvents(bool enabled) { _enabled = enabled; } + Error start(Arguments &args) override; + void stop() override; }; #endif // _J9WALLCLOCK_H diff --git a/ddprof-lib/src/main/cpp/wallclock/wallClock.cpp b/ddprof-lib/src/main/cpp/wallclock/wallClock.cpp new file mode 100644 index 000000000..055430a91 --- /dev/null +++ b/ddprof-lib/src/main/cpp/wallclock/wallClock.cpp @@ -0,0 +1,62 @@ +/* + * Copyright The async-profiler authors + * Copyright 2025, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "wallClock.h" +#include "stackFrame.h" +#include "context.h" +#include "debugSupport.h" +#include "libraries.h" +#include "log.h" +#include "profiler.h" +#include "stackFrame.h" +#include "thread.h" +#include "threadState.inline.h" +#include "vmStructs.h" +#include "guards.h" +#include +#include +#include // For std::sort and std::binary_search + +std::atomic BaseWallClock::_enabled{false}; + +Error BaseWallClock::start(Arguments &args) { + int interval = args._event != NULL ? args._interval : args._wall; + if (interval < 0) { + return Error("interval must be positive"); + } + _interval = interval ? interval : DEFAULT_WALL_INTERVAL; + + _reservoir_size = + args._wall_threads_per_tick ? 
+ args._wall_threads_per_tick + : DEFAULT_WALL_THREADS_PER_TICK; + + initialize(args); + + _running = true; + + if (pthread_create(&_thread, NULL, threadEntry, this) != 0) { + return Error("Unable to create timer thread"); + } + + return Error::OK; +} + +void BaseWallClock::stop() { + _running.store(false); + // the thread join ensures we wait for the thread to finish before returning + // (and possibly removing the object) + pthread_kill(_thread, WAKEUP_SIGNAL); + int res = pthread_join(_thread, NULL); + if (res != 0) { + Log::warn("Unable to join WallClock thread on stop %d", res); + } +} + +bool BaseWallClock::isEnabled() const { + return _enabled.load(std::memory_order_acquire); +} + diff --git a/ddprof-lib/src/main/cpp/wallclock/wallClock.h b/ddprof-lib/src/main/cpp/wallclock/wallClock.h new file mode 100644 index 000000000..e97b72f76 --- /dev/null +++ b/ddprof-lib/src/main/cpp/wallclock/wallClock.h @@ -0,0 +1,78 @@ +/* + * Copyright The async-profiler authors + * Copyright 2025, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _WALLCLOCK_H +#define _WALLCLOCK_H + +#include "engine.h" +#include "os.h" +#include "profiler.h" +#include "reservoirSampler.h" +#include "thread.h" +#include "threadFilter.h" +#include "threadState.h" +#include "tsc.h" +#include "vmStructs.h" + +class BaseWallClock : public Engine { + public: + enum class Mode : int { + JVMTI, + ASGCT + }; + + protected: + + static std::atomic _enabled; + std::atomic _running; + long _interval; + // Maximum number of threads sampled in one iteration. This limit serves as a + // throttle when generating profiling signals. Otherwise applications with too + // many threads may suffer from a big profiling overhead. Also, keeping this + // limit low enough helps to avoid contention on a spin lock inside + // Profiler::recordSample(). 
+ int _reservoir_size; + + pthread_t _thread; + virtual void timerLoop() = 0; + virtual void initialize(Arguments& args) {}; + + static void *threadEntry(void *wall_clock) { + ((BaseWallClock *)wall_clock)->timerLoop(); + return NULL; + } + + bool isEnabled() const; + + template + void timerLoopCommon(CollectThreadsFunc collectThreads, SampleThreadsFunc sampleThreads, CleanThreadFunc cleanThreads, int reservoirSize, u64 interval); +public: + BaseWallClock() : + _interval(LONG_MAX), + _reservoir_size(0), + _running(false), + _thread(0) {} + virtual ~BaseWallClock() = default; + + const char* units() { + return "ns"; + } + + virtual const char* name() const = 0; + virtual Mode mode() const = 0; + + long interval() const { return _interval; } + + inline void enableEvents(bool enabled) { + _enabled.store(enabled, std::memory_order_release); + } + + Error start(Arguments& args); + void stop(); +}; + + +#endif // _WALLCLOCK_H diff --git a/ddprof-lib/src/main/cpp/wallclock/wallClock.inline.h b/ddprof-lib/src/main/cpp/wallclock/wallClock.inline.h new file mode 100644 index 000000000..18c1a5389 --- /dev/null +++ b/ddprof-lib/src/main/cpp/wallclock/wallClock.inline.h @@ -0,0 +1,84 @@ +/* + * Copyright The async-profiler authors + * Copyright 2026, Datadog, Inc. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _WALLCLOCK_INLINE_H +#define _WALLCLOCK_INLINE_H + +#include "wallclock/wallClock.h" + +template +void BaseWallClock::timerLoopCommon(CollectThreadsFunc collectThreads, SampleThreadsFunc sampleThreads, CleanThreadFunc cleanThreads, int reservoirSize, u64 interval) { + if (!_enabled.load(std::memory_order_acquire)) { + return; + } + + // Dither the sampling interval to introduce some randomness and prevent step-locking + const double stddev = ((double)_interval) / 10.0; // 10% standard deviation + // Set up random engine and normal distribution + std::random_device rd; + std::mt19937 generator(rd()); + std::normal_distribution distribution(interval, stddev); + + std::vector threads; + threads.reserve(reservoirSize); + int self = OS::threadId(); + ThreadFilter* thread_filter = Profiler::instance()->threadFilter(); + + // We don't want to profile ourselves in wall time + ProfiledThread* current = ProfiledThread::current(); + if (current != nullptr) { + int slot_id = current->filterSlotId(); + if (slot_id != -1) { + thread_filter->remove(slot_id); + } + } + + u64 startTime = TSC::ticks(); + WallClockEpochEvent epoch(startTime); + + ReservoirSampler reservoir(reservoirSize); + + while (_running.load(std::memory_order_relaxed)) { + collectThreads(threads); + + int num_failures = 0; + int threads_already_exited = 0; + int permission_denied = 0; + std::vector sample = reservoir.sample(threads); + for (ThreadType thread : sample) { + if (!sampleThreads(thread, num_failures, threads_already_exited, permission_denied)) { + continue; + } + } + + epoch.updateNumSamplableThreads(threads.size()); + epoch.updateNumFailedSamples(num_failures); + epoch.updateNumSuccessfulSamples(sample.size() - num_failures); + epoch.updateNumExitedThreads(threads_already_exited); + epoch.updateNumPermissionDenied(permission_denied); + u64 endTime = TSC::ticks(); + u64 duration = TSC::ticks_to_millis(endTime - startTime); + if 
(epoch.hasChanged() || duration >= 1000) { + epoch.endEpoch(duration); + Profiler::instance()->recordWallClockEpoch(self, &epoch); + epoch.newEpoch(endTime); + startTime = endTime; + } else { + epoch.clean(); + } + + threads.clear(); + cleanThreads(); + + // Get a random sleep duration + // clamp the random interval to <1,2N-1> + // the probability of clamping is extremely small, close to zero + OS::sleep(std::min(std::max((long int)1, static_cast(distribution(generator))), ((_interval * 2) - 1))); + } +} + + +#endif // _WALLCLOCK_INLINE_H diff --git a/ddprof-lib/src/main/cpp/wallclock/wallClockASGCT.cpp b/ddprof-lib/src/main/cpp/wallclock/wallClockASGCT.cpp new file mode 100644 index 000000000..ee0a99286 --- /dev/null +++ b/ddprof-lib/src/main/cpp/wallclock/wallClockASGCT.cpp @@ -0,0 +1,151 @@ +/* + * Copyright The async-profiler authors + * Copyright 2026, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "debugSupport.h" +#include "guards.h" +#include "stackFrame.h" +#include "threadState.inline.h" +#include "wallclock/wallClock.inline.h" +#include "wallclock/wallClockASGCT.h" + +bool WallClockASGCT::inSyscall(void *ucontext) { + StackFrame frame(ucontext); + uintptr_t pc = frame.pc(); + + // Consider a thread sleeping, if it has been interrupted in the middle of + // syscall execution, either when PC points to the syscall instruction, or if + // syscall has just returned with EINTR + if (StackFrame::isSyscall((instruction_t *)pc)) { + return true; + } + + // Make sure the previous instruction address is readable + uintptr_t prev_pc = pc - SYSCALL_SIZE; + if ((pc & 0xfff) >= SYSCALL_SIZE || + Libraries::instance()->findLibraryByAddress((instruction_t *)prev_pc) != + NULL) { + if (StackFrame::isSyscall((instruction_t *)prev_pc) && + frame.checkInterruptedSyscall()) { + return true; + } + } + + return false; +} + +void WallClockASGCT::sharedSignalHandler(int signo, siginfo_t *siginfo, + void *ucontext) { + WallClockASGCT *engine = 
reinterpret_cast(Profiler::instance()->wallEngine()); + if (signo == SIGVTALRM) { + engine->signalHandler(signo, siginfo, ucontext, engine->_interval); + } +} + +void WallClockASGCT::signalHandler(int signo, siginfo_t *siginfo, void *ucontext, + u64 last_sample) { + // Atomically try to enter critical section - prevents all reentrancy races + CriticalSection cs; + if (!cs.entered()) { + return; // Another critical section is active, defer profiling + } + ProfiledThread *current = ProfiledThread::currentSignalSafe(); + int tid = current != NULL ? current->tid() : OS::threadId(); + Shims::instance().setSighandlerTid(tid); + u64 call_trace_id = 0; + if (current != NULL && _collapsing) { + StackFrame frame(ucontext); + Context &context = Contexts::get(); + call_trace_id = current->lookupWallclockCallTraceId( + (u64)frame.pc(), (u64)frame.sp(), + Profiler::instance()->recordingEpoch(), + context.spanId, context.rootSpanId); + if (call_trace_id != 0) { + Counters::increment(SKIPPED_WALLCLOCK_UNWINDS); + } + } + + ExecutionEvent event; + VMThread *vm_thread = VMThread::current(); + if (vm_thread != NULL && !vm_thread->isThreadAccessible()) { + vm_thread = NULL; + } + int raw_thread_state = vm_thread ? 
vm_thread->state() : 0; + bool is_java_thread = raw_thread_state >= 4 && raw_thread_state < 12; + bool is_initialized = is_java_thread; + OSThreadState state = OSThreadState::UNKNOWN; + ExecutionMode mode = ExecutionMode::UNKNOWN; + if (vm_thread && is_initialized) { + OSThreadState os_state = vm_thread->osThreadState(); + if (os_state != OSThreadState::UNKNOWN) { + state = os_state; + } + mode = getThreadExecutionMode(); + } + if (state == OSThreadState::UNKNOWN) { + if (inSyscall(ucontext)) { + state = OSThreadState::SYSCALL; + mode = ExecutionMode::SYSCALL; + } else { + state = OSThreadState::RUNNABLE; + } + } + event._thread_state = state; + event._execution_mode = mode; + event._weight = 1; + Profiler::instance()->recordSample(ucontext, last_sample, tid, BCI_WALL, + call_trace_id, &event); + Shims::instance().setSighandlerTid(-1); +} + + +void WallClockASGCT::timerLoop() { + // todo: re-allocating the vector every time is not efficient + auto collectThreads = [&](std::vector& tids) { + // Get thread IDs from the filter if it's enabled + // Otherwise list all threads in the system + if (Profiler::instance()->threadFilter()->enabled()) { + Profiler::instance()->threadFilter()->collect_os_tids(tids); + } else { + ThreadList *thread_list = OS::listThreads(); + while (thread_list->hasNext()) { + int tid = thread_list->next(); + // Don't include the current thread + if (tid != OS::threadId()) { + tids.push_back(tid); + } + } + + delete thread_list; + } + }; + + auto sampleThreads = [&](int tid, int& num_failures, int& threads_already_exited, int& permission_denied) { + if (!OS::sendSignalToThread(tid, SIGVTALRM)) { + num_failures++; + if (errno != 0) { + if (errno == ESRCH) { + threads_already_exited++; + } else if (errno == EPERM) { + permission_denied++; + } else { + Log::debug("unexpected error %s", strerror(errno)); + } + } + return false; + } + return true; + }; + + auto doNothing = []() { + }; + + timerLoopCommon(collectThreads,
sampleThreads, doNothing, _reservoir_size, _interval); +} + +void WallClockASGCT::initialize(Arguments& args) { + _collapsing = args._wall_collapsing; + OS::installSignalHandler(SIGVTALRM, sharedSignalHandler); +} diff --git a/ddprof-lib/src/main/cpp/wallclock/wallClockASGCT.h b/ddprof-lib/src/main/cpp/wallclock/wallClockASGCT.h new file mode 100644 index 000000000..c49d5924f --- /dev/null +++ b/ddprof-lib/src/main/cpp/wallclock/wallClockASGCT.h @@ -0,0 +1,31 @@ +/* + * Copyright The async-profiler authors + * Copyright 2026, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ +#ifndef _WALLCLOCKASGCT_H +#define _WALLCLOCKASGCT_H + +#include "wallclock/wallClock.h" + +class WallClockASGCT : public BaseWallClock { + private: + bool _collapsing; + + static bool inSyscall(void* ucontext); + + static void sharedSignalHandler(int signo, siginfo_t* siginfo, void* ucontext); + void signalHandler(int signo, siginfo_t* siginfo, void* ucontext, u64 last_sample); + + void initialize(Arguments& args) override; + void timerLoop() override; + + public: + WallClockASGCT() : BaseWallClock(), _collapsing(false) {} + const char* name() const override { + return "WallClock (ASGCT)"; + } + Mode mode() const override { return Mode::ASGCT; } +}; + +#endif // _WALLCLOCKASGCT_H diff --git a/ddprof-lib/src/main/cpp/wallclock/wallClockJVMTI.cpp b/ddprof-lib/src/main/cpp/wallclock/wallClockJVMTI.cpp new file mode 100644 index 000000000..43eaf6783 --- /dev/null +++ b/ddprof-lib/src/main/cpp/wallclock/wallClockJVMTI.cpp @@ -0,0 +1,125 @@ +/* + * Copyright The async-profiler authors + * Copyright 2026, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "wallclock/wallClock.inline.h" +#include "wallclock/wallClockJVMTI.h" + +/* This method is extremely racy! + * Thread references, that are returned from JVMTI::GetAllThreads(), only guarantee thread objects + * are not collected by GCs, they don't prevent threads from exiting. 
+ * We have to be extremely careful when accessing thread's data, so it may not be valid. + */ +void WallClockJVMTI::timerLoop() { + + TEST_LOG("JVMTI::timerLoop()"); + + // Check for enablement before attaching/detaching the current thread + if (!isEnabled()) { + return; + } + + jvmtiEnv* jvmti = VM::jvmti(); + if (jvmti == nullptr) { + return; + } + + // Notice: + // We want to cache threads that are captured by collectThread(), so that we can + // clean them up in cleanThreadRefs(). + // The approach is not ideal, but it is cleaner than cleaning individual thread + // during filtering phases. + jint threads_count = 0; + jthread* threads_ptr = nullptr; + + // Attach to JVM as the first step + VM::attachThread("Datadog Profiler Wallclock Sampler"); + + jthread self; + jvmtiError error = jvmti->GetCurrentThread(&self); + assert(error == JVMTI_ERROR_NONE); + + JNIEnv* jni = VM::jni(); + jclass thread_class = jni->FindClass("java/lang/Thread"); + assert(thread_class != nullptr); + jmethodID thread_get_id = jni->GetMethodID(thread_class, "threadId", "()J"); + // java.lang.thread.threadId() only exists in 19+ + if (thread_get_id == nullptr) { + thread_get_id = jni->GetMethodID(thread_class, "getId", "()J"); + } + assert(thread_get_id != nullptr); + long current_thread_id = jni->CallLongMethod(self, thread_get_id); + + auto collectThreads = [&](std::vector& threads) { + jvmtiEnv* jvmti = VM::jvmti(); + if (jvmti == nullptr) { + return; + } + + if (jvmti->GetAllThreads(&threads_count, &threads_ptr) != JVMTI_ERROR_NONE || + threads_count == 0) { + return; + } + + ThreadFilter* threadFilter = Profiler::instance()->threadFilter(); + bool do_filter = threadFilter->enabled(); + + // If filtering is enabled, collect the filtered TIDs first + std::vector java_tids; + + if (do_filter) { + Profiler::instance()->threadFilter()->collect_java_tids(java_tids); + // Sort the TIDs for efficient lookup + std::sort(java_tids.begin(), java_tids.end()); + } + + for (int i = 0; i <
threads_count; i++) { + jthread thread = threads_ptr[i]; + long tid = jni->CallLongMethod(thread, thread_get_id); + if (tid == current_thread_id) continue; + + if (!do_filter || + // Use binary search to efficiently find if tid is in filtered_tids + std::binary_search(java_tids.begin(), java_tids.end(), tid)) { + threads.push_back({thread, tid}); + } + } + }; + + ThreadStateResolver* const resolver = ThreadStateResolver::getInstance(); + + auto sampleThreads = [&](ThreadEntry& thread_entry, int& num_failures, int& threads_already_exited, int& permission_denied) { + static jint max_stack_depth = (jint)Profiler::instance()->max_stack_depth(); + + ExecutionEvent event; + jthread thread = thread_entry.java; + + event._thread_state = resolver->resolveThreadState(thread); + if (event._thread_state == OSThreadState::TERMINATED) { + return false; + } + event._execution_mode = resolver->resolveThreadExecutionMode(thread); + event._weight = 1; + + Profiler::instance()->recordJVMTISample(1, thread_entry.tid, thread_entry.java, BCI_WALL, &event, false); + return true; + }; + + auto cleanThreadRefs = [&]() { + JNIEnv* jni = VM::jni(); + for (jint index = 0; index < threads_count; index++) { + jni->DeleteLocalRef(threads_ptr[index]); + } + jvmti->Deallocate((unsigned char*)threads_ptr); + threads_ptr = nullptr; + threads_count = 0; + }; + + timerLoopCommon(collectThreads, sampleThreads, cleanThreadRefs, _reservoir_size, _interval); + + + // Don't forget to detach the thread + VM::detachThread(); +} diff --git a/ddprof-lib/src/main/cpp/wallclock/wallClockJVMTI.h b/ddprof-lib/src/main/cpp/wallclock/wallClockJVMTI.h new file mode 100644 index 000000000..801a97266 --- /dev/null +++ b/ddprof-lib/src/main/cpp/wallclock/wallClockJVMTI.h @@ -0,0 +1,29 @@ +/* + * Copyright The async-profiler authors + * Copyright 2026, Datadog, Inc. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _WALLCLOCKJVMTI_H +#define _WALLCLOCKJVMTI_H + +#include "wallclock/wallClock.h" + +class WallClockJVMTI : public BaseWallClock { +protected: + void timerLoop() override; + + public: + struct ThreadEntry { + jthread java; + long tid; + }; + WallClockJVMTI() : BaseWallClock() {} + const char* name() const override { + return "WallClock (JVMTI)"; + } + + Mode mode() const override { return Mode::JVMTI; } +}; + +#endif // _WALLCLOCKJVMTI_H diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java index c49b4479e..426711fa7 100644 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java @@ -181,7 +181,7 @@ public boolean recordTraceRoot(long rootSpanId, String endpoint, int sizeLimit) * 'filter' option must be enabled to use this method. */ public void addThread() { - filterThreadAdd0(); + filterThreadAdd0(Thread.currentThread().getId()); } /** @@ -318,7 +318,7 @@ private static ThreadContext initializeThreadContext() { private native void stop0() throws IllegalStateException; private native String execute0(String command) throws IllegalArgumentException, IllegalStateException, IOException; - private static native void filterThreadAdd0(); + private static native void filterThreadAdd0(long java_tid); private static native void filterThreadRemove0(); private static native int getTid0();