Skip to content

Commit 7ebfe51

Browse files
authored
Add an example for Dataproc PySpark Hudi (GoogleCloudPlatform#8828)
## Description Add an example for creating/writing/reading Hudi table with PySpark on Dataproc. ## Checklist - [ ] I have followed [Sample Guidelines from AUTHORING_GUIDE.MD](https://togithub.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md) - [ ] README is updated to include [all relevant information](https://togithub.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#readme-file) - [ ] **Tests** pass: `nox -s py-3.9` (see [Test Environment Setup](https://togithub.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#test-environment-setup)) - [ ] **Lint** pass: `nox -s lint` (see [Test Environment Setup](https://togithub.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#test-environment-setup)) - [ ] These samples need a new **API enabled** in testing projects to pass (let us know which ones) - [ ] These samples need a new/updated **env vars** in testing projects set to pass (let us know which ones) - [ ] Please **merge** this PR for me once it is approved. - [ ] This sample adds a new sample directory, and I updated the [CODEOWNERS file](https://togithub.com/GoogleCloudPlatform/python-docs-samples/blob/main/.github/CODEOWNERS) with the codeowners for this sample
1 parent 935e10d commit 7ebfe51

File tree

1 file changed

+168
-0
lines changed

1 file changed

+168
-0
lines changed

dataproc/snippets/pyspark_hudi.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2022 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
"""Pyspark Hudi example."""
18+
19+
import sys
20+
21+
# pylint: disable=import-error
22+
from pyspark.sql import SparkSession
23+
24+
25+
def create_hudi_table(spark, table_name, table_uri):
    """Create the Hudi table at the given location if it does not exist.

    The table is declared with type 'cow' (copy-on-write) in its
    TBLPROPERTIES, keyed by `uuid`, pre-combined on `ts`, and
    partitioned by `partitionpath`.

    Args:
        spark: Active SparkSession with Hudi support configured.
        table_name: Name of the Hudi table to create.
        table_uri: Storage location (e.g. a GCS URI) backing the table.
    """
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            uuid string,
            begin_lat double,
            begin_lon double,
            end_lat double,
            end_lon double,
            driver string,
            rider string,
            fare double,
            partitionpath string,
            ts long
        ) USING hudi
        LOCATION '{table_uri}'
        TBLPROPERTIES (
            type = 'cow',
            primaryKey = 'uuid',
            preCombineField = 'ts'
        )
        PARTITIONED BY (partitionpath)
    """)
49+
50+
51+
def delete_hudi_table(spark, table_name):
    """Drop the Hudi table if it exists."""
    drop_sql = f'DROP TABLE IF EXISTS {table_name}'
    spark.sql(drop_sql)
54+
55+
56+
def generate_test_dataframe(spark, n_rows):
    """Generate a test DataFrame with Hudi's built-in data generator.

    Uses the JVM-side `org.apache.hudi.QuickstartUtils` helper (reached
    through the SparkContext gateway) to produce synthetic trip records.

    Args:
        spark: Active SparkSession.
        n_rows: Number of insert records to generate.

    Returns:
        A DataFrame parsed from the generated JSON records.
    """
    sc = spark.sparkContext
    # pylint: disable=protected-access
    quickstart = sc._jvm.org.apache.hudi.QuickstartUtils
    generator = quickstart.DataGenerator()
    json_rows = quickstart.convertToStringList(generator.generateInserts(n_rows))
    return spark.read.json(sc.parallelize(json_rows, 2))
64+
65+
66+
def write_hudi_table(name, uri, dataframe):
    """Upsert the DataFrame into the Hudi table at the given location.

    Args:
        name: Hudi table name (used for both read and write table names).
        uri: Storage location of the table.
        dataframe: DataFrame whose rows are upserted (keyed by `uuid`,
            de-duplicated on `ts`, partitioned by `partitionpath`).
    """
    hudi_options = {
        'hoodie.table.name': name,
        'hoodie.datasource.write.recordkey.field': 'uuid',
        'hoodie.datasource.write.partitionpath.field': 'partitionpath',
        'hoodie.datasource.write.table.name': name,
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.datasource.write.precombine.field': 'ts',
        'hoodie.upsert.shuffle.parallelism': 2,
        'hoodie.insert.shuffle.parallelism': 2,
    }
    writer = dataframe.write.format('hudi').options(**hudi_options)
    writer.mode('append').save(uri)
79+
80+
81+
def query_commit_history(spark, name, uri):
    """Return the table's distinct commit timestamps, newest first.

    Registers a temporary view over the table at `uri` and queries the
    Hudi metadata column `_hoodie_commit_time`.
    """
    view_name = f'{name}_commit_history'
    history_df = spark.read.format('hudi').load(uri)
    history_df.createOrReplaceTempView(view_name)
    return spark.sql(f"""
        SELECT DISTINCT(_hoodie_commit_time)
        FROM {view_name}
        ORDER BY _hoodie_commit_time
        DESC
    """)
92+
93+
94+
def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
    """Read a snapshot of the Hudi table, optionally at a past commit.

    Args:
        spark: Active SparkSession.
        table_name: Hudi table name (used to derive the temp view name).
        table_uri: Storage location of the table.
        commit_ts: Commit timestamp for a time-travel read; an empty
            string (the default) reads the latest snapshot.

    Returns:
        A DataFrame with the data columns plus `_hoodie_commit_time`.
    """
    read_options = {'as.of.instant': commit_ts} if commit_ts else {}
    view_name = f'{table_name}_snapshot'
    snapshot_df = spark.read.format('hudi').options(**read_options).load(table_uri)
    snapshot_df.createOrReplaceTempView(view_name)
    return spark.sql(f"""
        SELECT _hoodie_commit_time, begin_lat, begin_lon,
               driver, end_lat, end_lon, fare, partitionpath,
               rider, ts, uuid
        FROM {view_name}
    """)
111+
112+
113+
def main():
    """Create, write, and read a Hudi table end to end.

    Expects two command-line arguments: the table name and the table
    storage URI (e.g. a GCS path).

    Raises:
        ValueError: If the expected command-line arguments are missing.
    """
    if len(sys.argv) != 3:
        # Use a specific exception type rather than the overly broad
        # `Exception` (pylint: broad-exception-raised). Still caught by
        # any caller handling `Exception`.
        raise ValueError('Expected arguments: <table_name> <table_uri>')

    table_name = sys.argv[1]
    table_uri = sys.argv[2]

    app_name = f'pyspark-hudi-test_{table_name}'
    print(f'Creating Spark session {app_name} ...')
    spark = SparkSession.builder.appName(app_name).getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    print(f'Creating Hudi table {table_name} at {table_uri} ...')
    create_hudi_table(spark, table_name, table_uri)

    print('Generating test data batch 1...')
    n_rows1 = 10
    input_df1 = generate_test_dataframe(spark, n_rows1)
    input_df1.show(truncate=False)

    print('Writing Hudi table, batch 1 ...')
    write_hudi_table(table_name, table_uri, input_df1)

    print('Generating test data batch 2...')
    n_rows2 = 10
    input_df2 = generate_test_dataframe(spark, n_rows2)
    input_df2.show(truncate=False)

    print('Writing Hudi table, batch 2 ...')
    write_hudi_table(table_name, table_uri, input_df2)

    print('Querying commit history ...')
    commits_df = query_commit_history(spark, table_name, table_uri)
    commits_df.show(truncate=False)
    # Two batches were written above, so the history has two commits;
    # the query sorts newest first, making index 1 the earlier commit.
    # pylint: disable=protected-access
    previous_commit = commits_df.collect()[1]._hoodie_commit_time

    print('Reading the Hudi table snapshot at the latest commit ...')
    output_df1 = read_hudi_table(spark, table_name, table_uri)
    output_df1.show(truncate=False)

    print(f'Reading the Hudi table snapshot at {previous_commit} ...')
    output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit)
    output_df2.show(truncate=False)

    print('Deleting Hudi table ...')
    delete_hudi_table(spark, table_name)

    print('Stopping Spark session ...')
    spark.stop()

    print('All done')
166+
167+
168+
# Guard the entry point so importing this module (e.g. from tests) does
# not launch a Spark job; spark-submit still runs it as __main__.
if __name__ == '__main__':
    main()

0 commit comments

Comments
 (0)