
Commit 6e85c95

BigQuery: Adds Parquet SourceFormat and samples (googleapis#5057)
* adds parquet and samples
* changes StringIO to BytesIO in snippets
1 parent 387f3e3 commit 6e85c95

3 files changed: 131 additions & 6 deletions

bigquery/google/cloud/bigquery/job.py (3 additions & 2 deletions)
@@ -154,13 +154,14 @@ class SourceFormat(_EnumApiResourceProperty):
 
     For CSV files, specify `CSV`. For datastore backups, specify
     `DATASTORE_BACKUP`. For newline-delimited json, specify
-    `NEWLINE_DELIMITED_JSON`. For Avro, specify `AVRO`. The default
-    value is `CSV`.
+    `NEWLINE_DELIMITED_JSON`. For Avro, specify `AVRO`. For Parquet, specify
+    `PARQUET`. The default value is `CSV`.
     """
     CSV = 'CSV'
     DATASTORE_BACKUP = 'DATASTORE_BACKUP'
     NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
     AVRO = 'AVRO'
+    PARQUET = 'PARQUET'
 
 
 class WriteDisposition(_EnumApiResourceProperty):
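
Because Parquet files are self-describing, a load job that uses the new enum value typically needs only the source format, with no explicit schema. As a minimal sketch of exercising the new value outside the committed samples (the local file path and dataset/table names are hypothetical):

    # Hypothetical example: load a local Parquet file into BigQuery.
    from google.cloud import bigquery

    client = bigquery.Client()
    table_ref = client.dataset('my_dataset').table('us_states')

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET  # new in this commit

    # Parquet embeds its schema, so job_config.schema is not required here.
    with open('us_states.parquet', 'rb') as source_file:  # binary mode
        job = client.load_table_from_file(
            source_file, table_ref, job_config=job_config)  # API request
    job.result()  # Waits for the load job to complete.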

docs/bigquery/snippets.py (110 additions & 4 deletions)
@@ -931,6 +931,33 @@ def test_load_table_from_uri_cmek(client, to_delete):
     # [END bigquery_load_table_gcs_json_cmek]
 
 
+def test_load_table_from_uri_parquet(client, to_delete):
+    dataset_id = 'load_table_dataset_{}'.format(_millis())
+    dataset = bigquery.Dataset(client.dataset(dataset_id))
+    client.create_dataset(dataset)
+    to_delete.append(dataset)
+
+    # [START bigquery_load_table_gcs_parquet]
+    # client = bigquery.Client()
+    # dataset_id = 'my_dataset'
+    dataset_ref = client.dataset(dataset_id)
+    job_config = bigquery.LoadJobConfig()
+    job_config.source_format = bigquery.SourceFormat.PARQUET
+
+    load_job = client.load_table_from_uri(
+        'gs://cloud-samples-data/bigquery/us-states/us-states.parquet',
+        dataset_ref.table('us_states'),
+        job_config=job_config)  # API request
+
+    assert load_job.job_type == 'load'
+
+    load_job.result()  # Waits for table load to complete.
+
+    assert load_job.state == 'DONE'
+    assert client.get_table(dataset_ref.table('us_states')).num_rows > 0
+    # [END bigquery_load_table_gcs_parquet]
+
+
 def test_load_table_from_uri_autodetect(client, to_delete):
     dataset_id = 'load_table_dataset_{}'.format(_millis())
     dataset = bigquery.Dataset(client.dataset(dataset_id))
@@ -971,7 +998,7 @@ def test_load_table_from_uri_append(client, to_delete):
         bigquery.SchemaField('post_abbr', 'STRING')
     ]
     table_ref = dataset.table('us_states')
-    body = six.StringIO('Washington,WA')
+    body = six.BytesIO(b'Washington,WA')
     client.load_table_from_file(
         body, table_ref, job_config=job_config).result()
 
@@ -997,6 +1024,45 @@ def test_load_table_from_uri_append(client, to_delete):
     # [END bigquery_load_table_gcs_json_append]
 
 
+def test_load_table_from_uri_parquet_append(client, to_delete):
+    dataset_id = 'load_table_dataset_{}'.format(_millis())
+    dataset = bigquery.Dataset(client.dataset(dataset_id))
+    client.create_dataset(dataset)
+    to_delete.append(dataset)
+
+    job_config = bigquery.LoadJobConfig()
+    job_config.schema = [
+        bigquery.SchemaField('name', 'STRING'),
+        bigquery.SchemaField('post_abbr', 'STRING')
+    ]
+    table_ref = dataset.table('us_states')
+    body = six.BytesIO(b'Washington,WA')
+    client.load_table_from_file(
+        body, table_ref, job_config=job_config).result()
+
+    # [START bigquery_load_table_gcs_parquet_append]
+    # client = bigquery.Client()
+    # table_ref = client.dataset('my_dataset').table('existing_table')
+    previous_rows = client.get_table(table_ref).num_rows
+    job_config = bigquery.LoadJobConfig()
+    job_config.source_format = bigquery.SourceFormat.PARQUET
+    # In an append, the Parquet file's schema must match the table schema.
+    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
+
+    load_job = client.load_table_from_uri(
+        'gs://cloud-samples-data/bigquery/us-states/us-states.parquet',
+        table_ref,
+        job_config=job_config)  # API request
+
+    assert load_job.job_type == 'load'
+
+    load_job.result()  # Waits for table load to complete.
+
+    assert load_job.state == 'DONE'
+    assert client.get_table(table_ref).num_rows == previous_rows + 50
+    # [END bigquery_load_table_gcs_parquet_append]
+
+
 def test_load_table_from_uri_truncate(client, to_delete):
     dataset_id = 'load_table_dataset_{}'.format(_millis())
     dataset = bigquery.Dataset(client.dataset(dataset_id))
@@ -1009,7 +1075,7 @@ def test_load_table_from_uri_truncate(client, to_delete):
         bigquery.SchemaField('post_abbr', 'STRING')
     ]
     table_ref = dataset.table('us_states')
-    body = six.StringIO('Washington,WA')
+    body = six.BytesIO(b'Washington,WA')
     client.load_table_from_file(
         body, table_ref, job_config=job_config).result()
 
@@ -1037,6 +1103,46 @@ def test_load_table_from_uri_truncate(client, to_delete):
     # [END bigquery_load_table_gcs_json_truncate]
 
 
+def test_load_table_from_uri_parquet_truncate(client, to_delete):
+    dataset_id = 'load_table_dataset_{}'.format(_millis())
+    dataset = bigquery.Dataset(client.dataset(dataset_id))
+    client.create_dataset(dataset)
+    to_delete.append(dataset)
+
+    job_config = bigquery.LoadJobConfig()
+    job_config.schema = [
+        bigquery.SchemaField('name', 'STRING'),
+        bigquery.SchemaField('post_abbr', 'STRING')
+    ]
+    table_ref = dataset.table('us_states')
+    body = six.BytesIO(b'Washington,WA')
+    client.load_table_from_file(
+        body, table_ref, job_config=job_config).result()
+
+    # [START bigquery_load_table_gcs_parquet_truncate]
+    # client = bigquery.Client()
+    # table_ref = client.dataset('my_dataset').table('existing_table')
+    previous_rows = client.get_table(table_ref).num_rows
+    assert previous_rows > 0
+
+    job_config = bigquery.LoadJobConfig()
+    job_config.source_format = bigquery.SourceFormat.PARQUET
+    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
+
+    load_job = client.load_table_from_uri(
+        'gs://cloud-samples-data/bigquery/us-states/us-states.parquet',
+        table_ref,
+        job_config=job_config)  # API request
+
+    assert load_job.job_type == 'load'
+
+    load_job.result()  # Waits for table load to complete.
+
+    assert load_job.state == 'DONE'
+    assert client.get_table(table_ref).num_rows == 50
+    # [END bigquery_load_table_gcs_parquet_truncate]
+
+
 def _write_csv_to_storage(bucket_name, blob_name, header_row, data_rows):
     import csv
     from google.cloud._testing import _NamedTemporaryFile
@@ -1103,14 +1209,14 @@ def test_copy_table_multiple_source(client, to_delete):
         bigquery.SchemaField('post_abbr', 'STRING')
     ]
 
-    table_data = {'table1': 'Washington,WA', 'table2': 'California,CA'}
+    table_data = {'table1': b'Washington,WA', 'table2': b'California,CA'}
     for table_id, data in table_data.items():
         table_ref = source_dataset.table(table_id)
         table = bigquery.Table(table_ref, schema=schema)
         to_delete.insert(0, table)
         job_config = bigquery.LoadJobConfig()
         job_config.schema = schema
-        body = six.StringIO(data)
+        body = six.BytesIO(data)
         client.load_table_from_file(
             body, table_ref, job_config=job_config).result()  # API request
 
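
The StringIO-to-BytesIO edits above fix the pre-existing samples rather than the new ones: `load_table_from_file` streams the upload as raw bytes, so on Python 3 a `six.StringIO` (which yields `str`) breaks, while a byte stream works under both major versions. A minimal sketch of the distinction, using the stdlib `io` module (`six.BytesIO` is an alias for `io.BytesIO`):

    import io

    body = io.BytesIO(b'Washington,WA')  # byte stream: valid upload source
    text = io.StringIO('Washington,WA')  # text stream: yields str on Python 3,
                                         # which a binary upload cannot accept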
docs/bigquery/usage.rst (18 additions & 0 deletions)
@@ -223,6 +223,12 @@ Load a JSON file from Cloud Storage:
    :start-after: [START bigquery_load_table_gcs_json]
    :end-before: [END bigquery_load_table_gcs_json]
 
+Load a Parquet file from Cloud Storage:
+
+.. literalinclude:: snippets.py
+   :start-after: [START bigquery_load_table_gcs_parquet]
+   :end-before: [END bigquery_load_table_gcs_parquet]
+
 Load a JSON file from Cloud Storage, using an autodetected schema:
 
 .. literalinclude:: snippets.py
@@ -235,12 +241,24 @@ Append a JSON file from Cloud Storage to an existing table:
    :start-after: [START bigquery_load_table_gcs_json_append]
    :end-before: [END bigquery_load_table_gcs_json_append]
 
+Append a Parquet file from Cloud Storage to an existing table:
+
+.. literalinclude:: snippets.py
+   :start-after: [START bigquery_load_table_gcs_parquet_append]
+   :end-before: [END bigquery_load_table_gcs_parquet_append]
+
 Overwrite / replace an existing table with a JSON file from Cloud Storage:
 
 .. literalinclude:: snippets.py
    :start-after: [START bigquery_load_table_gcs_json_truncate]
    :end-before: [END bigquery_load_table_gcs_json_truncate]
 
+Overwrite / replace an existing table with a Parquet file from Cloud Storage:
+
+.. literalinclude:: snippets.py
+   :start-after: [START bigquery_load_table_gcs_parquet_truncate]
+   :end-before: [END bigquery_load_table_gcs_parquet_truncate]
+
 Customer Managed Encryption Keys
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

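To try the new Parquet sections against your own data, you need a Parquet file with the samples' two-column us-states schema. A minimal sketch of producing one with pyarrow (an assumption of this sketch, not a dependency introduced by the commit):

    # Hypothetical helper: write a tiny Parquet file with the samples' schema.
    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.Table.from_arrays(
        [pa.array(['Washington', 'California']),
         pa.array(['WA', 'CA'])],
        names=['name', 'post_abbr'])
    pq.write_table(table, 'us_states.parquet')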