Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: java
jdk:
- oraclejdk8
- oraclejdk7

cache:
directories:
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/com/googlecode/objectify/ObjectifyService.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,15 @@ public void close() {

ofy.flush();

PendingFutures.completeAllPendingFutures();
if (stack.size() > 1) {
// we have multiple Objectify instances in the stack
// so wait only for the pending futures associated with the currently closed one
PendingFutures.completeAllPendingFutures(ofy);
} else {
// this is the very last Objectify instance present
// we should wait for every pending future just to be sure
PendingFutures.completeAllPendingFutures();
}

stack.removeLast();
}
Expand Down
62 changes: 47 additions & 15 deletions src/main/java/com/googlecode/objectify/cache/PendingFutures.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.googlecode.objectify.Objectify;
import com.googlecode.objectify.ObjectifyService;
import com.googlecode.objectify.util.FutureHelper;

import lombok.NonNull;
import lombok.extern.java.Log;

/**
* <p>This class maintains a thread local list of all the outstanding Future<?> objects
Expand All @@ -13,19 +21,17 @@
*
* @author Jeff Schnitzer <jeff@infohazard.org>
*/
@Log
public class PendingFutures
{
/** */
private static final Logger log = Logger.getLogger(PendingFutures.class.getName());

/**
* We use ConcurrentHashMap not for concurrency but because it doesn't throw
* ConcurrentModificationException. We need to be able to iterate while Futures remove
* themselves from the set. A Set is just a Map of key to key.
* ConcurrentModificationException. We need to be able to iterate while Futures remove
* themselves from the Map.
*/
private static ThreadLocal<Map<Future<?>, Future<?>>> pending = new ThreadLocal<Map<Future<?>, Future<?>>>() {
private static ThreadLocal<Map<Future<?>, Optional<Objectify>>> pending = new ThreadLocal<Map<Future<?>, Optional<Objectify>>>() {
@Override
protected Map<Future<?>, Future<?>> initialValue() {
protected Map<Future<?>, Optional<Objectify>> initialValue() {
return new ConcurrentHashMap<>(64, 0.75f, 1);
}
};
Expand All @@ -35,7 +41,16 @@ protected Map<Future<?>, Future<?>> initialValue() {
* @param future must have at least one callback
*/
public static void addPending(Future<?> future) {
pending.get().put(future, future);
pending.get().put(future, ofyOptional());
}

// it is possible that CachingAsyncDatastoreService gets used - which utilizes this functionality - without any Objectify context
private static Optional<Objectify> ofyOptional() {
try {
return Optional.of(ObjectifyService.ofy());
} catch (@SuppressWarnings("unused") RuntimeException e) {
return Optional.absent();
}
}

/**
Expand All @@ -52,13 +67,30 @@ public static void removePending(Future<?> future) {
*/
public static void completeAllPendingFutures() {
// This will cause done Futures to fire callbacks and remove themselves
for (Future<?> fut: pending.get().keySet()) {
try {
fut.get();
}
catch (Exception e) {
log.log(Level.SEVERE, "Error cleaning up pending Future: " + fut, e);
for (Future<?> fut : pending.get().keySet()) {
quietGet(fut);
}
}

/**
* Iterate through all pending futures that belong to the provided Objectify instance and get() them, forcing any callbacks to be called.
* This is used only by the close() method called on the Objectify instances created by ObjectifyService.begin().
*/
public static void completeAllPendingFutures(@NonNull Objectify ofy) {
// This will cause done Futures to fire callbacks and remove themselves
for (Map.Entry<Future<?>, Optional<Objectify>> entry : pending.get().entrySet()) {
if (Objects.equal(entry.getValue().orNull(), ofy)) {
quietGet(entry.getKey());
}
}
}

private static void quietGet(Future<?> fut) {
try {
FutureHelper.quietGet(fut);
} catch (RuntimeException e) {
log.log(Level.SEVERE, "Error cleaning up pending Future: " + fut, e);
}
}

}
4 changes: 4 additions & 0 deletions src/main/java/com/googlecode/objectify/util/FutureHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ else if (ex instanceof Error)
throw (Error)ex;
else if (ex instanceof ExecutionException)
unwrapAndThrow(ex.getCause());
else if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
throw new UndeclaredThrowableException(ex);
}
else
throw new UndeclaredThrowableException(ex);
}
Expand Down