25 | 25 | from airflow import models |
26 | 26 | from airflow.providers.google.cloud.operators.dataproc import ( |
27 | 27 | DataprocCreateClusterOperator, |
28 | | - DataprocCreateWorkflowTemplateOperator, |
29 | 28 | DataprocDeleteClusterOperator, |
30 | | - DataprocInstantiateWorkflowTemplateOperator, |
31 | 29 | DataprocSubmitJobOperator, |
32 | | - DataprocUpdateClusterOperator, |
33 | 30 | ) |
34 | 31 | from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor |
35 | 32 | from airflow.utils.dates import days_ago |
36 | 33 |
37 | 34 | PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "leah-playground") |
38 | 35 | CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "cluster-0c23") |
39 | 36 | REGION = os.environ.get("GCP_LOCATION", "us-central1") |
40 | | -# ZONE = os.environ.get("GCP_REGION", "europe-west1-b") |
41 | | -# BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests") |
42 | | -# OUTPUT_FOLDER = "wordcount" |
43 | | -# OUTPUT_PATH = f"gs://{BUCKET}/{OUTPUT_FOLDER}/" |
44 | | -# PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py") |
45 | | -# PYSPARK_URI = f"gs://{BUCKET}/{PYSPARK_MAIN}" |
46 | | -# SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R") |
47 | | -# SPARKR_URI = f"gs://{BUCKET}/{SPARKR_MAIN}" |
48 | 37 |
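
Everything the trimmed DAG still needs is configured through these three environment variables, with playground defaults. A minimal sketch of overriding them before Airflow parses the file; the values below are placeholders, not part of this commit:

    import os

    # Placeholders (assumption): point these at your own project before the DAG
    # is parsed, e.g. in the scheduler environment or ahead of `airflow dags test`.
    os.environ["GCP_PROJECT_ID"] = "my-project"
    os.environ["GCP_DATAPROC_CLUSTER_NAME"] = "my-cluster"
    os.environ["GCP_LOCATION"] = "us-central1"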
49 | 38 | # Cluster definition |
50 | 39 | # [START how_to_cloud_dataproc_create_cluster] |
51-63 | 40-52 | … unchanged lines collapsed …
64 | 53 |
65 | 54 | # [END how_to_cloud_dataproc_create_cluster] |
66 | 55 |
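
The cluster definition itself (old lines 51-63 / new 40-52) is collapsed in this view. Based on the stock Dataproc example this file derives from, CLUSTER_CONFIG presumably looks like the sketch below; machine types and disk sizes are assumptions, not visible in the diff:

    # Sketch of the collapsed CLUSTER_CONFIG (values assumed from the upstream example).
    CLUSTER_CONFIG = {
        "master_config": {
            "num_instances": 1,
            "machine_type_uri": "n1-standard-4",
            "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
        },
        "worker_config": {
            "num_instances": 2,
            "machine_type_uri": "n1-standard-4",
            "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
        },
    }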
67 | | -# # Update options |
68 | | -# # [START how_to_cloud_dataproc_updatemask_cluster_operator] |
69 | | -# CLUSTER_UPDATE = { |
70 | | -# "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}} |
71 | | -# } |
72 | | -# UPDATE_MASK = { |
73 | | -# "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"] |
74 | | -# } |
75 | | -# # [END how_to_cloud_dataproc_updatemask_cluster_operator] |
76 | | - |
77 | | -# TIMEOUT = {"seconds": 1 * 24 * 60 * 60} |
78 | | - |
79 | | -# [START how_to_cloud_dataproc_pyspark_config] |
80 | | -# PYSPARK_JOB = { |
81 | | -# "reference": {"project_id": PROJECT_ID}, |
82 | | -# "placement": {"cluster_name": CLUSTER_NAME}, |
83 | | -# "pyspark_job": {"main_python_file_uri": PYSPARK_URI}, |
84 | | -# } |
85 | 56 | PYSPARK_JOB = { |
86 | 57 | "reference": {"project_id": "leah-playground"}, |
87 | 58 | "placement": {"cluster_name": CLUSTER_NAME}, |
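
The rest of PYSPARK_JOB falls inside the collapsed region below. Judging from the commented-out template deleted above, the tail is presumably a single pyspark_job entry; the actual gs:// URI is hardcoded now that PYSPARK_URI is gone, and it is not visible in this hunk:

    PYSPARK_JOB = {
        "reference": {"project_id": "leah-playground"},  # hardcoded instead of PROJECT_ID
        "placement": {"cluster_name": CLUSTER_NAME},
        # Assumption: mirrors the deleted template; the real URI is collapsed in this diff.
        "pyspark_job": {"main_python_file_uri": "gs://<bucket>/hello_world.py"},
    }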
88-100 | 59-71 | … unchanged lines collapsed …
101 | 72 | ) |
102 | 73 | # # [END how_to_cloud_dataproc_create_cluster_operator] |
103 | 74 |
104 | | - # # [START how_to_cloud_dataproc_update_cluster_operator] |
105 | | - # scale_cluster = DataprocUpdateClusterOperator( |
106 | | - # task_id="scale_cluster", |
107 | | - # cluster_name=CLUSTER_NAME, |
108 | | - # cluster=CLUSTER_UPDATE, |
109 | | - # update_mask=UPDATE_MASK, |
110 | | - # graceful_decommission_timeout=TIMEOUT, |
111 | | - # project_id=PROJECT_ID, |
112 | | - # location=REGION, |
113 | | - # ) |
114 | | - # # [END how_to_cloud_dataproc_update_cluster_operator] |
115 | | - |
116 | 75 |
117 | 76 | # [START how_to_cloud_dataproc_submit_job_to_cluster_operator] |
118 | 77 | pyspark_task = DataprocSubmitJobOperator( |
119-126 | 78-85 | … unchanged lines collapsed …
127 | 86 | ) |
128 | 87 | # # [END how_to_cloud_dataproc_delete_cluster_operator] |
129 | 88 | create_cluster >> pyspark_task >> delete_cluster |
130 | | - # create_cluster >> scale_cluster |
131 | | - # scale_cluster >> create_workflow_template >> trigger_workflow >> delete_cluster |
132 | | - # scale_cluster >> hive_task >> delete_cluster |
133 | | - # scale_cluster >> pig_task >> delete_cluster |
134 | | - # scale_cluster >> spark_sql_task >> delete_cluster |
135 | | - # scale_cluster >> spark_task >> delete_cluster |
136 | | - # scale_cluster >> spark_task_async >> spark_task_async_sensor >> delete_cluster |
137 | | - # scale_cluster >> pyspark_task >> delete_cluster |
138 | | - # scale_cluster >> sparkr_task >> delete_cluster |
139 | | - # scale_cluster >> hadoop_task >> delete_cluster |
| 89 | + |
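
After this commit the DAG is a straight create >> submit >> delete pipeline; every workflow-template, scaling, Hive, Pig, Spark SQL, SparkR, and Hadoop branch is gone. (DataprocJobSensor stays imported even though its last references, the already commented-out spark_task_async_sensor lines, are deleted here.) Below is a self-contained sketch of the surviving wiring, assuming the collapsed operator bodies (old 88-100 and 119-126) keep the standard provider signatures of this era, where DataprocSubmitJobOperator still took location= rather than region=, matching the deleted scale_cluster block. The DAG id is also an assumption:

    with models.DAG(
        "example_gcp_dataproc",  # assumption: the DAG declaration is collapsed in this diff
        schedule_interval="@once",
        start_date=days_ago(1),
    ) as dag:
        create_cluster = DataprocCreateClusterOperator(
            task_id="create_cluster",
            project_id=PROJECT_ID,
            cluster_config=CLUSTER_CONFIG,
            region=REGION,
            cluster_name=CLUSTER_NAME,
        )
        pyspark_task = DataprocSubmitJobOperator(
            task_id="pyspark_task",
            job=PYSPARK_JOB,
            location=REGION,
            project_id=PROJECT_ID,
        )
        delete_cluster = DataprocDeleteClusterOperator(
            task_id="delete_cluster",
            project_id=PROJECT_ID,
            cluster_name=CLUSTER_NAME,
            region=REGION,
        )
        create_cluster >> pyspark_task >> delete_cluster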