File tree Expand file tree Collapse file tree 1 file changed +44
-0
lines changed
data-science-onramp/data-processing Expand file tree Collapse file tree 1 file changed +44
-0
lines changed Original file line number Diff line number Diff line change 1+ import os
2+ import sys
3+
4+ from py4j .protocol import Py4JJavaError
5+ from pyspark .sql import SparkSession
6+ from pyspark .sql .functions import UserDefinedFunction , lit
7+ from pyspark .sql .types import IntegerType , StringType
8+
9+
10+ PROJECT_ID = sys .argv [1 ]
11+ BUCKET_NAME = sys .argv [2 ]
12+ TABLE = f'{ PROJECT_ID } .new_york_citibike_trips.RAW_DATA'
13+
14+ def station_name (name ):
15+ if name :
16+ return name .replace ('/' , '&' )
17+ else :
18+ return ''
19+
20+ def main ():
21+ '''...'''
22+ # Create a SparkSession under the name 'clean'. Viewable via the Spark UI
23+ spark = SparkSession .builder .appName ('clean' ).getOrCreate ()
24+
25+ # Check if table exists
26+
27+ try :
28+ df = spark .read .format ('bigquery' ).option ('table' , TABLE ).load ()
29+ except Py4JJavaError :
30+ print (f"{ TABLE } does not exist. " )
31+ return
32+
33+ udf_map = {
34+ 'start_station_name' : (station_name , StringType ())
35+ }
36+
37+ for name , (func , col_type ) in udf_map .items ():
38+ df = df .withColumn (name , UserDefinedFunction (func , col_type )(name ).alias (name ))
39+
40+ df = spark .createDataframe
41+ df .show (n = 100 )
42+
43+ if __name__ == '__main__' :
44+ main ()
You can’t perform that action at this time.
0 commit comments