diff --git a/bigquery/google/cloud/bigquery/__init__.py b/bigquery/google/cloud/bigquery/__init__.py index 503c83bfb85c..751efd5a671b 100644 --- a/bigquery/google/cloud/bigquery/__init__.py +++ b/bigquery/google/cloud/bigquery/__init__.py @@ -49,6 +49,7 @@ from google.cloud.bigquery.job import QueryJob from google.cloud.bigquery.job import QueryJobConfig from google.cloud.bigquery.job import QueryPriority +from google.cloud.bigquery.job import SchemaUpdateOption from google.cloud.bigquery.job import SourceFormat from google.cloud.bigquery.job import UnknownJob from google.cloud.bigquery.job import WriteDisposition @@ -113,6 +114,7 @@ 'DestinationFormat', 'Encoding', 'QueryPriority', + 'SchemaUpdateOption', 'SourceFormat', 'WriteDisposition' ] diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 393dcdf5dad7..5026445f0287 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -206,6 +206,18 @@ class WriteDisposition(object): returned in the job result.""" +class SchemaUpdateOption(object): + """Specifies an update to the destination table schema as a side effect of + a load job. + """ + + ALLOW_FIELD_ADDITION = 'ALLOW_FIELD_ADDITION' + """Allow adding a nullable field to the schema.""" + + ALLOW_FIELD_RELAXATION = 'ALLOW_FIELD_RELAXATION' + """Allow relaxing a required field in the original schema to nullable.""" + + class _JobReference(object): """A reference to a job. @@ -1004,6 +1016,18 @@ def time_partitioning(self, value): api_repr = value.to_api_repr() self._set_sub_prop('timePartitioning', api_repr) + @property + def schema_update_options(self): + """List[google.cloud.bigquery.job.SchemaUpdateOption]: Specifies + updates to the destination table schema to allow as a side effect of + the load job. + """ + return self._get_sub_prop('schemaUpdateOptions') + + @schema_update_options.setter + def schema_update_options(self, values): + self._set_sub_prop('schemaUpdateOptions', values) + class LoadJob(_AsyncJob): """Asynchronous job for loading data into a table. @@ -1158,6 +1182,13 @@ def time_partitioning(self): """ return self._configuration.time_partitioning + @property + def schema_update_options(self): + """See + :attr:`google.cloud.bigquery.job.LoadJobConfig.schema_update_options`. + """ + return self._configuration.schema_update_options + @property def input_file_bytes(self): """Count of bytes loaded from source files. @@ -1971,6 +2002,18 @@ def time_partitioning(self, value): api_repr = value.to_api_repr() self._set_sub_prop('timePartitioning', api_repr) + @property + def schema_update_options(self): + """List[google.cloud.bigquery.job.SchemaUpdateOption]: Specifies + updates to the destination table schema to allow as a side effect of + the query job. + """ + return self._get_sub_prop('schemaUpdateOptions') + + @schema_update_options.setter + def schema_update_options(self, values): + self._set_sub_prop('schemaUpdateOptions', values) + def to_api_repr(self): """Build an API representation of the query job config. @@ -2149,6 +2192,13 @@ def time_partitioning(self): """ return self._configuration.time_partitioning + @property + def schema_update_options(self): + """See + :attr:`google.cloud.bigquery.job.QueryJobConfig.schema_update_options`. + """ + return self._configuration.schema_update_options + def _build_resource(self): """Generate a resource for :meth:`begin`.""" configuration = self._configuration.to_api_repr() diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index a76d05f4a077..a8d0ed96299f 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -380,6 +380,11 @@ def _verifyEnumConfigProperties(self, job, config): config['writeDisposition']) else: self.assertIsNone(job.write_disposition) + if 'schemaUpdateOptions' in config: + self.assertEqual( + job.schema_update_options, config['schemaUpdateOptions']) + else: + self.assertIsNone(job.schema_update_options) def _verifyResourceProperties(self, job, resource): self._verifyReadonlyResourceProperties(job, resource) @@ -467,6 +472,7 @@ def test_ctor(self): self.assertIsNone(job.write_disposition) self.assertIsNone(job.destination_encryption_configuration) self.assertIsNone(job.time_partitioning) + self.assertIsNone(job.schema_update_options) def test_ctor_w_config(self): from google.cloud.bigquery.schema import SchemaField @@ -780,6 +786,7 @@ def test_begin_w_autodetect(self): def test_begin_w_alternate_client(self): from google.cloud.bigquery.job import CreateDisposition + from google.cloud.bigquery.job import SchemaUpdateOption from google.cloud.bigquery.job import WriteDisposition from google.cloud.bigquery.schema import SchemaField @@ -817,7 +824,10 @@ def test_begin_w_alternate_client(self): 'mode': 'REQUIRED', 'description': None, }, - ]} + ]}, + 'schemaUpdateOptions': [ + SchemaUpdateOption.ALLOW_FIELD_ADDITION, + ], } RESOURCE['configuration']['load'] = LOAD_CONFIGURATION conn1 = _make_connection() @@ -842,6 +852,9 @@ def test_begin_w_alternate_client(self): config.skip_leading_rows = 1 config.source_format = 'CSV' config.write_disposition = WriteDisposition.WRITE_TRUNCATE + config.schema_update_options = [ + SchemaUpdateOption.ALLOW_FIELD_ADDITION, + ] job._begin(client=client2) @@ -2127,6 +2140,11 @@ def _verifyResourceProperties(self, job, resource): 'kmsKeyName']) else: self.assertIsNone(job.destination_encryption_configuration) + if 'schemaUpdateOptions' in query_config: + self.assertEqual( + job.schema_update_options, query_config['schemaUpdateOptions']) + else: + self.assertIsNone(job.schema_update_options) def test_ctor_defaults(self): client = _make_client(project=self.PROJECT) @@ -2157,6 +2175,7 @@ def test_ctor_defaults(self): self.assertIsNone(job.table_definitions) self.assertIsNone(job.destination_encryption_configuration) self.assertIsNone(job.time_partitioning) + self.assertIsNone(job.schema_update_options) def test_ctor_w_udf_resources(self): from google.cloud.bigquery.job import QueryJobConfig @@ -2248,6 +2267,7 @@ def test_from_api_repr_with_encryption(self): def test_from_api_repr_w_properties(self): from google.cloud.bigquery.job import CreateDisposition + from google.cloud.bigquery.job import SchemaUpdateOption from google.cloud.bigquery.job import WriteDisposition client = _make_client(project=self.PROJECT) @@ -2260,6 +2280,9 @@ def test_from_api_repr_w_properties(self): 'datasetId': self.DS_ID, 'tableId': self.DESTINATION_TABLE, } + query_config['schemaUpdateOptions'] = [ + SchemaUpdateOption.ALLOW_FIELD_ADDITION, + ] klass = self._get_target_class() job = klass.from_api_repr(RESOURCE, client=client) self.assertIs(job._client, client) @@ -2841,6 +2864,7 @@ def test_begin_w_alternate_client(self): from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import QueryJobConfig from google.cloud.bigquery.job import QueryPriority + from google.cloud.bigquery.job import SchemaUpdateOption from google.cloud.bigquery.job import WriteDisposition PATH = '/projects/%s/jobs' % (self.PROJECT,) @@ -2866,7 +2890,10 @@ def test_begin_w_alternate_client(self): 'useLegacySql': True, 'writeDisposition': WriteDisposition.WRITE_TRUNCATE, 'maximumBillingTier': 4, - 'maximumBytesBilled': '123456' + 'maximumBytesBilled': '123456', + 'schemaUpdateOptions': [ + SchemaUpdateOption.ALLOW_FIELD_RELAXATION, + ] } RESOURCE['configuration']['query'] = QUERY_CONFIGURATION RESOURCE['configuration']['dryRun'] = True @@ -2890,6 +2917,9 @@ def test_begin_w_alternate_client(self): config.use_query_cache = True config.write_disposition = WriteDisposition.WRITE_TRUNCATE config.maximum_bytes_billed = 123456 + config.schema_update_options = [ + SchemaUpdateOption.ALLOW_FIELD_RELAXATION, + ] job = self._make_one( self.JOB_ID, self.QUERY, client1, job_config=config) diff --git a/docs/bigquery/reference.rst b/docs/bigquery/reference.rst index c1546f915bd6..995e8a154a67 100644 --- a/docs/bigquery/reference.rst +++ b/docs/bigquery/reference.rst @@ -62,6 +62,7 @@ Job-Related Types job.QueryPriority job.SourceFormat job.WriteDisposition + job.SchemaUpdateOption Dataset diff --git a/docs/bigquery/snippets.py b/docs/bigquery/snippets.py index 33939ff9ec3c..05a9a3655141 100644 --- a/docs/bigquery/snippets.py +++ b/docs/bigquery/snippets.py @@ -1365,6 +1365,148 @@ def test_load_table_from_uri_truncate(client, to_delete): # [END bigquery_load_table_gcs_parquet_truncate] +def test_load_table_add_column(client, to_delete): + dataset_id = 'load_table_add_column_{}'.format(_millis()) + dataset_ref = client.dataset(dataset_id) + dataset = bigquery.Dataset(dataset_ref) + dataset.location = 'US' + dataset = client.create_dataset(dataset) + to_delete.append(dataset) + + snippets_dir = os.path.abspath(os.path.dirname(__file__)) + filepath = os.path.join( + snippets_dir, '..', '..', 'bigquery', 'tests', 'data', 'people.csv') + table_ref = dataset_ref.table('my_table') + old_schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + ] + table = client.create_table(bigquery.Table(table_ref, schema=old_schema)) + + # [START bigquery_add_column_load_append] + # from google.cloud import bigquery + # client = bigquery.Client() + # dataset_ref = client.dataset('my_dataset') + # filepath = 'path/to/your_file.csv' + + # Retrieves the destination table and checks the length of the schema + table_id = 'my_table' + table_ref = dataset_ref.table(table_id) + table = client.get_table(table_ref) + print("Table {} contains {} columns.".format(table_id, len(table.schema))) + + # Configures the load job to append the data to the destination table, + # allowing field addition + job_config = bigquery.LoadJobConfig() + job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + job_config.schema_update_options = [ + bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION, + ] + # In this example, the existing table contains only the 'full_name' column. + # 'REQUIRED' fields cannot be added to an existing schema, so the + # additional column must be 'NULLABLE'. + job_config.schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + bigquery.SchemaField('age', 'INTEGER', mode='NULLABLE'), + ] + job_config.source_format = bigquery.SourceFormat.CSV + job_config.skip_leading_rows = 1 + + with open(filepath, 'rb') as source_file: + job = client.load_table_from_file( + source_file, + table_ref, + location='US', # Must match the destination dataset location. + job_config=job_config) # API request + + job.result() # Waits for table load to complete. + print('Loaded {} rows into {}:{}.'.format( + job.output_rows, dataset_id, table_ref.table_id)) + + # Checks the updated length of the schema + table = client.get_table(table) + print("Table {} now contains {} columns.".format( + table_id, len(table.schema))) + # [END bigquery_add_column_load_append] + assert len(table.schema) == 2 + assert table.num_rows > 0 + + +def test_load_table_relax_column(client, to_delete): + dataset_id = 'load_table_relax_column_{}'.format(_millis()) + dataset_ref = client.dataset(dataset_id) + dataset = bigquery.Dataset(dataset_ref) + dataset.location = 'US' + dataset = client.create_dataset(dataset) + to_delete.append(dataset) + + snippets_dir = os.path.abspath(os.path.dirname(__file__)) + filepath = os.path.join( + snippets_dir, '..', '..', 'bigquery', 'tests', 'data', 'people.csv') + table_ref = dataset_ref.table('my_table') + old_schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), + bigquery.SchemaField('favorite_color', 'STRING', mode='REQUIRED'), + ] + table = client.create_table(bigquery.Table(table_ref, schema=old_schema)) + + # [START bigquery_relax_column_load_append] + # from google.cloud import bigquery + # client = bigquery.Client() + # dataset_ref = client.dataset('my_dataset') + # filepath = 'path/to/your_file.csv' + + # Retrieves the destination table and checks the number of required fields + table_id = 'my_table' + table_ref = dataset_ref.table(table_id) + table = client.get_table(table_ref) + original_required_fields = sum( + field.mode == 'REQUIRED' for field in table.schema) + # In this example, the existing table has 3 required fields. + print("{} fields in the schema are required.".format( + original_required_fields)) + + # Configures the load job to append the data to a destination table, + # allowing field relaxation + job_config = bigquery.LoadJobConfig() + job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + job_config.schema_update_options = [ + bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION, + ] + # In this example, the existing table contains three required fields + # ('full_name', 'age', and 'favorite_color'), while the data to load + # contains only the first two fields. + job_config.schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), + ] + job_config.source_format = bigquery.SourceFormat.CSV + job_config.skip_leading_rows = 1 + + with open(filepath, 'rb') as source_file: + job = client.load_table_from_file( + source_file, + table_ref, + location='US', # Must match the destination dataset location. + job_config=job_config) # API request + + job.result() # Waits for table load to complete. + print('Loaded {} rows into {}:{}.'.format( + job.output_rows, dataset_id, table_ref.table_id)) + + # Checks the updated number of required fields + table = client.get_table(table) + current_required_fields = sum( + field.mode == 'REQUIRED' for field in table.schema) + print("{} fields in the schema are now required.".format( + current_required_fields)) + # [END bigquery_relax_column_load_append] + assert original_required_fields - current_required_fields == 1 + assert len(table.schema) == 3 + assert table.schema[2].mode == 'NULLABLE' + assert table.num_rows > 0 + + def _write_csv_to_storage(bucket_name, blob_name, header_row, data_rows): import csv from google.cloud._testing import _NamedTemporaryFile @@ -1810,6 +1952,130 @@ def test_client_query_destination_table_cmek(client, to_delete): # [END bigquery_query_destination_table_cmek] +def test_client_query_relax_column(client, to_delete): + dataset_id = 'query_relax_column_{}'.format(_millis()) + dataset_ref = client.dataset(dataset_id) + dataset = bigquery.Dataset(dataset_ref) + dataset.location = 'US' + dataset = client.create_dataset(dataset) + to_delete.append(dataset) + + table_ref = dataset_ref.table('my_table') + schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), + ] + table = client.create_table( + bigquery.Table(table_ref, schema=schema)) + + # [START bigquery_relax_column_query_append] + # from google.cloud import bigquery + # client = bigquery.Client() + # dataset_ref = client.dataset('my_dataset') + + # Retrieves the destination table and checks the number of required fields + table_id = 'my_table' + table_ref = dataset_ref.table(table_id) + table = client.get_table(table_ref) + original_required_fields = sum( + field.mode == 'REQUIRED' for field in table.schema) + # In this example, the existing table has 2 required fields + print("{} fields in the schema are required.".format( + original_required_fields)) + + # Configures the query to append the results to a destination table, + # allowing field relaxation + job_config = bigquery.QueryJobConfig() + job_config.schema_update_options = [ + bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION, + ] + job_config.destination = table_ref + job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + + query_job = client.query( + # In this example, the existing table contains 'full_name' and 'age' as + # required columns, but the query results will omit the second column. + 'SELECT "Beyonce" as full_name;', + # Location must match that of the dataset(s) referenced in the query + # and of the destination table. + location='US', + job_config=job_config + ) # API request - starts the query + + query_job.result() # Waits for the query to finish + print("Query job {} complete.".format(query_job.job_id)) + + # Checks the updated number of required fields + table = client.get_table(table) + current_required_fields = sum( + field.mode == 'REQUIRED' for field in table.schema) + print("{} fields in the schema are now required.".format( + current_required_fields)) + # [END bigquery_relax_column_query_append] + assert original_required_fields - current_required_fields > 0 + assert len(table.schema) == 2 + assert table.schema[1].mode == 'NULLABLE' + assert table.num_rows > 0 + + +def test_client_query_add_column(client, to_delete): + dataset_id = 'query_add_column_{}'.format(_millis()) + dataset_ref = client.dataset(dataset_id) + dataset = bigquery.Dataset(dataset_ref) + dataset.location = 'US' + dataset = client.create_dataset(dataset) + to_delete.append(dataset) + + table_ref = dataset_ref.table('my_table') + schema = [ + bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'), + bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'), + ] + table = client.create_table(bigquery.Table(table_ref, schema=schema)) + + # [START bigquery_add_column_query_append] + # from google.cloud import bigquery + # client = bigquery.Client() + # dataset_ref = client.dataset('my_dataset') + + # Retrieves the destination table and checks the length of the schema + table_id = 'my_table' + table_ref = dataset_ref.table(table_id) + table = client.get_table(table_ref) + print("Table {} contains {} columns.".format(table_id, len(table.schema))) + + # Configures the query to append the results to a destination table, + # allowing field addition + job_config = bigquery.QueryJobConfig() + job_config.schema_update_options = [ + bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION, + ] + job_config.destination = table_ref + job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + + query_job = client.query( + # In this example, the existing table contains only the 'full_name' and + # 'age' columns, while the results of this query will contain an + # additional 'favorite_color' column. + 'SELECT "Timmy" as full_name, 85 as age, "Blue" as favorite_color;', + # Location must match that of the dataset(s) referenced in the query + # and of the destination table. + location='US', + job_config=job_config + ) # API request - starts the query + + query_job.result() # Waits for the query to finish + print("Query job {} complete.".format(query_job.job_id)) + + # Checks the updated length of the schema + table = client.get_table(table) + print("Table {} now contains {} columns.".format( + table_id, len(table.schema))) + # [END bigquery_add_column_query_append] + assert len(table.schema) == 3 + assert table.num_rows > 0 + + def test_client_query_w_named_params(client, capsys): """Run a query using named query parameters"""