|
19 | 19 | from google.cloud.spanner.pool import BurstyPool |
20 | 20 |
|
21 | 21 | # Import relative to the script's directory |
| 22 | +from streaming_utils import FOUR_KAY |
| 23 | +from streaming_utils import FORTY_KAY |
| 24 | +from streaming_utils import FOUR_HUNDRED_KAY |
| 25 | +from streaming_utils import FOUR_MEG |
22 | 26 | from streaming_utils import DATABASE_NAME |
23 | 27 | from streaming_utils import INSTANCE_NAME |
24 | 28 | from streaming_utils import print_func |
25 | 29 |
|
| 30 | + |
# Schema for the streaming benchmark tables.  Each ``{N.table}`` /
# ``{N.value_size}`` pair is filled in from one of the table descriptors
# imported from streaming_utils, so the table names and STRING column
# sizes stay in sync with the populate/consume code.
DDL = """\
CREATE TABLE {0.table} (
    pkey INT64,
    chunk_me STRING({0.value_size}) )
    PRIMARY KEY (pkey);
CREATE TABLE {1.table} (
    pkey INT64,
    chunk_me STRING({1.value_size}) )
    PRIMARY KEY (pkey);
CREATE TABLE {2.table} (
    pkey INT64,
    chunk_me STRING({2.value_size}) )
    PRIMARY KEY (pkey);
CREATE TABLE {3.table} (
    pkey INT64,
    chunk_me STRING({3.value_size}),
    chunk_me_2 STRING({3.value_size}) )
    PRIMARY KEY (pkey);
""".format(FOUR_KAY, FORTY_KAY, FOUR_HUNDRED_KAY, FOUR_MEG)


# Individual CREATE TABLE statements: the admin API takes one statement
# per entry, so split on ';' and drop empty/whitespace-only fragments.
DDL_STATEMENTS = [stmt.strip() for stmt in DDL.split(';') if stmt.strip()]
47 | 53 |
|
@@ -75,49 +81,51 @@ def ensure_database(client): |
75 | 81 | return database |
76 | 82 |
|
77 | 83 |
|
def populate_table(database, table_desc):
    """Ensure the descriptor's table holds exactly ``row_count`` rows.

    Counts the existing rows; when the count already matches
    ``table_desc.row_count`` the table is left untouched, otherwise it is
    wiped and re-inserted in a single batch with ``table_desc.value()``
    payloads.
    """
    result = list(database.execute_sql(
        'SELECT COUNT(*) FROM {}'.format(table_desc.table)))
    assert len(result) == 1
    existing = result[0][0]
    if existing == table_desc.row_count:
        # Already populated with the expected number of rows.
        print_func("Leaving table: {}".format(table_desc.table))
        return
    print_func("Repopulating table: {}".format(table_desc.table))
    payload = table_desc.value()
    row_data = [(pkey, payload) for pkey in range(table_desc.row_count)]
    with database.batch() as batch:
        # Delete everything, then insert fresh rows, atomically.
        batch.delete(table_desc.table, KeySet(all_=True))
        batch.insert(table_desc.table, ('pkey', 'chunk_me'), row_data)
94 | 100 |
|
95 | 101 |
|
def populate_table_2_columns(database, table_desc):
    """Populate a two-payload-column table to exactly ``row_count`` rows.

    Same contract as ``populate_table``, but the generated chunk is
    written to both ``chunk_me`` and ``chunk_me_2`` — used when the
    desired row size exceeds a single STRING column's limit.
    """
    result = list(database.execute_sql(
        'SELECT COUNT(*) FROM {}'.format(table_desc.table)))
    assert len(result) == 1
    existing = result[0][0]
    if existing == table_desc.row_count:
        # Already populated with the expected number of rows.
        print_func("Leaving table: {}".format(table_desc.table))
        return
    print_func("Repopulating table: {}".format(table_desc.table))
    payload = table_desc.value()
    row_data = [
        (pkey, payload, payload)
        for pkey in range(table_desc.row_count)]
    with database.batch() as batch:
        # Delete everything, then insert fresh rows, atomically.
        batch.delete(table_desc.table, KeySet(all_=True))
        batch.insert(
            table_desc.table, ('pkey', 'chunk_me', 'chunk_me_2'), row_data)
112 | 120 |
|
113 | 121 |
|
def populate_streaming(client):
    """Create (if needed) the benchmark database and fill every table."""
    database = ensure_database(client)
    for descriptor in (FOUR_KAY, FORTY_KAY, FOUR_HUNDRED_KAY):
        populate_table(database, descriptor)
    # Max STRING column size is just larger than 2 Mb, so use two columns
    populate_table_2_columns(database, FOUR_MEG)
121 | 129 |
|
122 | 130 |
|
123 | 131 | if __name__ == '__main__': |
|
0 commit comments