Skip to content

Commit afb360a

Browse files
committed
Added Connection.open method. This will be useful when we want to ensure connection pool minimum size.
1 parent 81bc484 commit afb360a

10 files changed

Lines changed: 91 additions & 11 deletions

File tree

driver/src/main/org/mongodb/connection/Connection.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,47 @@
1717
package org.mongodb.connection;
1818

1919
import org.bson.ByteBuf;
20+
import org.mongodb.annotations.NotThreadSafe;
2021

2122
import java.util.List;
2223

24+
/**
25+
* A connection to a MongoDB server with blocking operations.
26+
* <p>
27+
* This class is not completely thread safe. At most one thread can have an active call to sendMessage, and one thread an active call
28+
* to receiveMessage.
29+
* </p>
30+
*
31+
* @since 3.0
32+
*/
33+
@NotThreadSafe
2334
public interface Connection extends BaseConnection {
2435

36+
/**
37+
* Open the connection. This method can be called multiple times, and all but the first should be a no-op.
38+
*/
39+
void open();
40+
41+
/**
42+
* Send a message to the server. The connection may not make any attempt to validate the integrity of the message.
43+
* <p>
44+
* This method blocks until all bytes have been written. This method is not thread safe: only one thread at a time can have an active
45+
* call to this method.
46+
* </p>
47+
*
48+
* @param byteBuffers the list of byte buffers to send.
49+
*/
2550
void sendMessage(final List<ByteBuf> byteBuffers);
2651

52+
/**
53+
* Receive a response to a sent message from the server.
54+
* <p>
55+
* This method blocks until the entire message has been read. This method is not thread safe: only one thread at a time can have an
56+
* active call to this method.
57+
* </p>
58+
*
59+
* @param responseSettings the settings that the response should conform to.
60+
* @return the response
61+
*/
2762
ResponseBuffers receiveMessage(final ResponseSettings responseSettings);
2863
}

driver/src/main/org/mongodb/connection/impl/AuthenticatingConnection.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,22 @@ public ServerAddress getServerAddress() {
6464
return wrapped.getServerAddress();
6565
}
6666

67+
@Override
68+
public void open() {
69+
isTrue("open", !isClosed());
70+
wrapped.open();
71+
authenticateAll();
72+
}
73+
6774
@Override
6875
public void sendMessage(final List<ByteBuf> byteBuffers) {
6976
isTrue("open", wrapped != null);
70-
authenticateAll();
7177
wrapped.sendMessage(byteBuffers);
7278
}
7379

7480
@Override
7581
public ResponseBuffers receiveMessage(final ResponseSettings responseSettings) {
7682
isTrue("open", wrapped != null);
77-
authenticateAll();
7883
return wrapped.receiveMessage(responseSettings);
7984
}
8085

driver/src/main/org/mongodb/connection/impl/ConnectingServerConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public ServerDescription getDescription() {
4545
return serverDescription;
4646
}
4747

48+
@Override
49+
public void open() {
50+
connection.open();
51+
}
52+
4853
@Override
4954
public void sendMessage(final List<ByteBuf> byteBuffers) {
5055
connection.sendMessage(byteBuffers);

driver/src/main/org/mongodb/connection/impl/DefaultConnection.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.nio.channels.ClosedByInterruptException;
3939
import java.util.List;
4040

41+
import static org.mongodb.assertions.Assertions.isTrue;
4142
import static org.mongodb.connection.ReplyHeader.REPLY_HEADER_LENGTH;
4243

4344
abstract class DefaultConnection implements Connection {
@@ -66,12 +67,14 @@ public ServerAddress getServerAddress() {
6667
return serverAddress;
6768
}
6869

69-
public DefaultConnectionSettings getSettings() {
70-
return settings;
70+
@Override
71+
public void open() {
72+
isTrue("open", !isClosed());
73+
ensureOpen();
7174
}
7275

7376
public void sendMessage(final List<ByteBuf> byteBuffers) {
74-
check();
77+
isTrue("open", !isClosed());
7578
try {
7679
write(byteBuffers);
7780
} catch (IOException e) {
@@ -82,7 +85,7 @@ public void sendMessage(final List<ByteBuf> byteBuffers) {
8285

8386
@Override
8487
public ResponseBuffers receiveMessage(final ResponseSettings responseSettings) {
85-
check();
88+
isTrue("open", !isClosed());
8689
try {
8790
return receiveMessage(responseSettings, System.nanoTime());
8891
} catch (IOException e) {
@@ -149,10 +152,6 @@ private ResponseBuffers receiveMessage(final ResponseSettings responseSettings,
149152
return new ResponseBuffers(replyHeader, bodyByteBuffer, System.nanoTime() - start);
150153
}
151154

152-
private void check() {
153-
ensureOpen();
154-
}
155-
156155
protected void initializeSocket(final Socket socket) throws IOException {
157156
socket.setTcpNoDelay(true);
158157
socket.setSoTimeout(settings.getReadTimeoutMS());

driver/src/main/org/mongodb/connection/impl/DefaultConnectionProvider.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public Connection get(final long timeout, final TimeUnit timeUnit) {
6262
if (connection == null) {
6363
throw new MongoTimeoutException(String.format("Timeout waiting for a connection after %d %s", timeout, timeUnit));
6464
}
65+
connection.open();
6566
return wrap(connection);
6667
} finally {
6768
waitQueueSize.decrementAndGet();
@@ -91,6 +92,7 @@ public SimpleConnectionPool(final ServerAddress serverAddress, final ConnectionF
9192
@Override
9293
protected Connection createNew() {
9394
return connectionFactory.create(serverAddress);
95+
9496
}
9597

9698
@Override
@@ -121,10 +123,16 @@ public boolean isClosed() {
121123

122124
@Override
123125
public ServerAddress getServerAddress() {
124-
isTrue("open", wrapped != null);
126+
isTrue("open", !isClosed());
125127
return wrapped.getServerAddress();
126128
}
127129

130+
@Override
131+
public void open() {
132+
isTrue("open", !isClosed());
133+
wrapped.open();
134+
}
135+
128136
@Override
129137
public void sendMessage(final List<ByteBuf> byteBuffers) {
130138
isTrue("open", wrapped != null);

driver/src/main/org/mongodb/connection/impl/DefaultServer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,17 @@ public ServerAddress getServerAddress() {
167167
return wrapped.getServerAddress();
168168
}
169169

170+
@Override
171+
public void open() {
172+
isTrue("open", !isClosed());
173+
try {
174+
wrapped.open();
175+
} catch (MongoException e) {
176+
handleException();
177+
throw e;
178+
}
179+
}
180+
170181
@Override
171182
public void sendMessage(final List<ByteBuf> byteBuffers) {
172183
isTrue("open", !isClosed());

driver/src/main/org/mongodb/connection/impl/IsMasterServerStateNotifier.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public synchronized void run() {
9191
try {
9292
if (connection == null) {
9393
connection = connectionFactory.create(serverAddress);
94+
connection.open();
9495
}
9596
try {
9697
final CommandResult commandResult = new CommandOperation("admin",

driver/src/main/org/mongodb/session/DelayedCloseConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ public DelayedCloseConnection(final ServerConnection wrapped) {
3737
this.wrapped = notNull("wrapped", wrapped);
3838
}
3939

40+
@Override
41+
public void open() {
42+
isTrue("open", !isClosed());
43+
wrapped.open();
44+
}
45+
4046
@Override
4147
public void sendMessage(final List<ByteBuf> byteBuffers) {
4248
isTrue("open", !isClosed());

driver/src/test/functional/org/mongodb/MongoQueryCursorExhaustTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,12 @@ public ServerAddress getServerAddress() {
163163
return getWrapped().getServerAddress();
164164
}
165165

166+
@Override
167+
public void open() {
168+
isTrue("open", !isClosed());
169+
wrapped.open();
170+
}
171+
166172
@Override
167173
public void sendMessage(final List<ByteBuf> byteBuffers) {
168174
isTrue("open", !isClosed());

driver/src/test/unit/org/mongodb/connection/DefaultConnectionProviderTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ private static class TestConnectionFactory implements ConnectionFactory {
115115
@Override
116116
public Connection create(final ServerAddress serverAddress) {
117117
return new Connection() {
118+
@Override
119+
public void open() {
120+
}
121+
118122
@Override
119123
public void sendMessage(final List<ByteBuf> byteBuffers) {
120124
throw new UnsupportedOperationException();

0 commit comments

Comments
 (0)