Skip to content

Commit 6ee2eb8

Browse files
committed
Start session implementation with pool management
1 parent afca2b6 commit 6ee2eb8

14 files changed

Lines changed: 1469 additions & 20 deletions

File tree

core/console.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
* Use Ctrl+C instead.
1212
*/
1313
import com.datastax.oss.driver.api.core._
14+
import com.datastax.oss.driver.internal.core.metadata.TopologyEvent
15+
import com.datastax.oss.driver.internal.core.context.InternalDriverContext
1416
import java.net.InetSocketAddress
1517
import scala.collection.JavaConversions._
1618

@@ -22,3 +24,12 @@ val address5 = new InetSocketAddress("127.0.0.5", 9042)
2224
val address6 = new InetSocketAddress("127.0.0.6", 9042)
2325

2426
val builder = Cluster.builder().withContactPoints(Set(address1))
27+
28+
println("********************************************")
29+
println("* To start a driver instance, run: *")
30+
println("* implicit val cluster = builder.build *")
31+
println("********************************************")
32+
33+
def fire(event: AnyRef)(implicit cluster: Cluster): Unit = {
34+
cluster.getContext.asInstanceOf[InternalDriverContext].eventBus().fire(event)
35+
}

core/src/main/java/com/datastax/oss/driver/api/core/Cluster.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
package com.datastax.oss.driver.api.core;
1717

1818
import com.datastax.oss.driver.api.core.context.DriverContext;
19+
import com.datastax.oss.driver.api.core.cql.CqlSession;
1920
import com.datastax.oss.driver.api.core.metadata.Metadata;
2021
import com.datastax.oss.driver.api.core.metadata.Node;
22+
import com.datastax.oss.driver.api.core.session.Session;
23+
import com.datastax.oss.driver.internal.core.util.concurrent.BlockingOperation;
24+
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
2125
import java.util.concurrent.CompletionStage;
2226

2327
/** An instance of the driver, that connects to a Cassandra cluster. */
@@ -47,4 +51,35 @@ static ClusterBuilder builder() {
4751

4852
/** Returns a context that provides access to all the policies used by this driver instance. */
4953
DriverContext getContext();
54+
55+
/** Creates a new session to execute requests against a given keyspace. */
56+
CompletionStage<CqlSession> connectAsync(CqlIdentifier keyspace);
57+
58+
/**
59+
* Creates a new session not tied to any keyspace.
60+
*
61+
* <p>This is equivalent to {@code this.connectAsync(null)}.
62+
*/
63+
default CompletionStage<CqlSession> connectAsync() {
64+
return connectAsync(null);
65+
}
66+
67+
/**
68+
* Convenience method to call {@link #connectAsync(CqlIdentifier)} and block for the result.
69+
*
70+
* <p>This must not be called on a driver thread.
71+
*/
72+
default CqlSession connect(CqlIdentifier keyspace) {
73+
BlockingOperation.checkNotDriverThread();
74+
return CompletableFutures.getUninterruptibly(connectAsync(keyspace));
75+
}
76+
77+
/**
78+
* Convenience method to call {@link #connectAsync()} and block for the result.
79+
*
80+
* <p>This must not be called on a driver thread.
81+
*/
82+
default CqlSession connect() {
83+
return connect(null);
84+
}
5085
}

core/src/main/java/com/datastax/oss/driver/api/core/ClusterBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public CompletionStage<Cluster> buildAsync() {
121121
/** Convenience method to call {@link #buildAsync()} and block on the result. */
122122
public Cluster build() {
123123
BlockingOperation.checkNotDriverThread();
124-
return CompletableFutures.getUninterruptibly(buildAsync().toCompletableFuture());
124+
return CompletableFutures.getUninterruptibly(buildAsync());
125125
}
126126

127127
private static <T> T buildIfNull(T value, Supplier<T> builder) {

core/src/main/java/com/datastax/oss/driver/api/core/session/Session.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.datastax.oss.driver.api.core.session;
1717

18+
import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
1819
import com.datastax.oss.driver.api.core.cql.CqlSession;
1920

2021
/**
@@ -24,7 +25,7 @@
2425
* registered a custom request processor with the driver). For regular CQL queries, see {@link
2526
* CqlSession}.
2627
*/
27-
public interface Session {
28+
public interface Session extends AsyncAutoCloseable {
2829
<SyncResultT, AsyncResultT> SyncResultT execute(Request<SyncResultT, AsyncResultT> request);
2930

3031
<SyncResultT, AsyncResultT> AsyncResultT executeAsync(Request<SyncResultT, AsyncResultT> request);

core/src/main/java/com/datastax/oss/driver/internal/core/DefaultCluster.java

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,19 @@
1717

1818
import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
1919
import com.datastax.oss.driver.api.core.Cluster;
20+
import com.datastax.oss.driver.api.core.CqlIdentifier;
21+
import com.datastax.oss.driver.api.core.DriverException;
2022
import com.datastax.oss.driver.api.core.context.DriverContext;
23+
import com.datastax.oss.driver.api.core.cql.CqlSession;
2124
import com.datastax.oss.driver.api.core.metadata.Metadata;
25+
import com.datastax.oss.driver.api.core.session.Session;
2226
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
2327
import com.datastax.oss.driver.internal.core.metadata.MetadataManager;
2428
import com.datastax.oss.driver.internal.core.metadata.NodeStateManager;
29+
import com.datastax.oss.driver.internal.core.session.DefaultSession;
2530
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
2631
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
32+
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
2733
import com.google.common.collect.ImmutableList;
2834
import io.netty.util.concurrent.EventExecutor;
2935
import java.net.InetSocketAddress;
@@ -72,6 +78,13 @@ public DriverContext getContext() {
7278
return context;
7379
}
7480

81+
@Override
82+
public CompletionStage<CqlSession> connectAsync(CqlIdentifier keyspace) {
83+
CompletableFuture<CqlSession> connectFuture = new CompletableFuture<>();
84+
RunOrSchedule.on(adminExecutor, () -> singleThreaded.connect(keyspace, connectFuture));
85+
return connectFuture;
86+
}
87+
7588
@Override
7689
public CompletionStage<Void> closeFuture() {
7790
return singleThreaded.closeFuture;
@@ -99,12 +112,15 @@ private class SingleThreaded {
99112
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
100113
private boolean closeWasCalled;
101114
private boolean forceCloseWasCalled;
102-
private List<CompletionStage<Void>> childrenCloseFutures;
115+
// Note: closed sessions are not removed from the list. If this creates a memory issue, there
116+
// is something really wrong in the client program
117+
private List<Session> sessions;
103118

104119
private SingleThreaded(InternalDriverContext context, Set<InetSocketAddress> contactPoints) {
105120
this.context = context;
106121
this.nodeStateManager = new NodeStateManager(context);
107122
this.initialContactPoints = contactPoints;
123+
this.sessions = new ArrayList<>();
108124
}
109125

110126
private void init() {
@@ -152,6 +168,30 @@ private void init() {
152168
});
153169
}
154170

171+
private void connect(CqlIdentifier keyspace, CompletableFuture<CqlSession> connectFuture) {
172+
assert adminExecutor.inEventLoop();
173+
if (closeWasCalled) {
174+
connectFuture.completeExceptionally(new DriverException("Cluster was closed"));
175+
} else {
176+
DefaultSession.init(context, keyspace)
177+
.whenCompleteAsync(
178+
(session, error) -> {
179+
if (error != null) {
180+
connectFuture.completeExceptionally(error);
181+
} else if (closeWasCalled) {
182+
connectFuture.completeExceptionally(
183+
new DriverException("Cluster was closed while session was initializing"));
184+
session.forceCloseAsync();
185+
} else {
186+
sessions.add(session);
187+
connectFuture.complete(session);
188+
}
189+
},
190+
adminExecutor)
191+
.exceptionally(UncaughtExceptions::log);
192+
}
193+
}
194+
155195
private void close() {
156196
assert adminExecutor.inEventLoop();
157197
if (closeWasCalled) {
@@ -160,12 +200,13 @@ private void close() {
160200
closeWasCalled = true;
161201

162202
LOG.debug("Closing {}", this);
163-
childrenCloseFutures = new ArrayList<>();
203+
List<CompletionStage<Void>> childrenCloseStages = new ArrayList<>();
164204
for (AsyncAutoCloseable closeable : internalComponentsToClose()) {
165205
LOG.debug("Closing {}", closeable);
166-
childrenCloseFutures.add(closeable.closeAsync());
206+
childrenCloseStages.add(closeable.closeAsync());
167207
}
168-
CompletableFutures.whenAllDone(childrenCloseFutures, this::onChildrenClosed, adminExecutor);
208+
CompletableFutures.whenAllDone(
209+
childrenCloseStages, () -> onChildrenClosed(childrenCloseStages), adminExecutor);
169210
}
170211

171212
private void forceClose() {
@@ -178,26 +219,27 @@ private void forceClose() {
178219
LOG.debug("Force-closing {} (was {}closed before)", this, (closeWasCalled ? "" : "not "));
179220

180221
if (closeWasCalled) {
181-
// childrenCloseFutures is already created, and onChildrenClosed has already been called
222+
// onChildrenClosed has already been called
182223
for (AsyncAutoCloseable closeable : internalComponentsToClose()) {
183224
LOG.debug("Force-closing {}", closeable);
184225
closeable.forceCloseAsync();
185226
}
186227
} else {
187228
closeWasCalled = true;
188-
childrenCloseFutures = new ArrayList<>();
229+
List<CompletionStage<Void>> childrenCloseStages = new ArrayList<>();
189230
for (AsyncAutoCloseable closeable : internalComponentsToClose()) {
190231
LOG.debug("Force-closing {}", closeable);
191-
childrenCloseFutures.add(closeable.forceCloseAsync());
232+
childrenCloseStages.add(closeable.forceCloseAsync());
192233
}
193-
CompletableFutures.whenAllDone(childrenCloseFutures, this::onChildrenClosed, adminExecutor);
234+
CompletableFutures.whenAllDone(
235+
childrenCloseStages, () -> onChildrenClosed(childrenCloseStages), adminExecutor);
194236
}
195237
}
196238

197-
private void onChildrenClosed() {
239+
private void onChildrenClosed(List<CompletionStage<Void>> childrenCloseStages) {
198240
assert adminExecutor.inEventLoop();
199-
for (CompletionStage<Void> future : childrenCloseFutures) {
200-
warnIfFailed(future);
241+
for (CompletionStage<Void> stage : childrenCloseStages) {
242+
warnIfFailed(stage);
201243
}
202244
context
203245
.nettyOptions()
@@ -221,11 +263,14 @@ private void warnIfFailed(CompletionStage<Void> stage) {
221263
}
222264

223265
private List<AsyncAutoCloseable> internalComponentsToClose() {
224-
return ImmutableList.of(
225-
nodeStateManager,
226-
metadataManager,
227-
context.topologyMonitor(),
228-
context.controlConnection());
266+
return ImmutableList.<AsyncAutoCloseable>builder()
267+
.addAll(sessions)
268+
.add(
269+
nodeStateManager,
270+
metadataManager,
271+
context.topologyMonitor(),
272+
context.controlConnection())
273+
.build();
229274
}
230275
}
231276
}

core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.datastax.oss.driver.internal.core.metadata.LoadBalancingPolicyWrapper;
3232
import com.datastax.oss.driver.internal.core.metadata.MetadataManager;
3333
import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor;
34+
import com.datastax.oss.driver.internal.core.pool.ChannelPoolFactory;
3435
import com.datastax.oss.driver.internal.core.protocol.ByteBufPrimitiveCodec;
3536
import com.datastax.oss.driver.internal.core.ssl.JdkSslHandlerFactory;
3637
import com.datastax.oss.driver.internal.core.ssl.SslHandlerFactory;
@@ -103,6 +104,7 @@ public class DefaultDriverContext implements InternalDriverContext {
103104
new LazyReference<>("controlConnection", this::buildControlConnection, cycleDetector);
104105

105106
private final DriverConfig config;
107+
private final ChannelPoolFactory channelPoolFactory = new ChannelPoolFactory();
106108

107109
public DefaultDriverContext(DriverConfig config) {
108110
this.config = config;
@@ -274,6 +276,11 @@ public ChannelFactory channelFactory() {
274276
return channelFactoryRef.get();
275277
}
276278

279+
@Override
280+
public ChannelPoolFactory channelPoolFactory() {
281+
return channelPoolFactory;
282+
}
283+
277284
@Override
278285
public TopologyMonitor topologyMonitor() {
279286
return topologyMonitorRef.get();

core/src/main/java/com/datastax/oss/driver/internal/core/context/InternalDriverContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.datastax.oss.driver.internal.core.metadata.LoadBalancingPolicyWrapper;
2424
import com.datastax.oss.driver.internal.core.metadata.MetadataManager;
2525
import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor;
26+
import com.datastax.oss.driver.internal.core.pool.ChannelPoolFactory;
2627
import com.datastax.oss.driver.internal.core.ssl.SslHandlerFactory;
2728
import com.datastax.oss.protocol.internal.Compressor;
2829
import com.datastax.oss.protocol.internal.FrameCodec;
@@ -48,6 +49,8 @@ public interface InternalDriverContext extends DriverContext {
4849

4950
ChannelFactory channelFactory();
5051

52+
ChannelPoolFactory channelPoolFactory();
53+
5154
TopologyMonitor topologyMonitor();
5255

5356
MetadataManager metadataManager();

core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ public CompletionStage<Void> setKeyspace(CqlIdentifier newKeyspaceName) {
132132
return RunOrSchedule.on(adminExecutor, () -> singleThreaded.setKeyspace(newKeyspaceName));
133133
}
134134

135+
public void reconnectNow() {
136+
RunOrSchedule.on(adminExecutor, singleThreaded::reconnectNow);
137+
}
138+
135139
@Override
136140
public CompletionStage<Void> closeFuture() {
137141
return singleThreaded.closeFuture;
@@ -362,6 +366,11 @@ private CompletionStage<Void> setKeyspace(CqlIdentifier newKeyspaceName) {
362366
return setKeyspaceFuture;
363367
}
364368

369+
private void reconnectNow() {
370+
assert adminExecutor.inEventLoop();
371+
reconnection.reconnectNow(false);
372+
}
373+
365374
private void close() {
366375
assert adminExecutor.inEventLoop();
367376
if (isClosing) {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (C) 2017-2017 DataStax Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.pool;
17+
18+
import com.datastax.oss.driver.api.core.CqlIdentifier;
19+
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
20+
import com.datastax.oss.driver.api.core.metadata.Node;
21+
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
22+
import java.util.concurrent.CompletionStage;
23+
24+
/** Just a level of indirection to make testing easier. */
25+
public class ChannelPoolFactory {
26+
public CompletionStage<ChannelPool> init(
27+
Node node, CqlIdentifier keyspaceName, NodeDistance distance, InternalDriverContext context) {
28+
return ChannelPool.init(node, keyspaceName, distance, context);
29+
}
30+
}

0 commit comments

Comments
 (0)