-
Notifications
You must be signed in to change notification settings - Fork 262
Expand file tree
/
Copy pathAsyncSemaphore.java
More file actions
66 lines (61 loc) · 2.14 KB
/
AsyncSemaphore.java
File metadata and controls
66 lines (61 loc) · 2.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// SPDX-License-Identifier: BSD-3-Clause
package org.xbill.DNS;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
final class AsyncSemaphore {
private final Queue<CompletableFuture<Permit>> queue = new ArrayDeque<>();
private final Permit singletonPermit = new Permit();
private final String name;
private volatile int permits;
final class Permit {
public void release(int id, Executor executor) {
synchronized (queue) {
CompletableFuture<Permit> next = queue.poll();
if (next == null) {
permits++;
log.trace("{} permit released id={}, available={}", name, id, permits);
} else {
log.trace("{} permit released id={}, available={}, immediate next", name, id, permits);
executor.execute(() -> next.complete(this));
}
}
}
}
AsyncSemaphore(int permits, String name) {
this.permits = permits;
this.name = name;
log.debug("Using Java 8 implementation for {}", name);
}
CompletionStage<Permit> acquire(Duration timeout, int id, Executor executor) {
synchronized (queue) {
if (permits > 0) {
permits--;
log.trace("{} permit acquired id={}, available={}", name, id, permits);
return CompletableFuture.completedFuture(singletonPermit);
} else {
TimeoutCompletableFuture<Permit> f = new TimeoutCompletableFuture<>();
f.compatTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS)
.whenCompleteAsync(
(result, ex) -> {
synchronized (queue) {
if (ex != null) {
log.trace("{} permit timed out id={}, available={}", name, id, permits);
}
queue.remove(f);
}
},
executor);
log.trace("{} permit queued id={}, available={}", name, id, permits);
queue.add(f);
return f;
}
}
}
}