|
23 | 23 |
|
24 | 24 | from cassandra.query import SimpleStatement |
25 | 25 | from cassandra import ConsistencyLevel, WriteTimeout, Unavailable, ReadTimeout |
| 26 | +from cassandra.protocol import SyntaxException |
26 | 27 |
|
27 | 28 | from cassandra.cluster import Cluster, NoHostAvailable |
28 | 29 | from tests.integration import get_cluster, get_node, use_singledc, PROTOCOL_VERSION, execute_until_pass |
29 | 30 | from greplin import scales |
30 | | -from tests.integration import BasicSharedKeyspaceUnitTestCaseWTable |
| 31 | +from tests.integration import BasicSharedKeyspaceUnitTestCaseWTable, BasicExistingKeyspaceUnitTestCase |
31 | 32 |
|
32 | 33 | def setup_module(): |
33 | 34 | use_singledc() |
@@ -271,5 +272,84 @@ def test_duplicate_metrics_per_cluster(self): |
271 | 272 | self.assertTrue("devops" in scales._Stats.stats.keys()) |
272 | 273 |
|
273 | 274 |
|
| 275 | +class RequestAnalyzer(object): |
| 276 | + """ |
| 277 | + Class used to track request and error counts for a Session. |
| 278 | + Also computes statistics on encoded request size. |
| 279 | + """ |
| 280 | + |
| 281 | + requests = scales.PmfStat('request size') |
| 282 | + errors = scales.IntStat('errors') |
| 283 | + successful = scales.IntStat("success") |
| 284 | + # Throw exceptions when invoked. |
| 285 | + throw_on_success = False |
| 286 | + throw_on_fail = False |
| 287 | + |
| 288 | + def __init__(self, session, throw_on_success=False, throw_on_fail=False): |
| 289 | + scales.init(self, '/request') |
| 290 | + # each instance will be registered with a session, and receive a callback for each request generated |
| 291 | + session.add_request_init_listener(self.on_request) |
| 292 | + self.throw_on_fail = throw_on_fail |
| 293 | + self.throw_on_success = throw_on_success |
| 294 | + |
| 295 | + def on_request(self, rf): |
| 296 | + # This callback is invoked each time a request is created, on the thread creating the request. |
| 297 | + # We can use this to count events, or add callbacks |
| 298 | + rf.add_callbacks(self.on_success, self.on_error, callback_args=(rf,), errback_args=(rf,)) |
| 299 | + |
| 300 | + def on_success(self, _, response_future): |
| 301 | + # future callback on a successful request; just record the size |
| 302 | + self.requests.addValue(response_future.request_encoded_size) |
| 303 | + self.successful += 1 |
| 304 | + if self.throw_on_success: |
| 305 | + raise AttributeError |
| 306 | + |
| 307 | + def on_error(self, _, response_future): |
| 308 | + # future callback for failed; record size and increment errors |
| 309 | + self.requests.addValue(response_future.request_encoded_size) |
| 310 | + self.errors += 1 |
| 311 | + if self.throw_on_fail: |
| 312 | + raise AttributeError |
| 313 | + |
| 314 | + def __str__(self): |
| 315 | + # just extracting request count from the size stats (which are recorded on all requests) |
| 316 | + request_sizes = dict(self.requests) |
| 317 | + count = request_sizes.pop('count') |
| 318 | + return "%d requests (%d errors)\nRequest size statistics:\n%s" % (count, self.errors, pp.pformat(request_sizes)) |
| 319 | + |
| 320 | + |
| 321 | +class MetricsRequestSize(BasicExistingKeyspaceUnitTestCase): |
274 | 322 |
|
| 323 | + def test_metrics_per_cluster(self): |
| 324 | + """ |
| 325 | + Test to validate that requests listeners. |
| 326 | +
|
| 327 | + This test creates a simple metrics based request listener to track request size, it then |
| 328 | + check to ensure that on_success and on_error methods are invoked appropriately. |
| 329 | + @since 3.7.0 |
| 330 | + @jira_ticket PYTHON-284 |
| 331 | + @expected_result in_error, and on_success should be invoked apropriately |
| 332 | +
|
| 333 | + @test_category metrics |
| 334 | + """ |
| 335 | + |
| 336 | + ra = RequestAnalyzer(self.session) |
| 337 | + for _ in range(10): |
| 338 | + self.session.execute("SELECT release_version FROM system.local") |
| 339 | + |
| 340 | + for _ in range(3): |
| 341 | + try: |
| 342 | + self.session.execute("nonesense") |
| 343 | + except SyntaxException: |
| 344 | + continue |
275 | 345 |
|
| 346 | + self.assertEqual(ra.errors, 3) |
| 347 | + self.assertEqual(ra.successful, 10) |
| 348 | + |
| 349 | + # Make sure a poorly coded RA doesn't cause issues |
| 350 | + RequestAnalyzer(self.session, throw_on_success=False, throw_on_fail=True) |
| 351 | + self.session.execute("SELECT release_version FROM system.local") |
| 352 | + try: |
| 353 | + self.session.execute("nonesense") |
| 354 | + except SyntaxException: |
| 355 | + pass |
0 commit comments