Skip to content

Commit 67eaa36

Browse files
committed
Catch OperationTimedOut exception, removed sleeps
1 parent c10f8f8 commit 67eaa36

3 files changed

Lines changed: 40 additions & 24 deletions

File tree

tests/integration/long/test_consistency.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,24 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import struct
16-
import traceback
15+
import struct, logging, sys, traceback
1716

18-
import cassandra
19-
from cassandra import ConsistencyLevel
17+
from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, Unavailable
2018
from cassandra.cluster import Cluster
21-
from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, \
22-
DowngradingConsistencyRetryPolicy
19+
from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, DowngradingConsistencyRetryPolicy
2320
from cassandra.query import SimpleStatement
2421
from tests.integration import use_singledc, PROTOCOL_VERSION
2522

26-
from tests.integration.long.utils import force_stop, create_schema, \
27-
wait_for_down, wait_for_up, start, CoordinatorStats
23+
from tests.integration.long.utils import (force_stop, create_schema, wait_for_down, wait_for_up,
24+
start, CoordinatorStats)
2825

2926
try:
3027
import unittest2 as unittest
3128
except ImportError:
3229
import unittest # noqa
3330

31+
log = logging.getLogger(__name__)
32+
3433
ALL_CONSISTENCY_LEVELS = set([
3534
ConsistencyLevel.ANY, ConsistencyLevel.ONE, ConsistencyLevel.TWO,
3635
ConsistencyLevel.QUORUM, ConsistencyLevel.THREE,
@@ -74,7 +73,14 @@ def _query(self, session, keyspace, count, consistency_level=ConsistencyLevel.ON
7473
ss = SimpleStatement('SELECT * FROM cf WHERE k = 0',
7574
consistency_level=consistency_level,
7675
routing_key=routing_key)
77-
self.coordinator_stats.add_coordinator(session.execute_async(ss))
76+
while True:
77+
try:
78+
self.coordinator_stats.add_coordinator(session.execute_async(ss))
79+
break
80+
except (OperationTimedOut, ReadTimeout):
81+
ex_type, ex, tb = sys.exc_info()
82+
log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
83+
del tb
7884

7985
def _assert_writes_succeed(self, session, keyspace, consistency_levels):
8086
for cl in consistency_levels:
@@ -103,7 +109,7 @@ def _assert_writes_fail(self, session, keyspace, consistency_levels):
103109
try:
104110
self._insert(session, keyspace, 1, cl)
105111
self._cl_expected_failure(cl)
106-
except (cassandra.Unavailable, cassandra.WriteTimeout):
112+
except (Unavailable, WriteTimeout):
107113
pass
108114

109115
def _assert_reads_fail(self, session, keyspace, consistency_levels):
@@ -112,7 +118,7 @@ def _assert_reads_fail(self, session, keyspace, consistency_levels):
112118
try:
113119
self._query(session, keyspace, 1, cl)
114120
self._cl_expected_failure(cl)
115-
except (cassandra.Unavailable, cassandra.ReadTimeout):
121+
except (Unavailable, ReadTimeout):
116122
pass
117123

118124
def _test_tokenaware_one_node_down(self, keyspace, rf, accepted):

tests/integration/long/test_loadbalancingpolicies.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import struct
16-
import time
17-
from cassandra import ConsistencyLevel, Unavailable
15+
import struct, time, logging, sys, traceback
16+
17+
from cassandra import ConsistencyLevel, Unavailable, OperationTimedOut, ReadTimeout
1818
from cassandra.cluster import Cluster, NoHostAvailable
1919
from cassandra.concurrent import execute_concurrent_with_args
2020
from cassandra.policies import (RoundRobinPolicy, DCAwareRoundRobinPolicy,
@@ -32,6 +32,8 @@
3232
except ImportError:
3333
import unittest # noqa
3434

35+
log = logging.getLogger(__name__)
36+
3537

3638
class LoadBalancingPolicyTests(unittest.TestCase):
3739

@@ -59,14 +61,28 @@ def _query(self, session, keyspace, count=12,
5961
self.prepared = session.prepare(query_string)
6062

6163
for i in range(count):
62-
self.coordinator_stats.add_coordinator(session.execute_async(self.prepared.bind((0,))))
64+
while True:
65+
try:
66+
self.coordinator_stats.add_coordinator(session.execute_async(self.prepared.bind((0,))))
67+
break
68+
except (OperationTimedOut, ReadTimeout):
69+
ex_type, ex, tb = sys.exc_info()
70+
log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
71+
del tb
6372
else:
6473
routing_key = struct.pack('>i', 0)
6574
for i in range(count):
6675
ss = SimpleStatement('SELECT * FROM %s.cf WHERE k = 0' % keyspace,
6776
consistency_level=consistency_level,
6877
routing_key=routing_key)
69-
self.coordinator_stats.add_coordinator(session.execute_async(ss))
78+
while True:
79+
try:
80+
self.coordinator_stats.add_coordinator(session.execute_async(ss))
81+
break
82+
except (OperationTimedOut, ReadTimeout):
83+
ex_type, ex, tb = sys.exc_info()
84+
log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
85+
del tb
7086

7187
def test_roundrobin(self):
7288
use_singledc()

tests/integration/long/utils.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,10 @@ def wait_for_up(cluster, node, wait=True):
135135
host = cluster.metadata.get_host(IP_FORMAT % node)
136136
time.sleep(0.1)
137137
if host and host.is_up:
138-
# BUG: shouldn't have to, but we do
139-
if wait:
140-
log.debug("Sleeping 30s until host is up")
141-
time.sleep(30)
142138
log.debug("Done waiting for node %s to be up", node)
143139
return
140+
else:
141+
log.debug("Host is still marked down, waiting")
144142

145143

146144
def wait_for_down(cluster, node, wait=True):
@@ -149,10 +147,6 @@ def wait_for_down(cluster, node, wait=True):
149147
host = cluster.metadata.get_host(IP_FORMAT % node)
150148
time.sleep(0.1)
151149
if not host or not host.is_up:
152-
# BUG: shouldn't have to, but we do
153-
if wait:
154-
log.debug("Sleeping 10s until host is down")
155-
time.sleep(10)
156150
log.debug("Done waiting for node %s to be down", node)
157151
return
158152
else:

0 commit comments

Comments
 (0)