Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Support for Objectify-context bound completeAllPendingFutures() calls…
… (see #326)
  • Loading branch information
nbali committed Aug 14, 2018
commit 0fe1d2ba019ea25d8430df9d53928c50da182907
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void close() {

ofy.flush();

PendingFutures.completeAllPendingFutures();
PendingFutures.completeAllPendingFutures(ofy);

stack.removeLast();
}
Expand Down
62 changes: 47 additions & 15 deletions src/main/java/com/googlecode/objectify/cache/PendingFutures.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.googlecode.objectify.Objectify;
import com.googlecode.objectify.ObjectifyService;
import com.googlecode.objectify.util.FutureHelper;

import lombok.NonNull;
import lombok.extern.java.Log;

/**
* <p>This class maintains a thread local list of all the outstanding Future<?> objects
Expand All @@ -13,19 +21,17 @@
*
* @author Jeff Schnitzer <jeff@infohazard.org>
*/
@Log
public class PendingFutures
{
/** */
private static final Logger log = Logger.getLogger(PendingFutures.class.getName());

/**
* We use ConcurrentHashMap not for concurrency but because it doesn't throw
* ConcurrentModificationException. We need to be able to iterate while Futures remove
* themselves from the set. A Set is just a Map of key to key.
* ConcurrentModificationException. We need to be able to iterate while Futures remove
* themselves from the Map.
*/
private static ThreadLocal<Map<Future<?>, Future<?>>> pending = new ThreadLocal<Map<Future<?>, Future<?>>>() {
private static ThreadLocal<Map<Future<?>, Optional<Objectify>>> pending = new ThreadLocal<Map<Future<?>, Optional<Objectify>>>() {
@Override
protected Map<Future<?>, Future<?>> initialValue() {
protected Map<Future<?>, Optional<Objectify>> initialValue() {
return new ConcurrentHashMap<>(64, 0.75f, 1);
}
};
Expand All @@ -35,7 +41,16 @@ protected Map<Future<?>, Future<?>> initialValue() {
* @param future must have at least one callback
*/
public static void addPending(Future<?> future) {
pending.get().put(future, future);
pending.get().put(future, ofyOptional());
}

// it is possible that CachingAsyncDatastoreService gets used - which utilizes this functionality - without any Objectify context
private static Optional<Objectify> ofyOptional() {
try {
return Optional.of(ObjectifyService.ofy());
} catch (@SuppressWarnings("unused") RuntimeException e) {
return Optional.absent();
}
}

/**
Expand All @@ -52,13 +67,30 @@ public static void removePending(Future<?> future) {
*/
public static void completeAllPendingFutures() {
// This will cause done Futures to fire callbacks and remove themselves
for (Future<?> fut: pending.get().keySet()) {
try {
fut.get();
}
catch (Exception e) {
log.log(Level.SEVERE, "Error cleaning up pending Future: " + fut, e);
for (Future<?> fut : pending.get().keySet()) {
quietGet(fut);
}
}

/**
* Iterate through all pending futures that belong to the provided Objectify instance and get() them, forcing any callbacks to be called.
* This is used only by the close() method called on the Objectify instances created by ObjectifyService.begin().
*/
public static void completeAllPendingFutures(@NonNull Objectify ofy) {
// This will cause done Futures to fire callbacks and remove themselves
for (Map.Entry<Future<?>, Optional<Objectify>> entry : pending.get().entrySet()) {
if (Objects.equal(entry.getValue().orNull(), ofy)) {
quietGet(entry.getKey());
}
}
}

private static void quietGet(Future<?> fut) {
try {
FutureHelper.quietGet(fut);
} catch (RuntimeException e) {
log.log(Level.SEVERE, "Error cleaning up pending Future: " + fut, e);
}
}

}
4 changes: 4 additions & 0 deletions src/main/java/com/googlecode/objectify/util/FutureHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ else if (ex instanceof Error)
throw (Error)ex;
else if (ex instanceof ExecutionException)
unwrapAndThrow(ex.getCause());
else if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
throw new UndeclaredThrowableException(ex);
}
else
throw new UndeclaredThrowableException(ex);
}
Expand Down