2424from cassandra import ConsistencyLevel
2525from cassandra .cluster import Cluster
2626from cassandra .query import PreparedStatement , UNSET_VALUE , tuple_factory
27- from tests .integration import get_server_versions
27+ from tests .integration import get_server_versions , greaterthanorequalcass40 , BasicSharedKeyspaceUnitTestCase
2828
2929
3030def setup_module ():
@@ -390,8 +390,21 @@ def test_raise_error_on_prepared_statement_execution_dropped_table(self):
390390 with self .assertRaises (InvalidRequest ):
391391 self .session .execute (prepared , [0 ])
392392
393- # TODO revisit this test, it on hold now due to CASSANDRA-10786
394- @unittest .skip
393+
394+ @greaterthanorequalcass40
395+ class PreparedStatementInvalidationTest (BasicSharedKeyspaceUnitTestCase ):
396+
397+ def setUp (self ):
398+ self .table_name = "{}.prepared_statement_invalidation_test" .format (self .keyspace_name )
399+ self .session .execute ("CREATE TABLE {} (a int PRIMARY KEY, b int, d int);" .format (self .table_name ))
400+ self .session .execute ("INSERT INTO {} (a, b, d) VALUES (1, 1, 1);" .format (self .table_name ))
401+ self .session .execute ("INSERT INTO {} (a, b, d) VALUES (2, 2, 2);" .format (self .table_name ))
402+ self .session .execute ("INSERT INTO {} (a, b, d) VALUES (3, 3, 3);" .format (self .table_name ))
403+ self .session .execute ("INSERT INTO {} (a, b, d) VALUES (4, 4, 4);" .format (self .table_name ))
404+
405+ def tearDown (self ):
406+ self .session .execute ("DROP TABLE {}" .format (self .table_name ))
407+
395408 def test_invalidated_result_metadata (self ):
396409 """
397410 Tests to make sure cached metadata is updated when an invalidated prepared statement is reprepared.
@@ -402,29 +415,107 @@ def test_invalidated_result_metadata(self):
402415 Prior to this fix, the request would blow up with a protocol error when the result was decoded expecting a different
403416 number of columns.
404417 """
405- s = self .session
406- s .result_factory = tuple_factory
407-
408- table = "test1rf.%s" % self ._testMethodName .lower ()
409-
410- s .execute ("DROP TABLE IF EXISTS %s" % table )
411- s .execute ("CREATE TABLE %s (k int PRIMARY KEY, a int, b int, c int)" % table )
412- s .execute ("INSERT INTO %s (k, a, b, c) VALUES (0, 0, 0, 0)" % table )
413-
414- wildcard_prepared = s .prepare ("SELECT * FROM %s" % table )
418+ wildcard_prepared = self .session .prepare ("SELECT * FROM {}" .format (self .table_name ))
415419 original_result_metadata = wildcard_prepared .result_metadata
416- self .assertEqual (len (original_result_metadata ), 4 )
420+ self .assertEqual (len (original_result_metadata ), 3 )
417421
418- r = s .execute (wildcard_prepared )
419- self .assertEqual (r [0 ], (0 , 0 , 0 , 0 ))
422+ r = self . session .execute (wildcard_prepared )
423+ self .assertEqual (r [0 ], (1 , 1 , 1 ))
420424
421- s . execute ("ALTER TABLE %s DROP c" % table )
425+ self . session . execute ("ALTER TABLE {} DROP d" . format ( self . table_name ) )
422426
423427 # Get a bunch of requests in the pipeline with varying states of result_meta, reprepare, resolved
424- futures = set (s .execute_async (wildcard_prepared .bind (None )) for _ in range (200 ))
428+ futures = set (self . session .execute_async (wildcard_prepared .bind (None )) for _ in range (200 ))
425429 for f in futures :
430+ self .assertEqual (f .result ()[0 ], (1 , 1 ))
426431
427- self .assertEqual (f .result ()[0 ], (0 , 0 , 0 ))
428432 self .assertIsNot (wildcard_prepared .result_metadata , original_result_metadata )
429- s .execute ("DROP TABLE %s" % table )
430433
434+ def test_prepared_id_is_update (self ):
435+ """
436+ Tests that checks the query id from the prepared statement
437+ is updated properly if the table that the prepared statement is querying
438+ is altered.
439+
440+ @since 3.12
441+ @jira_ticket PYTHON-808
442+
443+ The query id from the prepared statment must have changed
444+ """
445+ prepared_statement = self .session .prepare ("SELECT * from {} WHERE a = ?" .format (self .table_name ))
446+ id_before = prepared_statement .query_id
447+
448+ self .session .execute ("ALTER TABLE {} ADD c int" .format (self .table_name ))
449+ bound_statement = prepared_statement .bind ((1 , ))
450+ self .session .execute (bound_statement )
451+
452+ id_after = prepared_statement .query_id
453+
454+ self .assertNotEqual (id_before , id_after )
455+
456+ def test_prepared_id_is_updated_across_pages (self ):
457+ """
458+ Test that checks that the query id from the prepared statement
459+ is updated if the table hat the prepared statement is querying
460+ is altered while fetching pages in a single query.
461+ Then it checks that the updated rows have the expected result.
462+
463+ @since 3.12
464+ @jira_ticket PYTHON-808
465+ """
466+ prepared_statement = self .session .prepare ("SELECT * from {}" .format (self .table_name ))
467+ id_before = prepared_statement .query_id
468+
469+ prepared_statement .fetch_size = 2
470+ result = self .session .execute (prepared_statement .bind ((None )))
471+
472+
473+ self .assertTrue (result .has_more_pages )
474+
475+ self .session .execute ("ALTER TABLE {} ADD c int" .format (self .table_name ))
476+
477+ result_set = set (x for x in ((1 , 1 , 1 ),(2 , 2 , 2 ), (3 , 3 , None , 3 ), (4 , 4 , None , 4 )))
478+ expected_result_set = set (row for row in result )
479+
480+ id_after = prepared_statement .query_id
481+
482+ self .assertEqual (result_set , expected_result_set )
483+ self .assertNotEqual (id_before , id_after )
484+
485+ def test_prepare_id_is_updated_across_session (self ):
486+ """
487+ Test that checks that the query id from the prepared statement
488+ is updated if the table hat the prepared statement is querying
489+ is altered by a different session
490+
491+ @since 3.12
492+ @jira_ticket PYTHON-808
493+ """
494+ one_cluster = Cluster ()
495+ one_session = one_cluster .connect ()
496+ self .addCleanup (one_cluster .shutdown )
497+
498+ stm = "SELECT * from {} WHERE a = ?" .format (self .table_name )
499+ one_prepared_stm = one_session .prepare (stm )
500+
501+ one_id_before = one_prepared_stm .query_id
502+
503+ self .session .execute ("ALTER TABLE {} ADD c int" .format (self .table_name ))
504+ one_session .execute (one_prepared_stm , (1 , ))
505+
506+ one_id_after = one_prepared_stm .query_id
507+ self .assertNotEqual (one_id_before , one_id_after )
508+
509+ def test_not_reprepare_invalid_statements (self ):
510+ """
511+ Test that checks that an InvalidRequest is arisen if a column
512+ expected by the prepared statement is dropped.
513+
514+ @since 3.12
515+ @jira_ticket PYTHON-808
516+ """
517+ prepared_statement = self .session .prepare (
518+ "SELECT a, b, d FROM {} WHERE a = ?" .format (self .table_name ))
519+ self .session .execute ("ALTER TABLE {} DROP d" .format (self .table_name ))
520+ with self .assertRaises (InvalidRequest ):
521+ self .session .execute (prepared_statement .bind ((1 , )))
0 commit comments