Skip to content

Commit 7fbb571

Browse files
authored
Merge pull request #493 from eddavisson/prev-txn
Specify previous_transaction when retrying a txn
2 parents f1cfd87 + e852d8b commit 7fbb571

5 files changed

Lines changed: 38 additions & 6 deletions

File tree

src/main/java/com/googlecode/objectify/cache/CachingAsyncDatastore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.google.cloud.datastore.Entity;
44
import com.google.cloud.datastore.Key;
55
import com.google.cloud.datastore.ReadOption;
6+
import com.google.protobuf.ByteString;
67
import com.googlecode.objectify.cache.EntityMemcache.Bucket;
78
import com.googlecode.objectify.impl.AsyncDatastore;
89
import com.googlecode.objectify.impl.AsyncTransaction;
@@ -62,6 +63,11 @@ public AsyncTransaction newTransaction(final Runnable afterCommit) {
6263
return new CachingAsyncTransaction(raw.newTransaction(afterCommit), memcache);
6364
}
6465

66+
@Override
67+
public AsyncTransaction newTransaction(final Runnable afterCommit, ByteString prevTxnHandle) {
68+
return new CachingAsyncTransaction(raw.newTransaction(afterCommit, prevTxnHandle), memcache);
69+
}
70+
6571
@Override
6672
public Future<Map<Key, Entity>> get(final Collection<Key> keys, final ReadOption... options) {
6773
final Map<Key, Bucket> soFar = this.memcache.getAll(keys);

src/main/java/com/googlecode/objectify/impl/AsyncDatastore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.googlecode.objectify.impl;
22

3+
import com.google.protobuf.ByteString;
4+
35
/**
46
* The new datastore SDK has a neat structure of interfaces and implementations (transaction, datastorereader, etc)
57
* but doesn't currently support async operations. We need to shim in a Future-based API so that we can seamlessly
@@ -10,4 +12,8 @@ public interface AsyncDatastore extends AsyncDatastoreReaderWriter {
1012
/**
1113
*/
1214
AsyncTransaction newTransaction(Runnable afterCommit);
15+
16+
/**
17+
*/
18+
AsyncTransaction newTransaction(Runnable afterCommit, ByteString prevTxnHandle);
1319
}

src/main/java/com/googlecode/objectify/impl/AsyncDatastoreImpl.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.googlecode.objectify.impl;
22

33
import com.google.cloud.datastore.Datastore;
4+
import com.google.datastore.v1.TransactionOptions;
5+
import com.google.protobuf.ByteString;
46

57
/** */
68
public class AsyncDatastoreImpl extends AsyncDatastoreReaderWriterImpl implements AsyncDatastore {
@@ -15,6 +17,15 @@ public AsyncDatastoreImpl(final Datastore raw) {
1517

1618
@Override
1719
public AsyncTransaction newTransaction(final Runnable afterCommit) {
18-
return new AsyncTransactionImpl(datastore.newTransaction(), afterCommit);
20+
return newTransaction(afterCommit, null);
21+
}
22+
23+
@Override
24+
public AsyncTransaction newTransaction(final Runnable afterCommit, final ByteString prevTxnHandle) {
25+
TransactionOptions.Builder txnOptions = TransactionOptions.newBuilder();
26+
if (prevTxnHandle != null) {
27+
txnOptions.getReadWriteBuilder().setPreviousTransaction(prevTxnHandle);
28+
}
29+
return new AsyncTransactionImpl(datastore.newTransaction(txnOptions.build()), afterCommit);
1930
}
2031
}

src/main/java/com/googlecode/objectify/impl/TransactorNo.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22

33
import com.google.cloud.datastore.DatastoreException;
44
import com.google.common.base.Preconditions;
5+
import com.google.protobuf.ByteString;
56
import com.google.rpc.Code;
67
import com.googlecode.objectify.ObjectifyFactory;
78
import com.googlecode.objectify.TxnType;
89
import com.googlecode.objectify.Work;
10+
11+
import java.util.concurrent.atomic.AtomicReference;
12+
913
import lombok.extern.slf4j.Slf4j;
1014

1115
/**
@@ -90,9 +94,10 @@ public <R> R transactNew(final ObjectifyImpl parent, int limitTries, final Work<
9094
Preconditions.checkArgument(limitTries >= 1);
9195
final int ORIGINAL_TRIES = limitTries;
9296

97+
AtomicReference<ByteString> prevTxnHandle = new AtomicReference<>();
9398
while (true) {
9499
try {
95-
return transactOnce(parent, work);
100+
return transactOnce(parent, work, prevTxnHandle);
96101
} catch (DatastoreException ex) {
97102

98103
// This doesn't work because the SDK considers all transactions to be non-retryable. Objectify has always
@@ -131,8 +136,10 @@ public AsyncDatastoreReaderWriter asyncDatastore(final ObjectifyImpl ofy) {
131136
/**
132137
* One attempt at executing a transaction
133138
*/
134-
private <R> R transactOnce(final ObjectifyImpl parent, final Work<R> work) {
135-
final ObjectifyImpl txnOfy = parent.factory().open(parent.getOptions(), new TransactorYes(parent.factory(), parent.getOptions().isCache(), this));
139+
private <R> R transactOnce(final ObjectifyImpl parent, final Work<R> work, final AtomicReference<ByteString> prevTxnHandle) {
140+
final ObjectifyImpl txnOfy = parent.factory().open(parent.getOptions(), new TransactorYes(parent.factory(), parent.getOptions().isCache(), this,
141+
prevTxnHandle.get()));
142+
prevTxnHandle.set(txnOfy.getTransaction().getTransactionHandle());
136143

137144
boolean committedSuccessfully = false;
138145
try {

src/main/java/com/googlecode/objectify/impl/TransactorYes.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.googlecode.objectify.impl;
22

3+
import com.google.protobuf.ByteString;
34
import com.googlecode.objectify.ObjectifyFactory;
45
import com.googlecode.objectify.TxnType;
56
import com.googlecode.objectify.Work;
@@ -20,10 +21,11 @@ class TransactorYes extends Transactor
2021

2122
/**
2223
*/
23-
TransactorYes(final ObjectifyFactory factory, final boolean cache, final TransactorNo parentTransactor) {
24+
TransactorYes(final ObjectifyFactory factory, final boolean cache, final TransactorNo parentTransactor,
25+
ByteString prevTxnHandle) {
2426
super(factory);
2527

26-
this.transaction = factory.asyncDatastore(cache).newTransaction(this::committed);
28+
this.transaction = factory.asyncDatastore(cache).newTransaction(this::committed, prevTxnHandle);
2729
this.parentTransactor = parentTransactor;
2830
}
2931

0 commit comments

Comments
 (0)