@@ -26,40 +26,53 @@
 
 PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
 REGION = "us-central1"
-CLUSTER_NAME = "py-qs-test-{}".format(str(uuid.uuid4()))
-STAGING_BUCKET = "py-dataproc-qs-bucket-{}".format(str(uuid.uuid4()))
+
 JOB_FILE_NAME = "sum.py"
-JOB_FILE_PATH = "gs://{}/{}".format(STAGING_BUCKET, JOB_FILE_NAME)
 SORT_CODE = (
     "import pyspark\n"
     "sc = pyspark.SparkContext()\n"
     "rdd = sc.parallelize((1,2,3,4,5))\n"
     "sum = rdd.reduce(lambda x, y: x + y)\n"
 )
 
 
-@pytest.fixture(autouse=True)
-def blob():
-    storage_client = storage.Client()
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def delete_bucket(bucket):
+    bucket.delete()
+
+
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def delete_blob(blob):
+    blob.delete()
 
-    @backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
-    def create_bucket():
-        return storage_client.create_bucket(STAGING_BUCKET)
 
-    bucket = create_bucket()
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def upload_blob(bucket, contents):
     blob = bucket.blob(JOB_FILE_NAME)
-    blob.upload_from_string(SORT_CODE)
+    blob.upload_from_string(contents)
+    return blob
 
-    yield
 
-    blob.delete()
-    bucket.delete()
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def create_bucket(bucket_name):
+    storage_client = storage.Client()
+    return storage_client.create_bucket(bucket_name)
 
 
-@pytest.fixture(autouse=True)
-def cluster():
-    yield
+@pytest.fixture(scope="module")
+def staging_bucket_name():
+    bucket_name = "py-dataproc-qs-bucket-{}".format(str(uuid.uuid4()))
+    bucket = create_bucket(bucket_name)
+    blob = upload_blob(bucket, SORT_CODE)
+    try:
+        yield bucket_name
+    finally:
+        delete_blob(blob)
+        delete_bucket(bucket)
 
+
+@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
+def verify_cluster_teardown(cluster_name):
     # The quickstart sample deletes the cluster, but if the test fails
     # before cluster deletion occurs, it can be manually deleted here.
     cluster_client = dataproc.ClusterControllerClient(
@@ -69,23 +82,31 @@ def cluster():
     clusters = cluster_client.list_clusters(
         request={"project_id": PROJECT_ID, "region": REGION}
     )
-
     for cluster in clusters:
-        if cluster.cluster_name == CLUSTER_NAME:
+        if cluster.cluster_name == cluster_name:
             cluster_client.delete_cluster(
                 request={
                     "project_id": PROJECT_ID,
                     "region": REGION,
-                    "cluster_name": CLUSTER_NAME,
+                    "cluster_name": cluster_name,
                 }
             )
 
 
 @backoff.on_exception(backoff.expo, InvalidArgument, max_tries=3)
-def test_quickstart(capsys):
-    quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH)
-    out, _ = capsys.readouterr()
-
-    assert "Cluster created successfully" in out
-    assert "Job finished successfully" in out
-    assert "successfully deleted" in out
+def test_quickstart(capsys, staging_bucket_name):
+    cluster_name = "py-qs-test-{}".format(str(uuid.uuid4()))
+    try:
+        quickstart.quickstart(
+            PROJECT_ID,
+            REGION,
+            cluster_name,
+            "gs://{}/{}".format(staging_bucket_name, JOB_FILE_NAME)
+        )
+        out, _ = capsys.readouterr()
+
+        assert "Cluster created successfully" in out
+        assert "Job finished successfully" in out
+        assert "successfully deleted" in out
+    finally:
+        verify_cluster_teardown(cluster_name)
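
For reference, here is a minimal, self-contained sketch of the pattern this change adopts: a module-scoped pytest fixture whose setup and teardown helpers are wrapped in backoff.on_exception so transient failures are retried with exponential delays. Only backoff and pytest are real dependencies here; the ServiceUnavailable class is a local stand-in for google.api_core.exceptions.ServiceUnavailable, and make_resource and _failures are hypothetical scaffolding, not part of the change above.

import uuid

import backoff
import pytest


class ServiceUnavailable(Exception):
    """Local stand-in for google.api_core.exceptions.ServiceUnavailable."""


_failures = {"count": 2}  # simulate two transient failures, then success


@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
def make_resource(name):
    # backoff re-invokes this function with exponentially growing delays
    # until it stops raising ServiceUnavailable or max_tries is exhausted.
    if _failures["count"] > 0:
        _failures["count"] -= 1
        raise ServiceUnavailable(name)
    return {"name": name}


@pytest.fixture(scope="module")
def resource_name():
    # scope="module": set up once per test module; the finally block runs
    # even when a dependent test fails, mirroring staging_bucket_name.
    name = "example-{}".format(uuid.uuid4())
    resource = make_resource(name)
    try:
        yield name
    finally:
        resource.clear()  # stand-in for delete_blob/delete_bucket


def test_resource_exists(resource_name):
    assert resource_name.startswith("example-")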