Skip to content

Commit a95ae26

Browse files
committed
Added snapshot view of in-flight refresh operations
The ongoing refresh operations can now be observed by using Policy.refreshes(). This does not provide a live view due to weak keys needing to be unwrapped from reference key (and possibly discarded if null). Instead an unmodifiable copy is provided for inspection. The futures may be canceled, completed, or waited upon.
1 parent 0d23987 commit a95ae26

7 files changed

Lines changed: 100 additions & 3 deletions

File tree

caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.Collection;
4343
import java.util.Collections;
4444
import java.util.Comparator;
45+
import java.util.IdentityHashMap;
4546
import java.util.Iterator;
4647
import java.util.LinkedHashMap;
4748
import java.util.List;
@@ -1508,8 +1509,7 @@ public void cleanUp() {
15081509
}
15091510

15101511
/**
1511-
* Performs the maintenance work, blocking until the lock is acquired. Any exception thrown, such
1512-
* as by {@link CacheWriter#delete}, is propagated to the caller.
1512+
* Performs the maintenance work, blocking until the lock is acquired.
15131513
*
15141514
* @param task an additional pending task to run, or {@code null} if not present
15151515
*/
@@ -3485,6 +3485,27 @@ static final class BoundedPolicy<K, V> implements Policy<K, V> {
34853485
}
34863486
return transformer.apply(node.getValue());
34873487
}
3488+
@Override public Map<K, CompletableFuture<V>> refreshes() {
3489+
var refreshes = cache.refreshes;
3490+
if ((refreshes == null) || refreshes.isEmpty()) {
3491+
return Map.of();
3492+
} if (cache.collectKeys()) {
3493+
var inFlight = new IdentityHashMap<K, CompletableFuture<V>>(refreshes.size());
3494+
for (var entry : refreshes.entrySet()) {
3495+
@SuppressWarnings("unchecked")
3496+
var key = ((InternalReference<K>) entry.getKey()).get();
3497+
@SuppressWarnings("unchecked")
3498+
var future = (CompletableFuture<V>) entry.getValue();
3499+
if (key != null) {
3500+
inFlight.put(key, future);
3501+
}
3502+
}
3503+
return Collections.unmodifiableMap(inFlight);
3504+
}
3505+
@SuppressWarnings("unchecked")
3506+
var castedRefreshes = (Map<K, CompletableFuture<V>>) (Object) refreshes;
3507+
return Map.copyOf(castedRefreshes);
3508+
}
34883509
@Override public Optional<Eviction<K, V>> eviction() {
34893510
return cache.evicts()
34903511
? (eviction == null) ? (eviction = Optional.of(new BoundedEviction())) : eviction

caffeine/src/main/java/com/github/benmanes/caffeine/cache/Policy.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Optional;
2121
import java.util.OptionalInt;
2222
import java.util.OptionalLong;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.TimeUnit;
2425

2526
import org.checkerframework.checker.index.qual.NonNegative;
@@ -56,6 +57,13 @@ public interface Policy<K extends @NonNull Object, V extends @NonNull Object> {
5657
@Nullable
5758
V getIfPresentQuietly(K key);
5859

60+
/**
61+
* Returns an unmodifiable snapshot {@link Map} view of the in-flight refresh operations.
62+
*
63+
* @return a snapshot view of the in-flight refresh operations
64+
*/
65+
Map<K, CompletableFuture<V>> refreshes();
66+
5967
/**
6068
* Returns access to perform operations based on the maximum size or maximum weight eviction
6169
* policy. If the cache was not constructed with a size-based bound or the implementation does

caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,15 @@ static final class UnboundedPolicy<K, V> implements Policy<K, V> {
921921
@Override public V getIfPresentQuietly(Object key) {
922922
return transformer.apply(cache.data.get(key));
923923
}
924+
@Override public Map<K, CompletableFuture<V>> refreshes() {
925+
var refreshes = cache.refreshes;
926+
if (refreshes == null) {
927+
return Map.of();
928+
}
929+
@SuppressWarnings("unchecked")
930+
var castedRefreshes = (Map<K, CompletableFuture<V>>) (Object) refreshes;
931+
return Map.copyOf(castedRefreshes);
932+
}
924933
@Override public Optional<Eviction<K, V>> eviction() {
925934
return Optional.empty();
926935
}

caffeine/src/test/java/com/github/benmanes/caffeine/cache/CacheTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static java.util.stream.Collectors.toMap;
2323
import static org.hamcrest.MatcherAssert.assertThat;
2424
import static org.hamcrest.Matchers.aMapWithSize;
25+
import static org.hamcrest.Matchers.anEmptyMap;
2526
import static org.hamcrest.Matchers.equalTo;
2627
import static org.hamcrest.Matchers.hasItem;
2728
import static org.hamcrest.Matchers.is;
@@ -727,4 +728,20 @@ public void getIfPresentQuietly_present(Cache<Integer, Integer> cache, CacheCont
727728
assertThat(cache.policy().getIfPresentQuietly(context.middleKey()), is(not(nullValue())));
728729
assertThat(cache.policy().getIfPresentQuietly(context.lastKey()), is(not(nullValue())));
729730
}
731+
732+
/* --------------- Policy: refreshes --------------- */
733+
734+
@CacheSpec
735+
@CheckNoStats
736+
@Test(dataProvider = "caches")
737+
public void refreshes_empty(Cache<Integer, Integer> cache, CacheContext context) {
738+
assertThat(cache.policy().refreshes(), is(anEmptyMap()));
739+
}
740+
741+
@CacheSpec
742+
@CheckNoStats
743+
@Test(dataProvider = "caches", expectedExceptions = UnsupportedOperationException.class)
744+
public void refreshes_unmodifiable(Cache<Integer, Integer> cache, CacheContext context) {
745+
cache.policy().refreshes().clear();
746+
}
730747
}

caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static java.util.stream.Collectors.toMap;
2424
import static org.hamcrest.MatcherAssert.assertThat;
2525
import static org.hamcrest.Matchers.aMapWithSize;
26+
import static org.hamcrest.Matchers.anEmptyMap;
2627
import static org.hamcrest.Matchers.containsInAnyOrder;
2728
import static org.hamcrest.Matchers.either;
2829
import static org.hamcrest.Matchers.empty;
@@ -811,4 +812,22 @@ public void bulk_present() throws Exception {
811812
assertThat(loader.loadAll(Set.of(1, 2)), is(Map.of(1, 1, 2, 2)));
812813
assertThat(loader.load(1), is(1));
813814
}
815+
816+
/* --------------- Policy: refreshes --------------- */
817+
818+
@Test(dataProvider = "caches")
819+
@CacheSpec(loader = Loader.ASYNC_INCOMPLETE, implementation = Implementation.Caffeine)
820+
public void refreshes(LoadingCache<Integer, Integer> cache, CacheContext context) {
821+
var key1 = Iterables.get(context.absentKeys(), 0);
822+
var key2 = context.original().isEmpty()
823+
? Iterables.get(context.absentKeys(), 1)
824+
: context.firstKey();
825+
var future1 = cache.refresh(key1);
826+
var future2 = cache.refresh(key2);
827+
assertThat(cache.policy().refreshes(), is(equalTo(Map.of(key1, future1, key2, future2))));
828+
829+
future1.complete(1);
830+
future2.cancel(true);
831+
assertThat(cache.policy().refreshes(), is(anEmptyMap()));
832+
}
814833
}

caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static com.github.benmanes.caffeine.testing.IsFutureValue.futureOf;
2222
import static java.util.stream.Collectors.toList;
2323
import static org.hamcrest.MatcherAssert.assertThat;
24+
import static org.hamcrest.Matchers.aMapWithSize;
25+
import static org.hamcrest.Matchers.anEmptyMap;
2426
import static org.hamcrest.Matchers.containsInAnyOrder;
2527
import static org.hamcrest.Matchers.is;
2628
import static org.hamcrest.Matchers.not;
@@ -373,7 +375,25 @@ public void refresh(LoadingCache<Integer, Integer> cache, CacheContext context)
373375
future2.cancel(true);
374376
}
375377

376-
/* --------------- Policy --------------- */
378+
/* --------------- Policy: refreshes --------------- */
379+
380+
@Test(dataProvider = "caches")
381+
@CacheSpec(loader = Loader.ASYNC_INCOMPLETE, implementation = Implementation.Caffeine,
382+
refreshAfterWrite = Expire.ONE_MINUTE, population = Population.FULL)
383+
public void refreshes(LoadingCache<Integer, Integer> cache, CacheContext context) {
384+
context.ticker().advance(2, TimeUnit.MINUTES);
385+
cache.getIfPresent(context.firstKey());
386+
assertThat(cache.policy().refreshes(), is(aMapWithSize(1)));
387+
388+
var future = cache.policy().refreshes().get(context.firstKey());
389+
assertThat(future, is(not(nullValue())));
390+
391+
future.complete(Integer.MAX_VALUE);
392+
assertThat(cache.policy().refreshes(), is(anEmptyMap()));
393+
assertThat(cache.getIfPresent(context.firstKey()), is(Integer.MAX_VALUE));
394+
}
395+
396+
/* --------------- Policy: refreshAfterWrite --------------- */
377397

378398
@Test(dataProvider = "caches")
379399
@CacheSpec(implementation = Implementation.Caffeine, refreshAfterWrite = Expire.ONE_MINUTE)

caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,9 @@ public Policy<K, V> policy() {
436436
@Override public V getIfPresentQuietly(K key) {
437437
return cache.asMap().get(key);
438438
}
439+
@Override public Map<K, CompletableFuture<V>> refreshes() {
440+
return Map.of();
441+
}
439442
@Override public Optional<Eviction<K, V>> eviction() {
440443
return Optional.empty();
441444
}

0 commit comments

Comments
 (0)