Skip to content

Commit fc13d7d

Browse files
committed
Reorganize WeakConcurrent implementation to allow cleanup thread GC when map is out of scope
1 parent 2fd7858 commit fc13d7d

2 files changed

Lines changed: 88 additions & 35 deletions

File tree

dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/WeakMapSuppliers.java

Lines changed: 64 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.concurrent.ScheduledExecutorService;
1313
import java.util.concurrent.ThreadFactory;
1414
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.AtomicBoolean;
1516

1617
class WeakMapSuppliers {
1718
// Comparison with using WeakConcurrentMap vs Guava's implementation:
@@ -57,55 +58,86 @@ public Thread newThread(final Runnable r) {
5758
private final Queue<WeakReference<WeakConcurrentMap>> suppliedMaps =
5859
new ConcurrentLinkedQueue<>();
5960

60-
private final Runnable runnable =
61-
new Runnable() {
62-
@Override
63-
public void run() {
64-
for (final Iterator<WeakReference<WeakConcurrentMap>> iterator =
65-
suppliedMaps.iterator();
66-
iterator.hasNext(); ) {
67-
final WeakConcurrentMap map = iterator.next().get();
68-
if (map == null) {
69-
iterator.remove();
70-
} else {
71-
map.expungeStaleEntries();
72-
}
73-
}
74-
}
75-
};
61+
private final AtomicBoolean finalized = new AtomicBoolean(false);
7662

7763
WeakConcurrent() {
7864
cleanerExecutorService = Executors.newScheduledThreadPool(1, THREAD_FACTORY);
7965
cleanerExecutorService.scheduleAtFixedRate(
80-
runnable, CLEAN_FREQUENCY_SECONDS, CLEAN_FREQUENCY_SECONDS, TimeUnit.SECONDS);
66+
new CleanupRunnable(cleanerExecutorService, suppliedMaps, finalized),
67+
CLEAN_FREQUENCY_SECONDS,
68+
CLEAN_FREQUENCY_SECONDS,
69+
TimeUnit.SECONDS);
8170

8271
try {
83-
Runtime.getRuntime()
84-
.addShutdownHook(
85-
new Thread() {
86-
@Override
87-
public void run() {
88-
try {
89-
cleanerExecutorService.shutdownNow();
90-
cleanerExecutorService.awaitTermination(
91-
SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
92-
} catch (final InterruptedException e) {
93-
// Don't bother waiting then...
94-
}
95-
}
96-
});
72+
Runtime.getRuntime().addShutdownHook(new ShutdownCallback(cleanerExecutorService));
9773
} catch (final IllegalStateException ex) {
9874
// The JVM is already shutting down.
9975
}
10076
}
10177

78+
@Override
79+
public void finalize() {
80+
finalized.set(true);
81+
}
82+
10283
@Override
10384
public <K, V> WeakMap<K, V> get() {
10485
final WeakConcurrentMap<K, V> map = new WeakConcurrentMap<>(false);
10586
suppliedMaps.add(new WeakReference<WeakConcurrentMap>(map));
10687
return new Adapter<>(map);
10788
}
10889

90+
private static class CleanupRunnable implements Runnable {
91+
92+
private final ScheduledExecutorService executorService;
93+
private final Queue<WeakReference<WeakConcurrentMap>> suppliedMaps;
94+
private final AtomicBoolean finalized;
95+
96+
public CleanupRunnable(
97+
final ScheduledExecutorService executorService,
98+
final Queue<WeakReference<WeakConcurrentMap>> suppliedMaps,
99+
final AtomicBoolean finalized) {
100+
this.executorService = executorService;
101+
this.suppliedMaps = suppliedMaps;
102+
this.finalized = finalized;
103+
}
104+
105+
@Override
106+
public void run() {
107+
for (final Iterator<WeakReference<WeakConcurrentMap>> iterator = suppliedMaps.iterator();
108+
iterator.hasNext(); ) {
109+
final WeakConcurrentMap map = iterator.next().get();
110+
if (map == null) {
111+
iterator.remove();
112+
} else {
113+
map.expungeStaleEntries();
114+
}
115+
}
116+
if (finalized.get() && suppliedMaps.isEmpty()) {
117+
executorService.shutdown();
118+
}
119+
}
120+
}
121+
122+
private static final class ShutdownCallback extends Thread {
123+
124+
private final ScheduledExecutorService executorService;
125+
126+
public ShutdownCallback(final ScheduledExecutorService executorService) {
127+
this.executorService = executorService;
128+
}
129+
130+
@Override
131+
public void run() {
132+
try {
133+
executorService.shutdownNow();
134+
executorService.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
135+
} catch (final InterruptedException e) {
136+
// Don't bother waiting then...
137+
}
138+
}
139+
}
140+
109141
private static class Adapter<K, V> implements WeakMap<K, V> {
110142
private final WeakConcurrentMap<K, V> map;
111143

@@ -150,7 +182,7 @@ public <K, V> WeakMap<K, V> get() {
150182
return new WeakMap.MapAdapter<>(new MapMaker().weakKeys().<K, V>makeMap());
151183
}
152184

153-
public <K, V> WeakMap<K, V> get(int concurrencyLevel) {
185+
public <K, V> WeakMap<K, V> get(final int concurrencyLevel) {
154186
return new WeakMap.MapAdapter<>(
155187
new MapMaker().concurrencyLevel(concurrencyLevel).weakKeys().<K, V>makeMap());
156188
}

dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/tooling/WeakConcurrentSupplierTest.groovy

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,35 @@ class WeakConcurrentSupplierTest extends Specification {
3838
"Guava" | guavaSupplier
3939
}
4040

41-
def "Unreferenced map get cleaned up on #name"() {
41+
def "Unreferenced supplier gets cleaned up on #name"() {
42+
setup:
43+
// Note: we use 'double supplier' here because Groovy keeps reference to test data preventing it from being GCed
44+
def supplier = supplierSupplier()
45+
def ref = new WeakReference(supplier)
46+
47+
when:
48+
def supplierRef = new WeakReference(supplier)
49+
supplier = null
50+
TestUtils.awaitGC(supplierRef)
51+
52+
then:
53+
ref.get() == null
54+
55+
where:
56+
name | supplierSupplier
57+
"WeakConcurrent" | { -> new WeakMapSuppliers.WeakConcurrent() }
58+
"WeakInline" | { -> new WeakMapSuppliers.WeakConcurrent.Inline() }
59+
"Guava" | { -> new WeakMapSuppliers.Guava() }
60+
}
61+
62+
def "Unreferenced map gets cleaned up on #name"() {
4263
setup:
4364
WeakMap.Provider.provider.set(supplier)
4465
def map = WeakMap.Provider.newWeakMap()
4566
def ref = new WeakReference(map)
4667

4768
when:
48-
def mapRef = new WeakReference<>(map)
69+
def mapRef = new WeakReference(map)
4970
map = null
5071
TestUtils.awaitGC(mapRef)
5172

@@ -69,7 +90,7 @@ class WeakConcurrentSupplierTest extends Specification {
6990
map.size() == 1
7091

7192
when:
72-
def keyRef = new WeakReference<>(key)
93+
def keyRef = new WeakReference(key)
7394
key = null
7495
TestUtils.awaitGC(keyRef)
7596

0 commit comments

Comments
 (0)