forked from palatable/lambda
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTurnstile.java
More file actions
53 lines (45 loc) · 1.57 KB
/
Turnstile.java
File metadata and controls
53 lines (45 loc) · 1.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package testsupport.concurrent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.locks.LockSupport.park;
import static java.util.concurrent.locks.LockSupport.unpark;
import static java.util.stream.StreamSupport.stream;
public final class Turnstile {
private final int parties;
private final BlockingQueue<Thread> arrivals;
public Turnstile(int parties) {
this.parties = parties;
arrivals = new ArrayBlockingQueue<>(parties);
}
public void arrive() {
synchronized (this) {
arrivals.add(currentThread());
if (arrivals.size() == parties) {
allowThrough(parties);
return;
}
}
park();
}
public void allowAllThrough() {
allowThrough(arrivals.size());
}
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
private void allowThrough(int parked) {
Thread currentThread = currentThread();
Collection<Thread> allowedThrough = new ArrayList<>();
arrivals.drainTo(allowedThrough, parked);
stream(allowedThrough.spliterator(), false)
.filter(t -> !t.equals(currentThread))
.forEach(t -> {
unpark(t);
try {
t.join();
} catch (InterruptedException ignored) {
}
});
}
}