Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
thread filter - refactor recording implementation
remove the usage of the thread filter in the recording
  • Loading branch information
r1viollet committed Apr 28, 2025
commit 1d4efc094d1a429ec96f9f6c02e5d4dbf5fb89f2
38 changes: 25 additions & 13 deletions ddprof-lib/src/main/cpp/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ char *Recording::_jvm_flags = NULL;
char *Recording::_java_command = NULL;

Recording::Recording(int fd, Arguments &args)
: _fd(fd), _thread_set(), _method_map() {
: _fd(fd), _method_map() {

args.save(_args);
_chunk_start = lseek(_fd, 0, SEEK_END);
Expand All @@ -329,6 +329,8 @@ Recording::Recording(int fd, Arguments &args)
_bytes_written = 0;

_tid = OS::threadId();
_active_index.store(0, std::memory_order_relaxed);

VM::jvmti()->GetAvailableProcessors(&_available_processors);

writeHeader(_buf);
Expand Down Expand Up @@ -1053,10 +1055,18 @@ void Recording::writeExecutionModes(Buffer *buf) {
}

void Recording::writeThreads(Buffer *buf) {
addThread(_tid);
Copy link
Copy Markdown
Collaborator

@jbachorik jbachorik Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it OK to remove this call? We should make sure that the recording thread is also included, since events can be associated with it: recording info, settings, and config all point to the _tid thread.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is still included (via threads.insert(_tid);).
I just had to move that call down a few lines.

std::vector<int> threads;
_thread_set.collect(threads);
_thread_set.clear();
int old_index = _active_index.fetch_xor(1, std::memory_order_acq_rel);
// After flip: new samples go into the new active set
// We flush from old_index (the previous active set)

std::unordered_set<int> threads;
threads.insert(_tid);

for (int i = 0; i < CONCURRENCY_LEVEL; ++i) {
// I can not use merge : cpp 17
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated - but we might try to bump the accepted C++ level to 17. It should be possible ...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

threads.insert(_thread_ids[i][old_index].begin(), _thread_ids[i][old_index].end());
_thread_ids[i][old_index].clear();
}

Profiler *profiler = Profiler::instance();
ThreadInfo *t_info = &profiler->_thread_info;
Expand All @@ -1065,15 +1075,15 @@ void Recording::writeThreads(Buffer *buf) {

buf->putVar64(T_THREAD);
buf->putVar64(threads.size());
for (int i = 0; i < threads.size(); i++) {
for (auto tid : threads) {
const char *thread_name;
jlong thread_id;
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(threads[i]);
std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(tid);
if (info.first) {
thread_name = info.first->c_str();
thread_id = info.second;
} else {
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", threads[i]);
snprintf(name_buf, sizeof(name_buf), "[tid=%d]", tid);
thread_name = name_buf;
thread_id = 0;
}
Expand All @@ -1083,9 +1093,9 @@ void Recording::writeThreads(Buffer *buf) {
(thread_id == 0 ? length + 1 : 2 * length) -
3 * 10; // 3x max varint length
flushIfNeeded(buf, required);
buf->putVar64(threads[i]);
buf->putVar64(tid);
buf->putUtf8(thread_name, length);
buf->putVar64(threads[i]);
buf->putVar64(tid);
if (thread_id == 0) {
buf->put8(0);
} else {
Expand Down Expand Up @@ -1419,8 +1429,10 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
flushIfNeeded(buf);
}

void Recording::addThread(int tid) {
_thread_set.add(tid, 0); // todo: add slot_id management
// assumption is that we hold the lock (with lock_index)
void Recording::addThread(int lock_index, int tid) {
int active = _active_index.load(std::memory_order_acquire);
_thread_ids[lock_index][active].insert(tid);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder - does the index ordering matter here?
E.g., if we used [active][lock_index] instead, would that make data locality better or worse?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not acceptable: no allocations can take place here, and inserting into a std::unordered_set can allocate.

}

Error FlightRecorder::start(Arguments &args, bool reset) {
Expand Down Expand Up @@ -1578,7 +1590,7 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u32 call_trace_id,
break;
}
_rec->flushIfNeeded(buf);
_rec->addThread(tid);
_rec->addThread(lock_index, tid);
}
}

Expand Down
10 changes: 7 additions & 3 deletions ddprof-lib/src/main/cpp/flightRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define _FLIGHTRECORDER_H

#include <map>
#include <unordered_set>

#include <limits.h>
#include <string.h>
Expand Down Expand Up @@ -127,9 +128,11 @@ class Recording {
static char *_java_command;

RecordingBuffer _buf[CONCURRENCY_LEVEL];
std::unordered_set<int> _thread_ids[CONCURRENCY_LEVEL][2];
std::atomic<int> _active_index{0}; // 0 or 1 globally
Copy link
Copy Markdown
Collaborator

@jbachorik jbachorik Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder - can this be factored out to a separate type?


int _fd;
off_t _chunk_start;
ThreadFilter _thread_set;
MethodMap _method_map;

Arguments _args;
Expand Down Expand Up @@ -158,7 +161,7 @@ class Recording {
public:
Recording(int fd, Arguments &args);
~Recording();

void copyTo(int target_fd);
off_t finishChunk();

Expand Down Expand Up @@ -256,7 +259,8 @@ class Recording {
LockEvent *event);
void recordCpuLoad(Buffer *buf, float proc_user, float proc_system,
float machine_total);
void addThread(int tid);

void addThread(int lock_index, int tid);
};

class Lookup {
Expand Down
4 changes: 2 additions & 2 deletions ddprof-lib/src/main/cpp/javaApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread_1add(JNIEnv *env,
jobject unused) {
ProfiledThread *current = ProfiledThread::current();
int tid = current->tid();
if (tid < 0) {
if (unlikely(tid < 0)) {
return;
}
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
Expand All @@ -145,7 +145,7 @@ Java_com_datadoghq_profiler_JavaProfiler_filterThread_1remove(JNIEnv *env,
jobject unused) {
ProfiledThread *current = ProfiledThread::current();
int tid = current->tid();
if (tid < 0) {
if (unlikely(tid < 0)) {
return;
}
ThreadFilter *thread_filter = Profiler::instance()->threadFilter();
Expand Down
132 changes: 61 additions & 71 deletions ddprof-lib/src/main/cpp/threadFilter.cpp
Original file line number Diff line number Diff line change
@@ -1,114 +1,104 @@
// todo copyright stuff
// as I rewrote all the implem
// Copyright (C) Datadog 2025
// Implementation of thread filter management

#include "threadFilter.h"
#include <mutex>
#include <thread>
#include <cstdlib>
#include <cstring>

static std::mutex slot_mutex;

#include <cstdlib>
#include <thread>

ThreadFilter::ThreadFilter() : _next_index(0), _enabled(false) {
std::lock_guard<std::mutex> lock(slot_mutex);
_slots.resize(128); // preallocate some slots
// Initialize all slots to -1
for (auto& slot : _slots) {
slot.value.store(-1, std::memory_order_relaxed);
}
ThreadFilter::ThreadFilter()
: _next_index(0), _enabled(false) {
std::unique_lock<std::mutex> lock(_slot_mutex);
_slots.emplace_back(); // Allocate first chunk
clear();
}

ThreadFilter::~ThreadFilter() {
std::lock_guard<std::mutex> lock(slot_mutex);
std::unique_lock<std::mutex> lock(_slot_mutex);
_slots.clear();
_free_list.clear(); // todo: implement this, not needed for benchmark
}

Comment on lines +21 to +33
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good. Have you experimented with a custom stride or a thread-local random starting offset to improve the contended case? The idea is to prevent all contending threads from starting at 0 and then competing for every subsequent slot until everyone is satisfied, which forces the unluckiest thread to do N checks, N being the number of competing threads.

ThreadFilter::SlotID ThreadFilter::registerThread() {
int index = _next_index.fetch_add(1, std::memory_order_relaxed);
if (index < static_cast<int>(_slots.size())) {
return index;
SlotID index = _next_index.fetch_add(1, std::memory_order_relaxed);
if (index >= kMaxThreads) {
return -1;
}
// Lock required to safely grow the vector
const int outer_idx = index >> kChunkShift;
{
std::lock_guard<std::mutex> lock(slot_mutex);
size_t current_size = _slots.size();
if (static_cast<size_t>(index) >= current_size) {
_slots.resize(current_size * 2);
// Initialize new slots
for (size_t i = current_size; i < current_size * 2; ++i) {
_slots[i].value.store(-1, std::memory_order_relaxed);
}
if (outer_idx < static_cast<int>(_slots.size())) {
return index;
}
}

{
std::unique_lock<std::mutex> write_lock(_slot_mutex);
while (outer_idx >= static_cast<int>(_slots.size())) {
_slots.emplace_back();
}
}

return index;
}

void ThreadFilter::clear() {
std::lock_guard<std::mutex> lock(slot_mutex);
for (auto& slot : _slots) {
slot.value.store(-1, std::memory_order_relaxed);
for (auto& chunk : _slots) {
for (auto& slot : chunk) {
slot.value.store(-1, std::memory_order_relaxed);
}
}
}

// Reports whether the slot identified by `slot_id` passes the filter.
//
// Returns true unconditionally while the filter is disabled. Otherwise the
// slot must be non-negative, fall inside an already allocated chunk, and
// currently hold a value other than the -1 "empty" marker.
bool ThreadFilter::accept(SlotID slot_id) const {
  // A disabled filter lets every caller through.
  if (!_enabled) return true;

  // Negative ids never map to a slot.
  if (slot_id < 0) {
    return false;
  }

  // Slots live in fixed-size chunks: the high bits select the chunk,
  // the low bits select the entry inside it.
  const int chunk_no = slot_id >> kChunkShift;
  const int entry_no = slot_id & kChunkMask;
  if (chunk_no >= static_cast<int>(_slots.size())) {
    return false;
  }

  const int stored = _slots[chunk_no][entry_no].value.load(std::memory_order_acquire);
  return stored != -1;
}

bool ThreadFilter::accept(int slot_id) const {
if (!_enabled) return true;
return slot_id >= 0 && slot_id < static_cast<int>(_slots.size()) && _slots[slot_id].value.load(std::memory_order_acquire) != -1;
// Stores `tid` in the slot identified by `slot_id`, which makes accept()
// report that slot as accepted.
//
// A negative `slot_id` is ignored: registerThread() returns -1 once
// kMaxThreads slots have been handed out, and shifting/masking a negative id
// would index outside `_slots`.
void ThreadFilter::add(int tid, SlotID slot_id) {
  if (slot_id < 0) {
    return;
  }
  // High bits select the chunk, low bits the entry inside the chunk.
  const int outer_idx = slot_id >> kChunkShift;
  const int inner_idx = slot_id & kChunkMask;
  _slots[outer_idx][inner_idx].value.store(tid, std::memory_order_relaxed);
}

void ThreadFilter::add(int tid, int slot_id) {
_slots[slot_id].value.store(tid, std::memory_order_relaxed);
// Resets the slot identified by `slot_id` to the -1 "empty" marker, so that
// accept() no longer reports it as accepted.
//
// A negative `slot_id` is ignored: registerThread() returns -1 once
// kMaxThreads slots have been handed out, and shifting/masking a negative id
// would index outside `_slots`.
void ThreadFilter::remove(SlotID slot_id) {
  if (slot_id < 0) {
    return;
  }
  // High bits select the chunk, low bits the entry inside the chunk.
  const int outer_idx = slot_id >> kChunkShift;
  const int inner_idx = slot_id & kChunkMask;
  _slots[outer_idx][inner_idx].value.store(-1, std::memory_order_relaxed);
}

void ThreadFilter::remove(int slot_id) {
_slots[slot_id].value.store(-1, std::memory_order_relaxed);
// Clears the slot identified by `slot_id` by writing the -1 "empty" marker
// into it. Negative ids are ignored.
void ThreadFilter::unregisterThread(SlotID slot_id) {
  if (slot_id >= 0) {
    // High bits of the id pick the chunk, low bits pick the entry within it.
    auto& chunk = _slots[slot_id >> kChunkShift];
    chunk[slot_id & kChunkMask].value.store(-1, std::memory_order_relaxed);
  }
}

void ThreadFilter::collect(std::vector<int>& tids) const {
tids.clear();
std::lock_guard<std::mutex> lock(slot_mutex);
for (const auto& slot : _slots) {
int slot_tid = slot.value.load(std::memory_order_acquire);
if (slot_tid != -1) {
tids.push_back(slot_tid);
// std::unique_lock<std::mutex> lock(_slot_mutex);
for (const auto& chunk : _slots) {
for (const auto& slot : chunk) {
int slot_tid = slot.value.load(std::memory_order_acquire);
if (slot_tid != -1) {
tids.push_back(slot_tid);
}
}
}
}

void ThreadFilter::init(const char* filter) {
if (filter == nullptr) {
if (!filter) {
return;
}
// char* end;
// // todo understand this strange init
// do {
// int id = strtol(filter, &end, 0);
// if (id <= 0) {
// break;
// }

// if (*end == '-') {
// int to = strtol(end + 1, &end, 0);
// while (id <= to) {
// add(id++);
// }
// } else {
// add(id);
// }
// filter = end + 1;
// } while (*end);
// TODO: Implement parsing of filter string if needed
_enabled = true;
}

bool ThreadFilter::enabled() const {
return _enabled;
}

// Implementation of unregisterThread - releases a slot by its ID
void ThreadFilter::unregisterThread(SlotID slot_id) {
if (slot_id >= 0 && slot_id < static_cast<int>(_slots.size())) {
// Reset this slot to be available again
_slots[slot_id].value.store(-1, std::memory_order_relaxed);
}
}
28 changes: 18 additions & 10 deletions ddprof-lib/src/main/cpp/threadFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,27 @@
#define _THREADFILTER_H

#include <atomic>
#include <array>
#include <vector>
#include <cstdint>
#include <mutex>

class ThreadFilter {
public:
using SlotID = int;

static constexpr int kChunkSize = 128;
static constexpr int kChunkShift = 7;
static constexpr int kChunkMask = kChunkSize - 1;
static constexpr int kMaxThreads = 2048;
ThreadFilter();
~ThreadFilter();

void init(const char* filter);
void clear();
bool enabled() const;
bool accept(int slot_id) const;
void add(int tid, int slot_id);
void remove(int slot_id); // tid unused, for API consistency
bool accept(SlotID slot_id) const;
void add(int tid, SlotID slot_id);
void remove(SlotID slot_id);
void collect(std::vector<int>& tids) const;

SlotID registerThread();
Expand All @@ -27,15 +32,18 @@ class ThreadFilter {
struct Slot {
std::atomic<int> value{-1};
Slot() = default;
Slot(const Slot&o) { value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed); }
Slot& operator=(const Slot& o) { value.store(o.value.load(std::memory_order_relaxed),
std::memory_order_relaxed); return *this; }
Slot(const Slot& o) { value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed); }
Slot& operator=(const Slot& o) {
value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed);
return *this;
}
};

bool _enabled = false;
std::vector<Slot> _slots;
std::vector<Slot> _free_list;
std::atomic<int> _next_index;
std::vector<std::array<Slot, kChunkSize>> _slots;
std::atomic<SlotID> _next_index;

mutable std::mutex _slot_mutex;
};

#endif // _THREADFILTER_H
1 change: 1 addition & 0 deletions ddprof-lib/src/main/cpp/wallClock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ void WallClockJVMTI::timerLoop() {
}

void WallClockASGCT::timerLoop() {
// todo: re-allocating the vector every time is not efficient
auto collectThreads = [&](std::vector<int>& tids) {
// Get thread IDs from the filter if it's enabled
// Otherwise list all threads in the system
Expand Down