-
Notifications
You must be signed in to change notification settings - Fork 11
Refactor thread filter mechanisms #209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
3b91530
913d2b6
873fcb2
be0752b
0f92917
258f23e
1d4efc0
115a60a
efa094e
1be1c3a
4772169
60ef060
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
remove the usage of the thread filter in the recording
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -317,7 +317,7 @@ char *Recording::_jvm_flags = NULL; | |
| char *Recording::_java_command = NULL; | ||
|
|
||
| Recording::Recording(int fd, Arguments &args) | ||
| : _fd(fd), _thread_set(), _method_map() { | ||
| : _fd(fd), _method_map() { | ||
|
|
||
| args.save(_args); | ||
| _chunk_start = lseek(_fd, 0, SEEK_END); | ||
|
|
@@ -329,6 +329,8 @@ Recording::Recording(int fd, Arguments &args) | |
| _bytes_written = 0; | ||
|
|
||
| _tid = OS::threadId(); | ||
| _active_index.store(0, std::memory_order_relaxed); | ||
|
|
||
| VM::jvmti()->GetAvailableProcessors(&_available_processors); | ||
|
|
||
| writeHeader(_buf); | ||
|
|
@@ -1053,10 +1055,18 @@ void Recording::writeExecutionModes(Buffer *buf) { | |
| } | ||
|
|
||
| void Recording::writeThreads(Buffer *buf) { | ||
| addThread(_tid); | ||
| std::vector<int> threads; | ||
| _thread_set.collect(threads); | ||
| _thread_set.clear(); | ||
| int old_index = _active_index.fetch_xor(1, std::memory_order_acq_rel); | ||
| // After flip: new samples go into the new active set | ||
| // We flush from old_index (the previous active set) | ||
|
|
||
| std::unordered_set<int> threads; | ||
| threads.insert(_tid); | ||
|
|
||
| for (int i = 0; i < CONCURRENCY_LEVEL; ++i) { | ||
| // Cannot use std::unordered_set::merge here: it requires C++17 | |
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unrelated, but we might try to bump the accepted C++ standard level to 17. It should be possible ...
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! |
||
| threads.insert(_thread_ids[i][old_index].begin(), _thread_ids[i][old_index].end()); | ||
| _thread_ids[i][old_index].clear(); | ||
| } | ||
|
|
||
| Profiler *profiler = Profiler::instance(); | ||
| ThreadInfo *t_info = &profiler->_thread_info; | ||
|
|
@@ -1065,15 +1075,15 @@ void Recording::writeThreads(Buffer *buf) { | |
|
|
||
| buf->putVar64(T_THREAD); | ||
| buf->putVar64(threads.size()); | ||
| for (int i = 0; i < threads.size(); i++) { | ||
| for (auto tid : threads) { | ||
| const char *thread_name; | ||
| jlong thread_id; | ||
| std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(threads[i]); | ||
| std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(tid); | ||
| if (info.first) { | ||
| thread_name = info.first->c_str(); | ||
| thread_id = info.second; | ||
| } else { | ||
| snprintf(name_buf, sizeof(name_buf), "[tid=%d]", threads[i]); | ||
| snprintf(name_buf, sizeof(name_buf), "[tid=%d]", tid); | ||
| thread_name = name_buf; | ||
| thread_id = 0; | ||
| } | ||
|
|
@@ -1083,9 +1093,9 @@ void Recording::writeThreads(Buffer *buf) { | |
| (thread_id == 0 ? length + 1 : 2 * length) - | ||
| 3 * 10; // 3x max varint length | ||
| flushIfNeeded(buf, required); | ||
| buf->putVar64(threads[i]); | ||
| buf->putVar64(tid); | ||
| buf->putUtf8(thread_name, length); | ||
| buf->putVar64(threads[i]); | ||
| buf->putVar64(tid); | ||
| if (thread_id == 0) { | ||
| buf->put8(0); | ||
| } else { | ||
|
|
@@ -1419,8 +1429,10 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system, | |
| flushIfNeeded(buf); | ||
| } | ||
|
|
||
| void Recording::addThread(int tid) { | ||
| _thread_set.add(tid, 0); // todo: add slot_id management | ||
| // assumption is that we hold the lock (with lock_index) | ||
| void Recording::addThread(int lock_index, int tid) { | ||
| int active = _active_index.load(std::memory_order_acquire); | ||
| _thread_ids[lock_index][active].insert(tid); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder - does the ordering matter here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is not acceptable: no allocations can take place here. |
||
| } | ||
|
|
||
| Error FlightRecorder::start(Arguments &args, bool reset) { | ||
|
|
@@ -1578,7 +1590,7 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u32 call_trace_id, | |
| break; | ||
| } | ||
| _rec->flushIfNeeded(buf); | ||
| _rec->addThread(tid); | ||
| _rec->addThread(lock_index, tid); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| #define _FLIGHTRECORDER_H | ||
|
|
||
| #include <map> | ||
| #include <unordered_set> | ||
|
|
||
| #include <limits.h> | ||
| #include <string.h> | ||
|
|
@@ -127,9 +128,11 @@ class Recording { | |
| static char *_java_command; | ||
|
|
||
| RecordingBuffer _buf[CONCURRENCY_LEVEL]; | ||
| std::unordered_set<int> _thread_ids[CONCURRENCY_LEVEL][2]; | ||
| std::atomic<int> _active_index{0}; // 0 or 1 globally | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder - can this be factored out to a separate type? |
||
|
|
||
| int _fd; | ||
| off_t _chunk_start; | ||
| ThreadFilter _thread_set; | ||
| MethodMap _method_map; | ||
|
|
||
| Arguments _args; | ||
|
|
@@ -158,7 +161,7 @@ class Recording { | |
| public: | ||
| Recording(int fd, Arguments &args); | ||
| ~Recording(); | ||
|
|
||
| void copyTo(int target_fd); | ||
| off_t finishChunk(); | ||
|
|
||
|
|
@@ -256,7 +259,8 @@ class Recording { | |
| LockEvent *event); | ||
| void recordCpuLoad(Buffer *buf, float proc_user, float proc_system, | ||
| float machine_total); | ||
| void addThread(int tid); | ||
|
|
||
| void addThread(int lock_index, int tid); | ||
| }; | ||
|
|
||
| class Lookup { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,114 +1,104 @@ | ||
| // todo copyright stuff | ||
| // as I rewrote all the implem | ||
| // Copyright (C) Datadog 2025 | ||
| // Implementation of thread filter management | ||
|
|
||
| #include "threadFilter.h" | ||
| #include <mutex> | ||
| #include <thread> | ||
| #include <cstdlib> | ||
| #include <cstring> | ||
|
|
||
| static std::mutex slot_mutex; | ||
|
|
||
| #include <cstdlib> | ||
| #include <thread> | ||
|
|
||
| ThreadFilter::ThreadFilter() : _next_index(0), _enabled(false) { | ||
| std::lock_guard<std::mutex> lock(slot_mutex); | ||
| _slots.resize(128); // preallocate some slots | ||
| // Initialize all slots to -1 | ||
| for (auto& slot : _slots) { | ||
| slot.value.store(-1, std::memory_order_relaxed); | ||
| } | ||
| ThreadFilter::ThreadFilter() | ||
| : _next_index(0), _enabled(false) { | ||
| std::unique_lock<std::mutex> lock(_slot_mutex); | ||
| _slots.emplace_back(); // Allocate first chunk | ||
| clear(); | ||
| } | ||
|
|
||
| ThreadFilter::~ThreadFilter() { | ||
| std::lock_guard<std::mutex> lock(slot_mutex); | ||
| std::unique_lock<std::mutex> lock(_slot_mutex); | ||
| _slots.clear(); | ||
| _free_list.clear(); // todo: implement this, not needed for benchmark | ||
| } | ||
|
|
||
|
Comment on lines
+21
to
+33
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks good. Have you experimented with a custom stride or thread-local random starting offset to improve the contended situation? Basically, to prevent all contending threads starting at 0 and then competing for all subsequent slots until everyone is satisfied - forcing the most unlucky thread to do N checks, N being the number of competing threads. |
||
| ThreadFilter::SlotID ThreadFilter::registerThread() { | ||
| int index = _next_index.fetch_add(1, std::memory_order_relaxed); | ||
| if (index < static_cast<int>(_slots.size())) { | ||
| return index; | ||
| SlotID index = _next_index.fetch_add(1, std::memory_order_relaxed); | ||
| if (index >= kMaxThreads) { | ||
| return -1; | ||
| } | ||
| // Lock required to safely grow the vector | ||
| const int outer_idx = index >> kChunkShift; | ||
| { | ||
| std::lock_guard<std::mutex> lock(slot_mutex); | ||
| size_t current_size = _slots.size(); | ||
| if (static_cast<size_t>(index) >= current_size) { | ||
| _slots.resize(current_size * 2); | ||
| // Initialize new slots | ||
| for (size_t i = current_size; i < current_size * 2; ++i) { | ||
| _slots[i].value.store(-1, std::memory_order_relaxed); | ||
| } | ||
| if (outer_idx < static_cast<int>(_slots.size())) { | ||
| return index; | ||
| } | ||
| } | ||
|
|
||
| { | ||
| std::unique_lock<std::mutex> write_lock(_slot_mutex); | ||
| while (outer_idx >= static_cast<int>(_slots.size())) { | ||
| _slots.emplace_back(); | ||
| } | ||
| } | ||
|
|
||
| return index; | ||
| } | ||
|
|
||
| void ThreadFilter::clear() { | ||
| std::lock_guard<std::mutex> lock(slot_mutex); | ||
| for (auto& slot : _slots) { | ||
| slot.value.store(-1, std::memory_order_relaxed); | ||
| for (auto& chunk : _slots) { | ||
| for (auto& slot : chunk) { | ||
| slot.value.store(-1, std::memory_order_relaxed); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| bool ThreadFilter::accept(SlotID slot_id) const { | ||
| if (!_enabled) { | ||
| return true; | ||
| } | ||
| if (slot_id < 0) return false; | ||
| int outer_idx = slot_id >> kChunkShift; | ||
| int inner_idx = slot_id & kChunkMask; | ||
| if (outer_idx >= static_cast<int>(_slots.size())) return false; | ||
| return _slots[outer_idx][inner_idx].value.load(std::memory_order_acquire) != -1; | ||
| } | ||
|
|
||
| bool ThreadFilter::accept(int slot_id) const { | ||
| if (!_enabled) return true; | ||
| return slot_id >= 0 && slot_id < static_cast<int>(_slots.size()) && _slots[slot_id].value.load(std::memory_order_acquire) != -1; | ||
| void ThreadFilter::add(int tid, SlotID slot_id) { | ||
| int outer_idx = slot_id >> kChunkShift; | ||
| int inner_idx = slot_id & kChunkMask; | ||
| _slots[outer_idx][inner_idx].value.store(tid, std::memory_order_relaxed); | ||
| } | ||
|
|
||
| void ThreadFilter::add(int tid, int slot_id) { | ||
| _slots[slot_id].value.store(tid, std::memory_order_relaxed); | ||
| void ThreadFilter::remove(SlotID slot_id) { | ||
| int outer_idx = slot_id >> kChunkShift; | ||
| int inner_idx = slot_id & kChunkMask; | ||
| _slots[outer_idx][inner_idx].value.store(-1, std::memory_order_relaxed); | ||
| } | ||
|
|
||
| void ThreadFilter::remove(int slot_id) { | ||
| _slots[slot_id].value.store(-1, std::memory_order_relaxed); | ||
| void ThreadFilter::unregisterThread(SlotID slot_id) { | ||
| if (slot_id < 0) return; | ||
| int outer_idx = slot_id >> kChunkShift; | ||
| int inner_idx = slot_id & kChunkMask; | ||
| _slots[outer_idx][inner_idx].value.store(-1, std::memory_order_relaxed); | ||
| } | ||
|
|
||
| void ThreadFilter::collect(std::vector<int>& tids) const { | ||
| tids.clear(); | ||
| std::lock_guard<std::mutex> lock(slot_mutex); | ||
| for (const auto& slot : _slots) { | ||
| int slot_tid = slot.value.load(std::memory_order_acquire); | ||
| if (slot_tid != -1) { | ||
| tids.push_back(slot_tid); | ||
| // std::unique_lock<std::mutex> lock(_slot_mutex); | ||
| for (const auto& chunk : _slots) { | ||
| for (const auto& slot : chunk) { | ||
| int slot_tid = slot.value.load(std::memory_order_acquire); | ||
| if (slot_tid != -1) { | ||
| tids.push_back(slot_tid); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| void ThreadFilter::init(const char* filter) { | ||
| if (filter == nullptr) { | ||
| if (!filter) { | ||
| return; | ||
| } | ||
| // char* end; | ||
| // // todo understand this strange init | ||
| // do { | ||
| // int id = strtol(filter, &end, 0); | ||
| // if (id <= 0) { | ||
| // break; | ||
| // } | ||
|
|
||
| // if (*end == '-') { | ||
| // int to = strtol(end + 1, &end, 0); | ||
| // while (id <= to) { | ||
| // add(id++); | ||
| // } | ||
| // } else { | ||
| // add(id); | ||
| // } | ||
| // filter = end + 1; | ||
| // } while (*end); | ||
| // TODO: Implement parsing of filter string if needed | ||
| _enabled = true; | ||
| } | ||
|
|
||
| bool ThreadFilter::enabled() const { | ||
| return _enabled; | ||
| } | ||
|
|
||
| // Implementation of unregisterThread - releases a slot by its ID | ||
| void ThreadFilter::unregisterThread(SlotID slot_id) { | ||
| if (slot_id >= 0 && slot_id < static_cast<int>(_slots.size())) { | ||
| // Reset this slot to be available again | ||
| _slots[slot_id].value.store(-1, std::memory_order_relaxed); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it OK to remove this call? We should make sure that the recording thread is also included, as events can be associated with it: recording info, settings, and config all point to the _tid thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I do (with the `threads.insert(_tid);`). I had to move it down a few lines.