-
-
Notifications
You must be signed in to change notification settings - Fork 40
Expand file tree
/
Copy pathspsc_queue.zig
More file actions
128 lines (104 loc) · 4.4 KB
/
Copy pathspsc_queue.zig
File metadata and controls
128 lines (104 loc) · 4.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
//! Taken from https://github.com/freref/spsc-queue
const std = @import("std");
const cache_line = std.atomic.cache_line;
/// We align the producer and consumer to different cache lines to avoid false
/// sharing between them. We Pad the producer and consumer to ensure that they
/// take up a full cache line each.
fn Pad(comptime N: usize, comptime T: type) type {
const sz = @sizeOf(T);
const rem = sz % N;
return [if (rem == 0) 0 else N - rem]u8;
}
const Producer = struct {
push_cursor: std.atomic.Value(usize) = .{ .raw = 0 },
_pad: Pad(cache_line, std.atomic.Value(usize)) = undefined,
};
const Consumer = struct {
pop_cursor: std.atomic.Value(usize) = .{ .raw = 0 },
_pad: Pad(cache_line, std.atomic.Value(usize)) = undefined,
};
/// A single-producer, single-consumer lock-free queue using a ring buffer.
/// Following the conventions from the Zig standard library.
pub fn SpscQueue(comptime T: type) type {
return struct {
const Self = @This();
items: []T,
len: usize,
mask: usize,
producer: Producer align(cache_line) = .{},
consumer: Consumer align(cache_line) = .{},
pop_cursor_cache: usize = 0,
push_cursor_cache: usize = 0,
pub fn initBuffer(buffer: []T) Self {
std.debug.assert(std.math.isPowerOfTwo(buffer.len));
return Self{
.items = buffer,
.len = buffer.len,
.mask = buffer.len - 1,
};
}
pub fn initCapacity(allocator: std.mem.Allocator, num: usize) !Self {
std.debug.assert(std.math.isPowerOfTwo(num));
const items = try allocator.alloc(T, num);
return .{
.items = items,
.len = items.len,
.mask = items.len - 1,
};
}
pub fn deinit(self: *Self, allocator: std.mem.Allocator) void {
allocator.free(self.items);
}
/// Returns true if the queue is empty.
pub fn isEmpty(self: *Self) bool {
const r = self.consumer.pop_cursor.load(.acquire);
const w = self.producer.push_cursor.load(.acquire);
return r == w;
}
pub fn size(self: *Self) usize {
const r = self.consumer.pop_cursor.load(.acquire);
const w = self.producer.push_cursor.load(.acquire);
return w - r;
}
/// Blocking push, spins until there is room in the queue.
pub fn push(self: *Self, value: T) void {
const w = self.producer.push_cursor.load(.monotonic);
// Spin while full (w - r == len)
while ((w - self.pop_cursor_cache) == self.len) {
self.pop_cursor_cache = self.consumer.pop_cursor.load(.acquire);
if ((w - self.pop_cursor_cache) == self.len) {
std.atomic.spinLoopHint();
}
}
self.items[w & self.mask] = value;
self.producer.push_cursor.store(w + 1, .release);
}
/// Non-blocking push, returns false if the queue is full.
pub fn tryPush(self: *Self, value: T) bool {
const w = self.producer.push_cursor.load(.monotonic);
if ((w - self.pop_cursor_cache) == self.len) {
self.pop_cursor_cache = self.consumer.pop_cursor.load(.acquire);
if ((w - self.pop_cursor_cache) == self.len) return false;
}
self.items[w & self.mask] = value;
self.producer.push_cursor.store(w + 1, .release);
return true;
}
/// Returns a pointer to the front item, or null if the queue is empty.
pub fn front(self: *Self) ?*T {
const r = self.consumer.pop_cursor.load(.monotonic);
if (r == self.push_cursor_cache) {
self.push_cursor_cache = self.producer.push_cursor.load(.acquire);
if (self.push_cursor_cache == r) return null;
}
return &self.items[r & self.mask];
}
/// IMPORTANT: pop must only be called after front() returned non-null.
/// The consumer is responsible for cleaning up the item if needed.
pub fn pop(self: *Self) void {
const r = self.consumer.pop_cursor.load(.monotonic);
std.debug.assert(self.producer.push_cursor.load(.acquire) != r);
self.consumer.pop_cursor.store(r + 1, .release);
}
};
}