-
Notifications
You must be signed in to change notification settings - Fork 11
Refactor thread filter mechanisms #209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
3b91530
913d2b6
873fcb2
be0752b
0f92917
258f23e
1d4efc0
115a60a
efa094e
1be1c3a
4772169
60ef060
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,156 +1,108 @@ | ||
| /* | ||
| * Copyright 2020 Andrei Pangin | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| // TODO: update the copyright header, | ||
| // as I rewrote the whole implementation | ||
|
|
||
| #include "threadFilter.h" | ||
| #include "counters.h" | ||
| #include "os.h" | ||
| #include <stdlib.h> | ||
| #include <string.h> | ||
| #include <vector> | ||
| #include <mutex> | ||
| #include <thread> | ||
| #include <cstdlib> | ||
| #include <cstring> | ||
|
|
||
| void trackPage() { | ||
| Counters::increment(THREAD_FILTER_PAGES, 1); | ||
| Counters::increment(THREAD_FILTER_BYTES, BITMAP_SIZE); | ||
| } | ||
| static ThreadFilter* global_filter = nullptr; | ||
| thread_local ThreadFilter::SlotID tls_slot_id = -1; // todo, use the signal safe stuff | ||
| static std::mutex slot_mutex; | ||
|
|
||
| ThreadFilter::ThreadFilter() { | ||
| _max_thread_id = OS::getMaxThreadId(128 * 1024); | ||
| _max_bitmaps = (_max_thread_id + BITMAP_SIZE - 1) / BITMAP_SIZE; | ||
| u32 capacity = _max_bitmaps * sizeof(u64 *); | ||
| _bitmap = (u64 **)OS::safeAlloc(capacity); | ||
| memset(_bitmap, 0, capacity); | ||
| _bitmap[0] = (u64 *)OS::safeAlloc(BITMAP_SIZE); | ||
| trackPage(); | ||
| _enabled = false; | ||
| _size = 0; | ||
| ThreadFilter::ThreadFilter() : _next_index(0) { | ||
| std::lock_guard<std::mutex> lock(slot_mutex); | ||
| _slots.resize(128); // preallocate some slots | ||
| } | ||
|
|
||
| ThreadFilter::~ThreadFilter() { | ||
| for (int i = 0; i < _max_bitmaps; i++) { | ||
| if (_bitmap[i] != NULL) { | ||
| OS::safeFree(_bitmap[i], BITMAP_SIZE); | ||
| } | ||
| } | ||
| if (_bitmap) { | ||
| OS::safeFree(_bitmap, _max_bitmaps * sizeof(u64 *)); | ||
| } | ||
| std::lock_guard<std::mutex> lock(slot_mutex); | ||
| _slots.clear(); | ||
| } | ||
|
|
||
|
Comment on lines
+21
to
+33
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This looks good. Have you experimented with a custom stride or a thread-local random starting offset to improve the contended situation? Basically, to prevent all contending threads from starting at 0 and then competing for all subsequent slots until everyone is satisfied - forcing the unluckiest thread to do N checks, N being the number of competing threads. |
||
| void ThreadFilter::init(const char *filter) { | ||
| if (filter == NULL) { | ||
| _enabled = false; | ||
| return; | ||
| ThreadFilter::SlotID ThreadFilter::registerThread() { | ||
| int index = _next_index.fetch_add(1, std::memory_order_relaxed); | ||
| if (index < static_cast<int>(_slots.size())) { | ||
| return index; | ||
| } | ||
|
|
||
| char *end; | ||
| do { | ||
| int id = strtol(filter, &end, 0); | ||
| if (id <= 0) { | ||
| break; | ||
| // Lock required to safely grow the vector | ||
| { | ||
| std::lock_guard<std::mutex> lock(slot_mutex); | ||
| size_t current_size = _slots.size(); | ||
| if (static_cast<size_t>(index) >= current_size) { | ||
| _slots.resize(current_size * 2); | ||
| } | ||
| } | ||
| return index; | ||
| } | ||
|
|
||
| if (*end == '-') { | ||
| int to = strtol(end + 1, &end, 0); | ||
| while (id <= to) { | ||
| add(id++); | ||
| } | ||
| } else { | ||
| add(id); | ||
| ThreadFilter::SlotID ThreadFilter::ensureThreadRegistered() { | ||
| if (tls_slot_id == -1) { | ||
| tls_slot_id = registerThread(); | ||
| } | ||
|
|
||
| filter = end + 1; | ||
| } while (*end); | ||
|
|
||
| _enabled = true; | ||
| return tls_slot_id; | ||
| } | ||
|
|
||
| void ThreadFilter::clear() { | ||
| for (int i = 0; i < _max_bitmaps; i++) { | ||
| if (_bitmap[i] != NULL) { | ||
| memset(_bitmap[i], 0, BITMAP_SIZE); | ||
| std::lock_guard<std::mutex> lock(slot_mutex); | ||
| for (auto& slot : _slots) { | ||
| slot.value.store(-1, std::memory_order_relaxed); | ||
| } | ||
| } | ||
| _size = 0; | ||
| } | ||
|
|
||
| bool ThreadFilter::accept(int thread_id) { | ||
| u64 *b = bitmap(thread_id); | ||
| if (b == NULL) return false; | ||
| u32 reversed = reverseBits(thread_id); | ||
| return word(b, thread_id) & (1ULL << (reversed & 0x3f)); | ||
| bool ThreadFilter::accept(int tid) const { | ||
| if (!_enabled) return true; | ||
| SlotID id = tls_slot_id; | ||
| return id >= 0 && id < static_cast<int>(_slots.size()) && _slots[id].value.load(std::memory_order_acquire) != -1; | ||
| } | ||
|
|
||
| void ThreadFilter::add(int thread_id) { | ||
| u64 *b = bitmap(thread_id); | ||
| if (b == NULL) { | ||
| b = (u64 *)OS::safeAlloc(BITMAP_SIZE); | ||
| u64 *oldb = __sync_val_compare_and_swap( | ||
| &_bitmap[(u32)thread_id / BITMAP_CAPACITY], NULL, b); | ||
| if (oldb != NULL) { | ||
| OS::safeFree(b, BITMAP_SIZE); | ||
| b = oldb; | ||
| } else { | ||
| trackPage(); | ||
| } | ||
| } | ||
|
|
||
| u32 reversed = reverseBits(thread_id); | ||
| u64 bit = 1ULL << (reversed & 0x3f); | ||
| if (!(__sync_fetch_and_or(&word(b, thread_id), bit) & bit)) { | ||
| atomicInc(_size); | ||
| } | ||
| void ThreadFilter::add(int tid) { | ||
| SlotID id = ensureThreadRegistered(); | ||
| _slots[id].value.store(tid, std::memory_order_relaxed); | ||
| } | ||
|
|
||
| void ThreadFilter::remove(int thread_id) { | ||
| u64 *b = bitmap(thread_id); | ||
| if (b == NULL) { | ||
| return; | ||
| } | ||
|
|
||
| u32 reversed = reverseBits(thread_id); | ||
| u64 bit = 1ULL << (reversed & 0x3f); | ||
| if (__sync_fetch_and_and(&word(b, thread_id), ~bit) & bit) { | ||
| atomicInc(_size, -1); | ||
| } | ||
| void ThreadFilter::remove(int /*tid*/) { | ||
| SlotID id = ensureThreadRegistered(); // we probably are already registered | ||
| _slots[id].value.store(-1, std::memory_order_relaxed); | ||
| } | ||
|
|
||
| void ThreadFilter::collect(std::vector<int>& tids) { | ||
| tids.reserve(_size); // Pre-allocate space for efficiency | ||
|
|
||
| // Iterate through the bitmap array | ||
| for (int i = 0; i < _max_bitmaps; i++) { | ||
| u64* b = _bitmap[i]; | ||
| if (b != NULL) { | ||
| int start_id = i * BITMAP_CAPACITY; | ||
| for (int j = 0; j < BITMAP_SIZE / sizeof(u64); j++) { | ||
| u64 word = __atomic_load_n(&b[j], __ATOMIC_ACQUIRE); | ||
| while (word != 0) { | ||
| int bit_pos = __builtin_ctzl(word); | ||
| // For each bit position, we need to find all thread IDs that would map to it | ||
| for (int k = 0; k < 64; k++) { | ||
| int thread_id = start_id + (j << 6) + k; | ||
| u32 reversed = reverseBits(thread_id); | ||
| if ((reversed & 0x3f) == bit_pos) { | ||
| tids.push_back(thread_id); | ||
| } | ||
| } | ||
| word &= (word - 1); | ||
| } | ||
| } | ||
| void ThreadFilter::collect(std::vector<int>& tids) const { | ||
| tids.clear(); | ||
| std::lock_guard<std::mutex> lock(slot_mutex); | ||
| for (const auto& slot : _slots) { | ||
| int tid = slot.value.load(std::memory_order_acquire); | ||
| if (tid != -1) { | ||
| tids.push_back(tid); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| void ThreadFilter::init(const char* filter) { | ||
| if (filter == nullptr) { | ||
| return; | ||
| } | ||
| // char* end; | ||
| // // todo understand this strange init | ||
| // do { | ||
| // int id = strtol(filter, &end, 0); | ||
| // if (id <= 0) { | ||
| // break; | ||
| // } | ||
|
|
||
| // if (*end == '-') { | ||
| // int to = strtol(end + 1, &end, 0); | ||
| // while (id <= to) { | ||
| // add(id++); | ||
| // } | ||
| // } else { | ||
| // add(id); | ||
| // } | ||
| // filter = end + 1; | ||
| // } while (*end); | ||
| _enabled = true; | ||
| } | ||
|
|
||
| bool ThreadFilter::enabled() const { | ||
| return _enabled; | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,84 +1,40 @@ | ||
| /* | ||
| * Copyright 2020 Andrei Pangin | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #ifndef _THREADFILTER_H | ||
| #define _THREADFILTER_H | ||
|
|
||
| #include "arch.h" | ||
| #include <atomic> | ||
| #include <vector> | ||
| #include <cstdint> | ||
|
|
||
| // The size of thread ID bitmap in bytes. Must be at least 64K to allow mmap() | ||
| const u32 BITMAP_SIZE = 65536; | ||
| // How many thread IDs one bitmap can hold | ||
| const u32 BITMAP_CAPACITY = BITMAP_SIZE * 8; | ||
|
|
||
| // ThreadFilter query operations must be lock-free and signal-safe; | ||
| // update operations are mostly lock-free, except rare bitmap allocations | ||
| class ThreadFilter { | ||
| private: | ||
| // Total number of bitmaps required to hold the entire range of thread IDs | ||
| u32 _max_thread_id; | ||
| u32 _max_bitmaps; | ||
| u64 **_bitmap; | ||
| bool _enabled; | ||
| volatile int _size; | ||
|
|
||
| u64 *bitmap(int thread_id) { | ||
| if (thread_id >= _max_thread_id) { | ||
| return NULL; | ||
| } | ||
| return __atomic_load_n( | ||
| &(_bitmap[static_cast<u32>(thread_id) / BITMAP_CAPACITY]), | ||
| __ATOMIC_ACQUIRE); | ||
| } | ||
|
|
||
| static u32 reverseBits(u32 n) { | ||
| u32 x = n & 0x3f; // isolate lower 6 bits | ||
| x = ((x & 0x01) << 5) | | ||
| ((x & 0x02) << 3) | | ||
| ((x & 0x04) << 1) | | ||
| ((x & 0x08) >> 1) | | ||
| ((x & 0x10) >> 3) | | ||
| ((x & 0x20) >> 5); | ||
| return (n & ~0x3f) | x; | ||
| } | ||
|
|
||
|
|
||
| // Map thread ID to word index | ||
| u64& word(u64 *bitmap, u32 thread_id) { | ||
| return bitmap[thread_id >> 6]; | ||
| } | ||
|
|
||
| public: | ||
| ThreadFilter(); | ||
| ThreadFilter(ThreadFilter &threadFilter) = delete; | ||
| ~ThreadFilter(); | ||
|
|
||
| bool enabled() { return _enabled; } | ||
| using SlotID = int; | ||
|
|
||
| int size() { return _size; } | ||
| ThreadFilter(); | ||
| ~ThreadFilter(); | ||
|
|
||
| void init(const char *filter); | ||
| void clear(); | ||
| void init(const char* filter); | ||
| void clear(); | ||
| bool enabled() const; | ||
| bool accept(int thread_id) const; | ||
| void add(int thread_id); | ||
| void remove(int thread_id); // tid unused, for API consistency | ||
| void collect(std::vector<int>& tids) const; | ||
| SlotID ensureThreadRegistered(); | ||
|
|
||
| bool accept(int thread_id); | ||
| void add(int thread_id); | ||
| void remove(int thread_id); | ||
|
|
||
| void collect(std::vector<int> &v); | ||
| private: | ||
| struct Slot { | ||
| std::atomic<int> value{-1}; | ||
| Slot() = default; | ||
| Slot(const Slot&o) { value.store(o.value.load(std::memory_order_relaxed), std::memory_order_relaxed); } | ||
| Slot& operator=(const Slot& o) { value.store(o.value.load(std::memory_order_relaxed), | ||
| std::memory_order_relaxed); return *this; } | ||
| }; | ||
|
|
||
| SlotID registerThread(); | ||
|
|
||
| bool _enabled = false; | ||
| std::vector<Slot> _slots; | ||
| std::atomic<int> _next_index; | ||
| }; | ||
|
|
||
| #endif // _THREADFILTER_H |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it OK to remove this call? We should make sure that the recording thread is also included, as events can be associated with it - recording info, settings, config - all point to the _tid thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I do (with the `threads.insert(_tid);`). I had to move it down a few lines.