Skip to content

Commit 17fb2f5

Browse files
Alexandre Dutraolim7t
authored andcommitted
Use parallel calls when re-preparing statement on other hosts (JAVA-725).
1 parent 7b50976 commit 17fb2f5

2 files changed

Lines changed: 48 additions & 40 deletions

File tree

changelog/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
- [improvement] Allow hosts to remain partially up (JAVA-544)
4242
- [improvement] Remove internal blocking calls and expose async session
4343
creation (JAVA-821, JAVA-822)
44+
- [improvement] Use parallel calls when re-preparing statement on other
45+
hosts (JAVA-725)
4446

4547
Merged from 2.0.10_fixes branch:
4648

driver-core/src/main/java/com/datastax/driver/core/SessionManager.java

Lines changed: 46 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import java.util.concurrent.atomic.AtomicReference;
2323

2424
import com.google.common.base.Function;
25+
import com.google.common.base.Functions;
2526
import com.google.common.base.Throwables;
2627
import com.google.common.collect.ImmutableList;
2728
import com.google.common.collect.Lists;
2829
import com.google.common.util.concurrent.*;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
3132

33+
import com.datastax.driver.core.Message.Response;
3234
import com.datastax.driver.core.exceptions.DriverInternalError;
3335
import com.datastax.driver.core.exceptions.InvalidQueryException;
3436
import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
@@ -179,8 +181,8 @@ public Session.State getState() {
179181
}
180182

181183
private ListenableFuture<PreparedStatement> toPreparedStatement(final String query, final Connection.Future future) {
182-
return Futures.transform(future, new Function<Message.Response, PreparedStatement>() {
183-
public PreparedStatement apply(Message.Response response) {
184+
return Futures.transform(future, new AsyncFunction<Response, PreparedStatement>() {
185+
public ListenableFuture<PreparedStatement> apply(Response response) {
184186
switch (response.type) {
185187
case RESULT:
186188
Responses.Result rm = (Responses.Result)response;
@@ -189,29 +191,26 @@ public PreparedStatement apply(Message.Response response) {
189191
Responses.Result.Prepared pmsg = (Responses.Result.Prepared)rm;
190192
PreparedStatement stmt = DefaultPreparedStatement.fromMessage(pmsg, cluster.getMetadata(), query, poolsState.keyspace);
191193
stmt = cluster.manager.addPrepared(stmt);
192-
if (cluster.getConfiguration().getQueryOptions().isPrepareOnAllHosts()){
193-
try {
194-
// All Sessions are connected to the same nodes so it's enough to prepare only the nodes of this session.
195-
// If that changes, we'll have to make sure this propagate to other sessions too.
196-
prepare(stmt.getQueryString(), future.getAddress());
197-
} catch (InterruptedException e) {
198-
Thread.currentThread().interrupt();
199-
// This method doesn't propagate interruption, at least not for now. However, if we've
200-
// interrupted preparing queries on other node it's not a problem as we'll re-prepare
201-
// later if need be. So just ignore.
202-
}
194+
if (cluster.getConfiguration().getQueryOptions().isPrepareOnAllHosts()) {
195+
// All Sessions are connected to the same nodes so it's enough to prepare only the nodes of this session.
196+
// If that changes, we'll have to make sure this propagate to other sessions too.
197+
return prepare(stmt, future.getAddress());
198+
} else {
199+
return Futures.immediateFuture(stmt);
203200
}
204-
return stmt;
205201
default:
206-
throw new DriverInternalError(String.format("%s response received when prepared statement was expected", rm.kind));
202+
return Futures.immediateFailedFuture(
203+
new DriverInternalError(String.format("%s response received when prepared statement was expected", rm.kind)));
207204
}
208205
case ERROR:
209-
throw ((Responses.Error)response).asException(future.getAddress());
206+
return Futures.immediateFailedFuture(
207+
((Responses.Error)response).asException(future.getAddress()));
210208
default:
211-
throw new DriverInternalError(String.format("%s response received when prepared statement was expected", response.type));
209+
return Futures.immediateFailedFuture(
210+
new DriverInternalError(String.format("%s response received when prepared statement was expected", response.type)));
212211
}
213212
}
214-
}, executor()); // Since the transformation involves querying other nodes, we should not do that in an I/O thread
213+
});
215214
}
216215

217216
Connection.Factory connectionFactory() {
@@ -581,35 +580,42 @@ public void run() {
581580
}, executor());
582581
}
583582

584-
private void prepare(String query, InetSocketAddress toExclude) throws InterruptedException {
585-
for (Map.Entry<Host, HostConnectionPool> entry : pools.entrySet()) {
583+
private ListenableFuture<PreparedStatement> prepare(final PreparedStatement statement, InetSocketAddress toExclude) {
584+
final String query = statement.getQueryString();
585+
List<ListenableFuture<Response>> futures = Lists.newArrayListWithExpectedSize(pools.size());
586+
for (final Map.Entry<Host, HostConnectionPool> entry : pools.entrySet()) {
586587
if (entry.getKey().getSocketAddress().equals(toExclude))
587588
continue;
588589

589-
// Let's not wait too long if we can't get a connection. Things
590-
// will fix themselves once the user tries a query anyway.
591-
Connection c = null;
592-
boolean timedOut = false;
593590
try {
594-
c = entry.getValue().borrowConnection(200, TimeUnit.MILLISECONDS);
595-
c.write(new Requests.Prepare(query)).get();
596-
} catch (ConnectionException e) {
591+
// Preparing is not critical: if it fails, it will fix itself later when the user tries to execute
592+
// the prepared query. So don't block if no connection is available, simply abort.
593+
final Connection c = entry.getValue().borrowConnection(0, TimeUnit.MILLISECONDS);
594+
ListenableFuture<Response> future = c.write(new Requests.Prepare(query));
595+
Futures.addCallback(future, new FutureCallback<Response>() {
596+
@Override
597+
public void onSuccess(Response result) {
598+
c.release();
599+
}
600+
601+
@Override
602+
public void onFailure(Throwable t) {
603+
logger.debug(String.format("Unexpected error while preparing query (%s) on %s", query, entry.getKey()), t);
604+
605+
// If the query timed out, that already released the connection, otherwise do it now
606+
if (!(t instanceof OperationTimedOutException))
607+
c.release();
608+
}
609+
});
610+
futures.add(future);
611+
} catch (Exception e) {
597612
// Again, not being able to prepare the query right now is no big deal, so just ignore
598-
} catch (BusyConnectionException e) {
599-
// Same as above
600-
} catch (TimeoutException e) {
601-
// Same as above
602-
} catch (ExecutionException e) {
603-
// We shouldn't really get exception while preparing a
604-
// query, so log this (but ignore otherwise as it's not a big deal)
605-
logger.error(String.format("Unexpected error while preparing query (%s) on %s", query, entry.getKey()), e);
606-
// If the query timed out, that already released the connection
607-
timedOut = e.getCause() instanceof OperationTimedOutException;
608-
} finally {
609-
if (c != null && !timedOut)
610-
c.release();
611613
}
612614
}
615+
// Return the statement when all futures are done
616+
return Futures.transform(
617+
Futures.successfulAsList(futures),
618+
Functions.constant(statement));
613619
}
614620

615621
ResultSetFuture executeQuery(Message.Request msg, Statement statement) {

0 commit comments

Comments
 (0)