From 662bf816526496d7086c1ea2f6c66aca9fbfb27c Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 29 May 2018 13:42:30 -0700 Subject: [PATCH 1/8] Adds SchemaUpdateOption --- bigquery/google/cloud/bigquery/__init__.py | 2 ++ bigquery/google/cloud/bigquery/job.py | 12 ++++++++++++ 2 files changed, 14 insertions(+) diff --git a/bigquery/google/cloud/bigquery/__init__.py b/bigquery/google/cloud/bigquery/__init__.py index 503c83bfb85c..751efd5a671b 100644 --- a/bigquery/google/cloud/bigquery/__init__.py +++ b/bigquery/google/cloud/bigquery/__init__.py @@ -49,6 +49,7 @@ from google.cloud.bigquery.job import QueryJob from google.cloud.bigquery.job import QueryJobConfig from google.cloud.bigquery.job import QueryPriority +from google.cloud.bigquery.job import SchemaUpdateOption from google.cloud.bigquery.job import SourceFormat from google.cloud.bigquery.job import UnknownJob from google.cloud.bigquery.job import WriteDisposition @@ -113,6 +114,7 @@ 'DestinationFormat', 'Encoding', 'QueryPriority', + 'SchemaUpdateOption', 'SourceFormat', 'WriteDisposition' ] diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 393dcdf5dad7..c37a35b9e6ac 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -206,6 +206,18 @@ class WriteDisposition(object): returned in the job result.""" +class SchemaUpdateOption(object): + """Specifies an update to the destination table schema as a side effect of + a load job. + """ + + ALLOW_FIELD_ADDITION = 'ALLOW_FIELD_ADDITION' + """Allow adding a nullable field to the schema.""" + + ALLOW_FIELD_RELAXATION = 'ALLOW_FIELD_RELAXATION' + """Allow relaxing a required field in the original schema to nullable.""" + + class _JobReference(object): """A reference to a job. From 9ece7a8306af7a3f1c83be7f7c913be2c6dffe43 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 29 May 2018 18:42:00 -0700 Subject: [PATCH 2/8] adds schema update options for load jobs --- bigquery/google/cloud/bigquery/job.py | 19 +++++++++++++++++++ bigquery/tests/unit/test_job.py | 15 ++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index c37a35b9e6ac..d7dfaf7a6a1a 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -1016,6 +1016,18 @@ def time_partitioning(self, value): api_repr = value.to_api_repr() self._set_sub_prop('timePartitioning', api_repr) + @property + def schema_update_options(self): + """List[google.cloud.bigquery.job.SchemaUpdateOption]: Specifies + updates to the destination table schema to allow as a side effect of + the load job. + """ + return self._get_sub_prop('schemaUpdateOptions') + + @schema_update_options.setter + def schema_update_options(self, values): + self._set_sub_prop('schemaUpdateOptions', values) + class LoadJob(_AsyncJob): """Asynchronous job for loading data into a table. @@ -1170,6 +1182,13 @@ def time_partitioning(self): """ return self._configuration.time_partitioning + @property + def schema_update_options(self): + """See + :attr:`google.cloud.bigquery.job.LoadJobConfig.schema_update_options`. + """ + return self._configuration.schema_update_options + @property def input_file_bytes(self): """Count of bytes loaded from source files. diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index a76d05f4a077..b52d6bac71c4 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -380,6 +380,11 @@ def _verifyEnumConfigProperties(self, job, config): config['writeDisposition']) else: self.assertIsNone(job.write_disposition) + if 'schemaUpdateOptions' in config: + self.assertEqual( + job.schema_update_options, config['schemaUpdateOptions']) + else: + self.assertIsNone(job.schema_update_options) def _verifyResourceProperties(self, job, resource): self._verifyReadonlyResourceProperties(job, resource) @@ -467,6 +472,7 @@ def test_ctor(self): self.assertIsNone(job.write_disposition) self.assertIsNone(job.destination_encryption_configuration) self.assertIsNone(job.time_partitioning) + self.assertIsNone(job.schema_update_options) def test_ctor_w_config(self): from google.cloud.bigquery.schema import SchemaField @@ -780,6 +786,7 @@ def test_begin_w_autodetect(self): def test_begin_w_alternate_client(self): from google.cloud.bigquery.job import CreateDisposition + from google.cloud.bigquery.job import SchemaUpdateOption from google.cloud.bigquery.job import WriteDisposition from google.cloud.bigquery.schema import SchemaField @@ -817,7 +824,10 @@ def test_begin_w_alternate_client(self): 'mode': 'REQUIRED', 'description': None, }, - ]} + ]}, + 'schemaUpdateOptions': [ + SchemaUpdateOption.ALLOW_FIELD_ADDITION, + ], } RESOURCE['configuration']['load'] = LOAD_CONFIGURATION conn1 = _make_connection() @@ -842,6 +852,9 @@ def test_begin_w_alternate_client(self): config.skip_leading_rows = 1 config.source_format = 'CSV' config.write_disposition = WriteDisposition.WRITE_TRUNCATE + config.schema_update_options = [ + SchemaUpdateOption.ALLOW_FIELD_ADDITION, + ] job._begin(client=client2) From de2bf6abd14c6becbb5fd2656c3ad12efa7f318b Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Tue, 29 May 2018 18:57:20 -0700 Subject: [PATCH 3/8] adds schema update options for query jobs --- bigquery/google/cloud/bigquery/job.py | 19 +++++++++++++++++++ bigquery/tests/unit/test_job.py | 19 ++++++++++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index d7dfaf7a6a1a..5026445f0287 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -2002,6 +2002,18 @@ def time_partitioning(self, value): api_repr = value.to_api_repr() self._set_sub_prop('timePartitioning', api_repr) + @property + def schema_update_options(self): + """List[google.cloud.bigquery.job.SchemaUpdateOption]: Specifies + updates to the destination table schema to allow as a side effect of + the query job. + """ + return self._get_sub_prop('schemaUpdateOptions') + + @schema_update_options.setter + def schema_update_options(self, values): + self._set_sub_prop('schemaUpdateOptions', values) + def to_api_repr(self): """Build an API representation of the query job config. @@ -2180,6 +2192,13 @@ def time_partitioning(self): """ return self._configuration.time_partitioning + @property + def schema_update_options(self): + """See + :attr:`google.cloud.bigquery.job.QueryJobConfig.schema_update_options`. + """ + return self._configuration.schema_update_options + def _build_resource(self): """Generate a resource for :meth:`begin`.""" configuration = self._configuration.to_api_repr() diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index b52d6bac71c4..a8d0ed96299f 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -2140,6 +2140,11 @@ def _verifyResourceProperties(self, job, resource): 'kmsKeyName']) else: self.assertIsNone(job.destination_encryption_configuration) + if 'schemaUpdateOptions' in query_config: + self.assertEqual( + job.schema_update_options, query_config['schemaUpdateOptions']) + else: + self.assertIsNone(job.schema_update_options) def test_ctor_defaults(self): client = _make_client(project=self.PROJECT) @@ -2170,6 +2175,7 @@ def test_ctor_defaults(self): self.assertIsNone(job.table_definitions) self.assertIsNone(job.destination_encryption_configuration) self.assertIsNone(job.time_partitioning) + self.assertIsNone(job.schema_update_options) def test_ctor_w_udf_resources(self): from google.cloud.bigquery.job import QueryJobConfig @@ -2261,6 +2267,7 @@ def test_from_api_repr_with_encryption(self): def test_from_api_repr_w_properties(self): from google.cloud.bigquery.job import CreateDisposition + from google.cloud.bigquery.job import SchemaUpdateOption from google.cloud.bigquery.job import WriteDisposition client = _make_client(project=self.PROJECT) @@ -2273,6 +2280,9 @@ def test_from_api_repr_w_properties(self): 'datasetId': self.DS_ID, 'tableId': self.DESTINATION_TABLE, } + query_config['schemaUpdateOptions'] = [ + SchemaUpdateOption.ALLOW_FIELD_ADDITION, + ] klass = self._get_target_class() job = klass.from_api_repr(RESOURCE, client=client) self.assertIs(job._client, client) @@ -2854,6 +2864,7 @@ def test_begin_w_alternate_client(self): from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import QueryJobConfig from google.cloud.bigquery.job import QueryPriority + from google.cloud.bigquery.job import SchemaUpdateOption from google.cloud.bigquery.job import WriteDisposition PATH = '/projects/%s/jobs' % (self.PROJECT,) @@ -2879,7 +2890,10 @@ def test_begin_w_alternate_client(self): 'useLegacySql': True, 'writeDisposition': WriteDisposition.WRITE_TRUNCATE, 'maximumBillingTier': 4, - 'maximumBytesBilled': '123456' + 'maximumBytesBilled': '123456', + 'schemaUpdateOptions': [ + SchemaUpdateOption.ALLOW_FIELD_RELAXATION, + ] } RESOURCE['configuration']['query'] = QUERY_CONFIGURATION RESOURCE['configuration']['dryRun'] = True @@ -2903,6 +2917,9 @@ def test_begin_w_alternate_client(self): config.use_query_cache = True config.write_disposition = WriteDisposition.WRITE_TRUNCATE config.maximum_bytes_billed = 123456 + config.schema_update_options = [ + SchemaUpdateOption.ALLOW_FIELD_RELAXATION, + ] job = self._make_one( self.JOB_ID, self.QUERY, client1, job_config=config) From 602292c6a75a2fc69c9c01eeac852692226a3aa6 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Wed, 30 May 2018 13:07:10 -0700 Subject: [PATCH 4/8] adds snippets for adding/relaxing column through load or query job --- docs/bigquery/snippets.py | 213 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 213 insertions(+) diff --git a/docs/bigquery/snippets.py b/docs/bigquery/snippets.py index 33939ff9ec3c..f10a2158b6f8 100644 --- a/docs/bigquery/snippets.py +++ b/docs/bigquery/snippets.py @@ -1365,6 +1365,126 @@ def test_load_table_from_uri_truncate(client, to_delete): # [END bigquery_load_table_gcs_parquet_truncate] +def test_load_table_add_column(client, to_delete): + dataset_id = 'load_table_add_column_{}'.format(_millis()) + dataset_ref = client.dataset(dataset_id) + dataset = bigquery.Dataset(dataset_ref) + dataset.location = 'US' + dataset = client.create_dataset(dataset) + to_delete.append(dataset) + + snippets_dir = os.path.abspath(os.path.dirname(__file__)) + filepath = os.path.join( + snippets_dir, '..', '..', 'bigquery', 'tests', 'data', 'people.csv') + + # [START bigquery_add_column_load_append] + # from google.cloud import bigquery + # client = bigquery.Client() + # dataset_ref = client.dataset('my_dataset') + # filepath = 'path/to/your_file.csv' + + table_ref = dataset_ref.table('my_table') + old_schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + ] + table = client.create_table(bigquery.Table(table_ref, schema=old_schema)) + + # Configure the load job to append the data to a destination table, + # allowing field addition + job_config = bigquery.LoadJobConfig() + job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + job_config.schema_update_options = [ + bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION, + ] + # 'REQUIRED' fields cannot be added to an existing schema, so the + # additional column must be 'NULLABLE' + job_config.schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + bigquery.SchemaField('age', 'INTEGER', mode='NULLABLE'), + ] + job_config.source_format = bigquery.SourceFormat.CSV + job_config.skip_leading_rows = 1 + + with open(filepath, 'rb') as source_file: + job = client.load_table_from_file( + source_file, + table_ref, + location='US', # Must match the destination dataset location. + job_config=job_config) # API request + + job.result() # Waits for table load to complete. + + print('Loaded {} rows into {}:{}.'.format( + job.output_rows, dataset_id, table_ref.table_id)) + + # Verify that the column was added + table = client.get_table(table) + assert len(table.schema) == 2 + # [END bigquery_add_column_load_append] + assert table.num_rows > 0 + + +def test_load_table_relax_column(client, to_delete): + dataset_id = 'load_table_relax_column_{}'.format(_millis()) + dataset_ref = client.dataset(dataset_id) + dataset = bigquery.Dataset(dataset_ref) + dataset.location = 'US' + dataset = client.create_dataset(dataset) + to_delete.append(dataset) + + snippets_dir = os.path.abspath(os.path.dirname(__file__)) + filepath = os.path.join( + snippets_dir, '..', '..', 'bigquery', 'tests', 'data', 'people.csv') + + # [START bigquery_relax_column_load_append] + # from google.cloud import bigquery + # client = bigquery.Client() + # dataset_ref = client.dataset('my_dataset') + # filepath = 'path/to/your_file.csv' + + table_ref = dataset_ref.table('my_table') + old_schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), + bigquery.SchemaField('favorite_color', 'STRING', mode='REQUIRED'), + ] + table = client.create_table(bigquery.Table(table_ref, schema=old_schema)) + + # Configure the load job to append the data to a destination table, + # allowing field relaxation + job_config = bigquery.LoadJobConfig() + job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + job_config.schema_update_options = [ + bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION, + ] + # The source file data to load has one less column than the existing table + job_config.schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), + ] + job_config.source_format = bigquery.SourceFormat.CSV + job_config.skip_leading_rows = 1 + + with open(filepath, 'rb') as source_file: + job = client.load_table_from_file( + source_file, + table_ref, + location='US', # Must match the destination dataset location. + job_config=job_config) # API request + + job.result() # Waits for table load to complete. + + print('Loaded {} rows into {}:{}.'.format( + job.output_rows, dataset_id, table_ref.table_id)) + + # Verify that the third column was relaxed + table = client.get_table(table) + assert len(table.schema) == 3 + assert table.schema[2].mode == 'NULLABLE' + # [END bigquery_relax_column_load_append] + assert table.num_rows > 0 + + def _write_csv_to_storage(bucket_name, blob_name, header_row, data_rows): import csv from google.cloud._testing import _NamedTemporaryFile @@ -1810,6 +1930,99 @@ def test_client_query_destination_table_cmek(client, to_delete): # [END bigquery_query_destination_table_cmek] +def test_client_query_relax_column(client, to_delete): + dataset_id = 'query_relax_column_{}'.format(_millis()) + dataset_ref = client.dataset(dataset_id) + dataset = bigquery.Dataset(dataset_ref) + dataset.location = 'US' + dataset = client.create_dataset(dataset) + to_delete.append(dataset) + + # [START bigquery_relax_column_query_append] + # from google.cloud import bigquery + # client = bigquery.Client() + # dataset_ref = client.dataset('my_dataset') + + table_ref = dataset_ref.table('my_table') + schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), + ] + table = client.create_table(bigquery.Table(table_ref, schema=schema)) + + # Configure the query to append the results to a destination table, + # allowing field relaxation + job_config = bigquery.QueryJobConfig() + job_config.schema_update_options = [ + bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION, + ] + job_config.destination = table_ref + job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + + query_job = client.query( + # Write a query that will omit a required field + 'SELECT "Beyonce" as full_name;', + # Location must match that of the dataset(s) referenced in the query + # and of the destination table. + location='US', + job_config=job_config + ) # API request - starts the query + + query_job.result() # Waits for the query to finish + + # Verify that the column was relaxed and the results were appended + table = client.get_table(table) + assert table.schema[1].mode == 'NULLABLE' + assert table.num_rows > 0 + # [END bigquery_relax_column_query_append] + +def test_client_query_add_column(client, to_delete): + dataset_id = 'query_add_column_{}'.format(_millis()) + dataset_ref = client.dataset(dataset_id) + dataset = bigquery.Dataset(dataset_ref) + dataset.location = 'US' + dataset = client.create_dataset(dataset) + to_delete.append(dataset) + + # [START bigquery_add_column_query_append] + # from google.cloud import bigquery + # client = bigquery.Client() + # dataset_ref = client.dataset('my_dataset') + + table_ref = dataset_ref.table('my_table') + schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), + ] + table = client.create_table(bigquery.Table(table_ref, schema=schema)) + + # Configure the query to append the results to a destination table, + # allowing field addition + job_config = bigquery.QueryJobConfig() + job_config.schema_update_options = [ + bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION, + ] + job_config.destination = table_ref + job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + + query_job = client.query( + # Write a query including a column not found in the destination table + 'SELECT "Timmy" as full_name, 85 as age, "Blue" as favorite_color;', + # Location must match that of the dataset(s) referenced in the query + # and of the destination table. + location='US', + job_config=job_config + ) # API request - starts the query + + query_job.result() # Waits for the query to finish + + # Verify that the column was added and the results were appended + table = client.get_table(table) + assert len(table.schema) == 3 + assert table.num_rows > 0 + # [END bigquery_add_column_query_append] + + def test_client_query_w_named_params(client, capsys): """Run a query using named query parameters""" From 82f0a1c435590fb0373e539fd15c611401bc8d46 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Wed, 30 May 2018 13:18:26 -0700 Subject: [PATCH 5/8] add SchemaUpdateOption to reference page --- docs/bigquery/reference.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/bigquery/reference.rst b/docs/bigquery/reference.rst index c1546f915bd6..995e8a154a67 100644 --- a/docs/bigquery/reference.rst +++ b/docs/bigquery/reference.rst @@ -62,6 +62,7 @@ Job-Related Types job.QueryPriority job.SourceFormat job.WriteDisposition + job.SchemaUpdateOption Dataset From a78df1e692958af73cfaf17f8c1184201ae5e98e Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Wed, 30 May 2018 15:06:18 -0700 Subject: [PATCH 6/8] fixes lint --- docs/bigquery/snippets.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/bigquery/snippets.py b/docs/bigquery/snippets.py index f10a2158b6f8..9f6a556d1c2a 100644 --- a/docs/bigquery/snippets.py +++ b/docs/bigquery/snippets.py @@ -1976,6 +1976,7 @@ def test_client_query_relax_column(client, to_delete): assert table.num_rows > 0 # [END bigquery_relax_column_query_append] + def test_client_query_add_column(client, to_delete): dataset_id = 'query_add_column_{}'.format(_millis()) dataset_ref = client.dataset(dataset_id) From 1a3111051ff6d2a55414e4ca55e42de5af8f8ebc Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Wed, 30 May 2018 18:10:21 -0700 Subject: [PATCH 7/8] Refactors code samples to be clearer and closer to actual use case --- docs/bigquery/snippets.py | 134 ++++++++++++++++++++++++++------------ 1 file changed, 93 insertions(+), 41 deletions(-) diff --git a/docs/bigquery/snippets.py b/docs/bigquery/snippets.py index 9f6a556d1c2a..8e1a3afd695e 100644 --- a/docs/bigquery/snippets.py +++ b/docs/bigquery/snippets.py @@ -1376,6 +1376,11 @@ def test_load_table_add_column(client, to_delete): snippets_dir = os.path.abspath(os.path.dirname(__file__)) filepath = os.path.join( snippets_dir, '..', '..', 'bigquery', 'tests', 'data', 'people.csv') + table_ref = dataset_ref.table('my_table') + old_schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + ] + table = client.create_table(bigquery.Table(table_ref, schema=old_schema)) # [START bigquery_add_column_load_append] # from google.cloud import bigquery @@ -1383,21 +1388,22 @@ def test_load_table_add_column(client, to_delete): # dataset_ref = client.dataset('my_dataset') # filepath = 'path/to/your_file.csv' - table_ref = dataset_ref.table('my_table') - old_schema = [ - bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), - ] - table = client.create_table(bigquery.Table(table_ref, schema=old_schema)) + # Retrieves the destination table and checks the length of the schema + table_id = 'my_table' + table_ref = dataset_ref.table(table_id) + table = client.get_table(table_ref) + print("Table {} contains {} columns.".format(table_id, len(table.schema))) - # Configure the load job to append the data to a destination table, + # Configures the load job to append the data to the destination table, # allowing field addition job_config = bigquery.LoadJobConfig() job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND job_config.schema_update_options = [ bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION, ] + # In this example, the existing table contains only the 'full_name' column. # 'REQUIRED' fields cannot be added to an existing schema, so the - # additional column must be 'NULLABLE' + # additional column must be 'NULLABLE'. job_config.schema = [ bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), bigquery.SchemaField('age', 'INTEGER', mode='NULLABLE'), @@ -1413,14 +1419,15 @@ def test_load_table_add_column(client, to_delete): job_config=job_config) # API request job.result() # Waits for table load to complete. - print('Loaded {} rows into {}:{}.'.format( job.output_rows, dataset_id, table_ref.table_id)) - # Verify that the column was added + # Checks the updated length of the schema table = client.get_table(table) - assert len(table.schema) == 2 + print("Table {} now contains {} columns.".format( + table_id, len(table.schema))) # [END bigquery_add_column_load_append] + assert len(table.schema) == 2 assert table.num_rows > 0 @@ -1435,13 +1442,6 @@ def test_load_table_relax_column(client, to_delete): snippets_dir = os.path.abspath(os.path.dirname(__file__)) filepath = os.path.join( snippets_dir, '..', '..', 'bigquery', 'tests', 'data', 'people.csv') - - # [START bigquery_relax_column_load_append] - # from google.cloud import bigquery - # client = bigquery.Client() - # dataset_ref = client.dataset('my_dataset') - # filepath = 'path/to/your_file.csv' - table_ref = dataset_ref.table('my_table') old_schema = [ bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), @@ -1450,14 +1450,32 @@ def test_load_table_relax_column(client, to_delete): ] table = client.create_table(bigquery.Table(table_ref, schema=old_schema)) - # Configure the load job to append the data to a destination table, + # [START bigquery_relax_column_load_append] + # from google.cloud import bigquery + # client = bigquery.Client() + # dataset_ref = client.dataset('my_dataset') + # filepath = 'path/to/your_file.csv' + + # Retrieves the destination table and checks the number of required fields + table_id = 'my_table' + table_ref = dataset_ref.table(table_id) + table = client.get_table(table_ref) + original_required_fields = sum( + field.mode == 'REQUIRED' for field in table.schema) + # In this example, the existing table has 3 required fields. + print("{} fields in the schema are required.".format( + original_required_fields)) + + # Configures the load job to append the data to a destination table, # allowing field relaxation job_config = bigquery.LoadJobConfig() job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND job_config.schema_update_options = [ bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION, ] - # The source file data to load has one less column than the existing table + # In this example, the existing table contains three required fields + # ('full_name', 'age', and 'favorite_color'), while the data to load + # contains only the first two fields. job_config.schema = [ bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), @@ -1473,15 +1491,19 @@ def test_load_table_relax_column(client, to_delete): job_config=job_config) # API request job.result() # Waits for table load to complete. - print('Loaded {} rows into {}:{}.'.format( job.output_rows, dataset_id, table_ref.table_id)) - # Verify that the third column was relaxed + # Checks the updated number of required fields table = client.get_table(table) + current_required_fields = sum( + field.mode == 'REQUIRED' for field in table.schema) + print("{} fields in the schema are now required.".format( + current_required_fields)) + # [END bigquery_relax_column_load_append] + assert original_required_fields - current_required_fields == 1 assert len(table.schema) == 3 assert table.schema[2].mode == 'NULLABLE' - # [END bigquery_relax_column_load_append] assert table.num_rows > 0 @@ -1938,19 +1960,30 @@ def test_client_query_relax_column(client, to_delete): dataset = client.create_dataset(dataset) to_delete.append(dataset) + dest_table_ref = dataset_ref.table('my_table') + schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), + ] + dest_table = client.create_table( + bigquery.Table(dest_table_ref, schema=schema)) + # [START bigquery_relax_column_query_append] # from google.cloud import bigquery # client = bigquery.Client() # dataset_ref = client.dataset('my_dataset') - table_ref = dataset_ref.table('my_table') - schema = [ - bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), - bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), - ] - table = client.create_table(bigquery.Table(table_ref, schema=schema)) + # Retrieves the destination table and checks the number of required fields + table_id = 'my_table' + table_ref = dataset_ref.table(table_id) + table = client.get_table(table_ref) + original_required_fields = sum( + field.mode == 'REQUIRED' for field in table.schema) + # In this example, the existing table has 2 required fields + print("{} fields in the schema are required.".format( + original_required_fields)) - # Configure the query to append the results to a destination table, + # Configures the query to append the results to a destination table, # allowing field relaxation job_config = bigquery.QueryJobConfig() job_config.schema_update_options = [ @@ -1960,7 +1993,8 @@ def test_client_query_relax_column(client, to_delete): job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND query_job = client.query( - # Write a query that will omit a required field + # In this example, the existing table contains 'full_name' and 'age' as + # required columns, but the query results will omit the second column. 'SELECT "Beyonce" as full_name;', # Location must match that of the dataset(s) referenced in the query # and of the destination table. @@ -1969,12 +2003,19 @@ def test_client_query_relax_column(client, to_delete): ) # API request - starts the query query_job.result() # Waits for the query to finish + print("Query job {} complete.".format(query_job.job_id)) - # Verify that the column was relaxed and the results were appended + # Checks the updated number of required fields table = client.get_table(table) + current_required_fields = sum( + field.mode == 'REQUIRED' for field in table.schema) + print("{} fields in the schema are now required.".format( + current_required_fields)) + # [END bigquery_relax_column_query_append] + assert original_required_fields - current_required_fields > 0 + assert len(table.schema) == 2 assert table.schema[1].mode == 'NULLABLE' assert table.num_rows > 0 - # [END bigquery_relax_column_query_append] def test_client_query_add_column(client, to_delete): @@ -1985,11 +2026,6 @@ def test_client_query_add_column(client, to_delete): dataset = client.create_dataset(dataset) to_delete.append(dataset) - # [START bigquery_add_column_query_append] - # from google.cloud import bigquery - # client = bigquery.Client() - # dataset_ref = client.dataset('my_dataset') - table_ref = dataset_ref.table('my_table') schema = [ bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), @@ -1997,7 +2033,18 @@ def test_client_query_add_column(client, to_delete): ] table = client.create_table(bigquery.Table(table_ref, schema=schema)) - # Configure the query to append the results to a destination table, + # [START bigquery_add_column_query_append] + # from google.cloud import bigquery + # client = bigquery.Client() + # dataset_ref = client.dataset('my_dataset') + + # Retrieves the destination table and checks the length of the schema + table_id = 'my_table' + table_ref = dataset_ref.table(table_id) + table = client.get_table(table_ref) + print("Table {} contains {} columns.".format(table_id, len(table.schema))) + + # Configures the query to append the results to a destination table, # allowing field addition job_config = bigquery.QueryJobConfig() job_config.schema_update_options = [ @@ -2007,7 +2054,9 @@ def test_client_query_add_column(client, to_delete): job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND query_job = client.query( - # Write a query including a column not found in the destination table + # In this example, the existing table contains only the 'full_name' and + # 'age' columns, while the results of this query will contain an + # additional 'favorite_color' column. 'SELECT "Timmy" as full_name, 85 as age, "Blue" as favorite_color;', # Location must match that of the dataset(s) referenced in the query # and of the destination table. @@ -2016,12 +2065,15 @@ def test_client_query_add_column(client, to_delete): ) # API request - starts the query query_job.result() # Waits for the query to finish + print("Query job {} complete.".format(query_job.job_id)) - # Verify that the column was added and the results were appended + # Checks the updated length of the schema table = client.get_table(table) + print("Table {} now contains {} columns.".format( + table_id, len(table.schema))) + # [END bigquery_add_column_query_append] assert len(table.schema) == 3 assert table.num_rows > 0 - # [END bigquery_add_column_query_append] def test_client_query_w_named_params(client, capsys): From 30d22a308e9694acd81d1a006a04f592774faa40 Mon Sep 17 00:00:00 2001 From: Alix Hamilton Date: Thu, 31 May 2018 08:53:36 -0700 Subject: [PATCH 8/8] fixes lint --- docs/bigquery/snippets.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/bigquery/snippets.py b/docs/bigquery/snippets.py index 8e1a3afd695e..05a9a3655141 100644 --- a/docs/bigquery/snippets.py +++ b/docs/bigquery/snippets.py @@ -1960,13 +1960,13 @@ def test_client_query_relax_column(client, to_delete): dataset = client.create_dataset(dataset) to_delete.append(dataset) - dest_table_ref = dataset_ref.table('my_table') + table_ref = dataset_ref.table('my_table') schema = [ bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), ] - dest_table = client.create_table( - bigquery.Table(dest_table_ref, schema=schema)) + table = client.create_table( + bigquery.Table(table_ref, schema=schema)) # [START bigquery_relax_column_query_append] # from google.cloud import bigquery