Skip to content

Commit d56fede

Browse files
committed
PYTHON-871 Added test simulating network isolation
1 parent f432958 commit d56fede

2 files changed

Lines changed: 95 additions & 3 deletions

File tree

tests/integration/simulacron/test_connection.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
prime_query, prime_request,
3737
start_and_prime_cluster_defaults,
3838
start_and_prime_singledc,
39-
clear_queries)
39+
clear_queries, RejectConnections,
40+
RejectType, AcceptConnections)
4041

4142

4243
class TrackDownListener(HostStateListener):
@@ -46,6 +47,15 @@ def __init__(self):
4647
def on_down(self, host):
4748
self.hosts_marked_down.append(host)
4849

50+
def on_up(self, host):
51+
pass
52+
53+
def on_add(self, host):
54+
pass
55+
56+
def on_remove(self, host):
57+
pass
58+
4959
class ThreadTracker(ThreadPoolExecutor):
5060
called_functions = []
5161

@@ -339,3 +349,37 @@ class ExtendedConnection(AsyncoreConnection):
339349
connection_class=ExtendedConnection)
340350
cluster.connect()
341351
cluster.shutdown()
352+
353+
def test_driver_recovers_nework_isolation(self):
354+
start_and_prime_singledc()
355+
356+
idle_heartbeat_timeout = 3
357+
idle_heartbeat_interval = 1
358+
359+
listener = TrackDownListener()
360+
361+
cluster = Cluster(['127.0.0.1'],
362+
load_balancing_policy=RoundRobinPolicy(),
363+
idle_heartbeat_timeout=idle_heartbeat_timeout,
364+
idle_heartbeat_interval=idle_heartbeat_interval,
365+
executor_threads=16)
366+
session = cluster.connect(wait_for_all_pools=True)
367+
368+
cluster.register_listener(listener)
369+
370+
prime_request(PrimeOptions(then=NO_THEN))
371+
prime_request(RejectConnections(RejectType.REJECT_STARTUP))
372+
373+
time.sleep((idle_heartbeat_timeout + idle_heartbeat_interval) * 2)
374+
375+
for host in cluster.metadata.all_hosts():
376+
self.assertIn(host, listener.hosts_marked_down)
377+
378+
self.assertRaises(NoHostAvailable, session.execute, "SELECT * from system.local")
379+
380+
clear_queries()
381+
prime_request(AcceptConnections())
382+
383+
time.sleep(idle_heartbeat_timeout + idle_heartbeat_interval + 2)
384+
385+
self.assertIsNotNone(session.execute("SELECT * from system.local"))

tests/integration/simulacron/utils.py

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def submit_request(self, query):
103103

104104
request = Request("http://{}/{}{}".format(
105105
self.admin_addr, query.path, query.fetch_url_params()), data=data)
106-
request.get_method = lambda: 'POST'
106+
request.get_method = lambda: query.method
107107
request.add_header("Content-Type", 'application/json')
108108
request.add_header("Content-Length", len(data))
109109

@@ -142,9 +142,13 @@ def clear_all_queries(self, cluster_name=DEFAULT_CLUSTER):
142142

143143
class SimulacronRequest(object):
144144
def fetch_json(self):
145-
raise NotImplementedError()
145+
return {}
146146

147147
def fetch_url_params(self):
148+
return ""
149+
150+
@property
151+
def method(self):
148152
raise NotImplementedError()
149153

150154

@@ -175,6 +179,44 @@ def fetch_json(self):
175179
def fetch_url_params(self):
176180
return ""
177181

182+
@property
183+
def method(self):
184+
return "POST"
185+
186+
187+
class RejectType():
188+
UNBIND = "UNBIND"
189+
STOP = "STOP"
190+
REJECT_STARTUP = "REJECT_STARTUP"
191+
192+
193+
class RejectConnections(SimulacronRequest):
194+
"""
195+
Class used for making simulacron reject new connections
196+
"""
197+
def __init__(self, reject_type, cluster_name=DEFAULT_CLUSTER):
198+
self.path = "listener/{}".format(cluster_name)
199+
self.reject_type = reject_type
200+
201+
def fetch_url_params(self):
202+
return "?type={0}".format(self.reject_type)
203+
204+
@property
205+
def method(self):
206+
return "DELETE"
207+
208+
209+
class AcceptConnections(SimulacronRequest):
210+
"""
211+
Class used for making simulacron reject new connections
212+
"""
213+
def __init__(self, cluster_name=DEFAULT_CLUSTER):
214+
self.path = "listener/{}".format(cluster_name)
215+
216+
@property
217+
def method(self):
218+
return "PUT"
219+
178220

179221
class PrimeQuery(SimulacronRequest):
180222
"""
@@ -228,6 +270,9 @@ def set_node(self, cluster_id, datacenter_id, node_id):
228270
def fetch_url_params(self):
229271
return ""
230272

273+
@property
274+
def method(self):
275+
return "POST"
231276

232277
class ClusterQuery(SimulacronRequest):
233278
"""
@@ -251,6 +296,9 @@ def fetch_url_params(self):
251296
return "?cassandra_version={0}&data_centers={1}&name={2}".\
252297
format(self.cassandra_version, self.data_centers, self.cluster_name)
253298

299+
@property
300+
def method(self):
301+
return "POST"
254302

255303
def prime_driver_defaults():
256304
"""

0 commit comments

Comments
 (0)