Skip to content

Commit df67008

Browse files
authored
Merge pull request googleapis#3414 from GoogleCloudPlatform/3019-spanner-streaming_chunking_systests
Test streaming of various payload row sizes.
2 parents 5bc1e38 + 7d9db45 commit df67008

File tree

3 files changed

+107
-32
lines changed

3 files changed

+107
-32
lines changed

spanner/tests/system/test_system.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,59 @@ def test_execute_sql_w_query_param(self):
726726
)
727727

728728

729+
class TestStreamingChunking(unittest.TestCase, _TestData):
    """System tests: stream result sets large enough to be chunked.

    Requires pre-populated tables; run
    'tests/system/utils/populate_streaming.py' against the target
    instance first, otherwise every test is skipped.
    """

    @classmethod
    def setUpClass(cls):
        from tests.system.utils.streaming_utils import INSTANCE_NAME
        from tests.system.utils.streaming_utils import DATABASE_NAME

        instance = Config.CLIENT.instance(INSTANCE_NAME)
        if not instance.exists():
            raise unittest.SkipTest(
                "Run 'tests/system/utils/populate_streaming.py' to enable.")

        database = instance.database(DATABASE_NAME)
        # BUG FIX: the original re-tested 'instance.exists()' here, so a
        # missing database was never detected; check the database itself.
        if not database.exists():
            raise unittest.SkipTest(
                "Run 'tests/system/utils/populate_streaming.py' to enable.")

        cls._db = database

    def _verify_one_column(self, table_desc):
        # Every row of the single-column table must hold the full expected
        # payload, proving chunked partial results were reassembled intact.
        sql = 'SELECT chunk_me FROM {}'.format(table_desc.table)
        rows = list(self._db.execute_sql(sql))
        self.assertEqual(len(rows), table_desc.row_count)
        expected = table_desc.value()
        for row in rows:
            self.assertEqual(row[0], expected)

    def _verify_two_columns(self, table_desc):
        # Two-column variant: used where a single STRING column cannot hold
        # the whole payload (see populate_streaming.py).
        sql = 'SELECT chunk_me, chunk_me_2 FROM {}'.format(table_desc.table)
        rows = list(self._db.execute_sql(sql))
        self.assertEqual(len(rows), table_desc.row_count)
        expected = table_desc.value()
        for row in rows:
            self.assertEqual(row[0], expected)
            self.assertEqual(row[1], expected)

    def test_four_kay(self):
        from tests.system.utils.streaming_utils import FOUR_KAY
        self._verify_one_column(FOUR_KAY)

    def test_forty_kay(self):
        # BUG FIX: the original imported and verified FOUR_KAY here, so the
        # 40K payload table was never actually exercised.
        from tests.system.utils.streaming_utils import FORTY_KAY
        self._verify_one_column(FORTY_KAY)

    def test_four_hundred_kay(self):
        from tests.system.utils.streaming_utils import FOUR_HUNDRED_KAY
        self._verify_one_column(FOUR_HUNDRED_KAY)

    def test_four_meg(self):
        from tests.system.utils.streaming_utils import FOUR_MEG
        self._verify_two_columns(FOUR_MEG)
780+
781+
729782
class _DatabaseDropper(object):
730783
"""Helper for cleaning up databases created on-the-fly."""
731784

spanner/tests/system/utils/populate_streaming.py

Lines changed: 40 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,35 @@
1919
from google.cloud.spanner.pool import BurstyPool
2020

2121
# Import relative to the script's directory
from streaming_utils import FOUR_KAY
from streaming_utils import FORTY_KAY
from streaming_utils import FOUR_HUNDRED_KAY
from streaming_utils import FOUR_MEG
from streaming_utils import DATABASE_NAME
from streaming_utils import INSTANCE_NAME
from streaming_utils import print_func


# DDL template: one table per payload size.  '{N.table}' / '{N.value_size}'
# are filled from the _TableDesc tuples so table names and column widths
# stay in sync with the descriptors the tests read.
DDL = """\
CREATE TABLE {0.table} (
    pkey INT64,
    chunk_me STRING({0.value_size}) )
    PRIMARY KEY (pkey);
CREATE TABLE {1.table} (
    pkey INT64,
    chunk_me STRING({1.value_size}) )
    PRIMARY KEY (pkey);
CREATE TABLE {2.table} (
    pkey INT64,
    chunk_me STRING({2.value_size}) )
    PRIMARY KEY (pkey);
CREATE TABLE {3.table} (
    pkey INT64,
    chunk_me STRING({3.value_size}),
    chunk_me_2 STRING({3.value_size}) )
    PRIMARY KEY (pkey);
""".format(FOUR_KAY, FORTY_KAY, FOUR_HUNDRED_KAY, FOUR_MEG)

# The DDL API takes one statement at a time, so split on ';' and drop blanks.
DDL_STATEMENTS = [stmt.strip() for stmt in DDL.split(';') if stmt.strip()]
4753

@@ -75,49 +81,51 @@ def ensure_database(client):
7581
return database
7682

7783

78-
def populate_table(database, table_name, row_count, val_size):
84+
def populate_table(database, table_desc):
    """Ensure the single-column table described by *table_desc* is populated.

    Skips the (expensive) rewrite when the current row count already
    matches the descriptor; otherwise wipes and re-inserts every row.
    """
    table = table_desc.table
    count_rows = list(database.execute_sql(
        'SELECT COUNT(*) FROM {}'.format(table)))
    assert len(count_rows) == 1
    if count_rows[0][0] == table_desc.row_count:
        print_func("Leaving table: {}".format(table))
        return
    print_func("Repopulating table: {}".format(table))
    payload = table_desc.value()
    row_data = [(pkey, payload) for pkey in range(table_desc.row_count)]
    with database.batch() as batch:
        batch.delete(table, KeySet(all_=True))
        batch.insert(table, ('pkey', 'chunk_me'), row_data)
94100

95101

96-
def populate_table_2_columns(database, table_name, row_count, val_size):
102+
def populate_table_2_columns(database, table_desc):
    """Ensure the two-column table described by *table_desc* is populated.

    Same skip-if-already-populated logic as :func:`populate_table`, but the
    payload is written to both 'chunk_me' and 'chunk_me_2'.
    """
    table = table_desc.table
    count_rows = list(database.execute_sql(
        'SELECT COUNT(*) FROM {}'.format(table)))
    assert len(count_rows) == 1
    if count_rows[0][0] == table_desc.row_count:
        print_func("Leaving table: {}".format(table))
        return
    print_func("Repopulating table: {}".format(table))
    payload = table_desc.value()
    row_data = [
        (pkey, payload, payload)
        for pkey in range(table_desc.row_count)]
    with database.batch() as batch:
        batch.delete(table, KeySet(all_=True))
        batch.insert(table, ('pkey', 'chunk_me', 'chunk_me_2'), row_data)
112120

113121

114122
def populate_streaming(client):
    """Create (if needed) the database and fill every streaming-test table."""
    database = ensure_database(client)
    for table_desc in (FOUR_KAY, FORTY_KAY, FOUR_HUNDRED_KAY):
        populate_table(database, table_desc)
    # Max STRING column size is just larger than 2 Mb, so use two columns
    populate_table_2_columns(database, FOUR_MEG)
121129

122130

123131
if __name__ == '__main__':

spanner/tests/system/utils/streaming_utils.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,27 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import collections
1516
import os
1617

1718
INSTANCE_NAME = 'gcp-streaming-systests'
1819
DATABASE_NAME = 'testing'
1920
_SHOULD_PRINT = os.getenv('GOOGLE_CLOUD_NO_PRINT') != 'true'
2021

2122

23+
class _TableDesc(collections.namedtuple(
24+
'TableDesc', ('table', 'row_count', 'value_size', 'column_count'))):
25+
26+
def value(self):
27+
return u'X' * self.value_size
28+
29+
30+
# Descriptors for each payload size the system tests exercise.  Row counts
# shrink as values grow so total data per table stays manageable; four_meg
# uses two columns (see populate_streaming.py for why).
FOUR_KAY = _TableDesc('four_kay', 1000, 4096, 1)
FORTY_KAY = _TableDesc('forty_kay', 100, 4096 * 10, 1)
FOUR_HUNDRED_KAY = _TableDesc('four_hundred_kay', 25, 4096 * 100, 1)
FOUR_MEG = _TableDesc('four_meg', 10, 2048 * 1024, 2)
34+
35+
2236
def print_func(message):
    """Print *message*, unless suppressed via GOOGLE_CLOUD_NO_PRINT=true."""
    if not _SHOULD_PRINT:
        return
    print(message)

0 commit comments

Comments
 (0)