2222import java .util .concurrent .atomic .AtomicReference ;
2323
2424import com .google .common .base .Function ;
25+ import com .google .common .base .Functions ;
2526import com .google .common .base .Throwables ;
2627import com .google .common .collect .ImmutableList ;
2728import com .google .common .collect .Lists ;
2829import com .google .common .util .concurrent .*;
2930import org .slf4j .Logger ;
3031import org .slf4j .LoggerFactory ;
3132
33+ import com .datastax .driver .core .Message .Response ;
3234import com .datastax .driver .core .exceptions .DriverInternalError ;
3335import com .datastax .driver .core .exceptions .InvalidQueryException ;
3436import com .datastax .driver .core .exceptions .UnsupportedFeatureException ;
@@ -179,8 +181,8 @@ public Session.State getState() {
179181 }
180182
181183 private ListenableFuture <PreparedStatement > toPreparedStatement (final String query , final Connection .Future future ) {
182- return Futures .transform (future , new Function < Message . Response , PreparedStatement >() {
183- public PreparedStatement apply (Message . Response response ) {
184+ return Futures .transform (future , new AsyncFunction < Response , PreparedStatement >() {
185+ public ListenableFuture < PreparedStatement > apply (Response response ) {
184186 switch (response .type ) {
185187 case RESULT :
186188 Responses .Result rm = (Responses .Result )response ;
@@ -189,29 +191,26 @@ public PreparedStatement apply(Message.Response response) {
189191 Responses .Result .Prepared pmsg = (Responses .Result .Prepared )rm ;
190192 PreparedStatement stmt = DefaultPreparedStatement .fromMessage (pmsg , cluster .getMetadata (), query , poolsState .keyspace );
191193 stmt = cluster .manager .addPrepared (stmt );
192- if (cluster .getConfiguration ().getQueryOptions ().isPrepareOnAllHosts ()){
193- try {
194- // All Sessions are connected to the same nodes so it's enough to prepare only the nodes of this session.
195- // If that changes, we'll have to make sure this propagate to other sessions too.
196- prepare (stmt .getQueryString (), future .getAddress ());
197- } catch (InterruptedException e ) {
198- Thread .currentThread ().interrupt ();
199- // This method doesn't propagate interruption, at least not for now. However, if we've
200- // interrupted preparing queries on other node it's not a problem as we'll re-prepare
201- // later if need be. So just ignore.
202- }
194+ if (cluster .getConfiguration ().getQueryOptions ().isPrepareOnAllHosts ()) {
195+ // All Sessions are connected to the same nodes so it's enough to prepare only the nodes of this session.
196+ // If that changes, we'll have to make sure this propagate to other sessions too.
197+ return prepare (stmt , future .getAddress ());
198+ } else {
199+ return Futures .immediateFuture (stmt );
203200 }
204- return stmt ;
205201 default :
206- throw new DriverInternalError (String .format ("%s response received when prepared statement was expected" , rm .kind ));
202+ return Futures .immediateFailedFuture (
203+ new DriverInternalError (String .format ("%s response received when prepared statement was expected" , rm .kind )));
207204 }
208205 case ERROR :
209- throw ((Responses .Error )response ).asException (future .getAddress ());
206+ return Futures .immediateFailedFuture (
207+ ((Responses .Error )response ).asException (future .getAddress ()));
210208 default :
211- throw new DriverInternalError (String .format ("%s response received when prepared statement was expected" , response .type ));
209+ return Futures .immediateFailedFuture (
210+ new DriverInternalError (String .format ("%s response received when prepared statement was expected" , response .type )));
212211 }
213212 }
214- }, executor ()); // Since the transformation involves querying other nodes, we should not do that in an I/O thread
213+ });
215214 }
216215
217216 Connection .Factory connectionFactory () {
@@ -581,35 +580,42 @@ public void run() {
581580 }, executor ());
582581 }
583582
584- private void prepare (String query , InetSocketAddress toExclude ) throws InterruptedException {
585- for (Map .Entry <Host , HostConnectionPool > entry : pools .entrySet ()) {
583+ private ListenableFuture <PreparedStatement > prepare (final PreparedStatement statement , InetSocketAddress toExclude ) {
584+ final String query = statement .getQueryString ();
585+ List <ListenableFuture <Response >> futures = Lists .newArrayListWithExpectedSize (pools .size ());
586+ for (final Map .Entry <Host , HostConnectionPool > entry : pools .entrySet ()) {
586587 if (entry .getKey ().getSocketAddress ().equals (toExclude ))
587588 continue ;
588589
589- // Let's not wait too long if we can't get a connection. Things
590- // will fix themselves once the user tries a query anyway.
591- Connection c = null ;
592- boolean timedOut = false ;
593590 try {
594- c = entry .getValue ().borrowConnection (200 , TimeUnit .MILLISECONDS );
595- c .write (new Requests .Prepare (query )).get ();
596- } catch (ConnectionException e ) {
591+ // Preparing is not critical: if it fails, it will fix itself later when the user tries to execute
592+ // the prepared query. So don't block if no connection is available, simply abort.
593+ final Connection c = entry .getValue ().borrowConnection (0 , TimeUnit .MILLISECONDS );
594+ ListenableFuture <Response > future = c .write (new Requests .Prepare (query ));
595+ Futures .addCallback (future , new FutureCallback <Response >() {
596+ @ Override
597+ public void onSuccess (Response result ) {
598+ c .release ();
599+ }
600+
601+ @ Override
602+ public void onFailure (Throwable t ) {
603+ logger .debug (String .format ("Unexpected error while preparing query (%s) on %s" , query , entry .getKey ()), t );
604+
605+ // If the query timed out, that already released the connection, otherwise do it now
606+ if (!(t instanceof OperationTimedOutException ))
607+ c .release ();
608+ }
609+ });
610+ futures .add (future );
611+ } catch (Exception e ) {
597612 // Again, not being able to prepare the query right now is no big deal, so just ignore
598- } catch (BusyConnectionException e ) {
599- // Same as above
600- } catch (TimeoutException e ) {
601- // Same as above
602- } catch (ExecutionException e ) {
603- // We shouldn't really get exception while preparing a
604- // query, so log this (but ignore otherwise as it's not a big deal)
605- logger .error (String .format ("Unexpected error while preparing query (%s) on %s" , query , entry .getKey ()), e );
606- // If the query timed out, that already released the connection
607- timedOut = e .getCause () instanceof OperationTimedOutException ;
608- } finally {
609- if (c != null && !timedOut )
610- c .release ();
611613 }
612614 }
615+ // Return the statement when all futures are done
616+ return Futures .transform (
617+ Futures .successfulAsList (futures ),
618+ Functions .constant (statement ));
613619 }
614620
615621 ResultSetFuture executeQuery (Message .Request msg , Statement statement ) {
0 commit comments