Skip to content

Commit 45d38ad

Browse files
feat: Batch create job with Pub/Sub notification sample (GoogleCloudPlatform#12048)
* Add create_with_pubsub_notifications.py and update test_basics.py - Added new script to create jobs with Pub/Sub notifications in create_with_pubsub_notifications.py. - Update tests for batch creation with PubSub notifications
1 parent ee5a351 commit 45d38ad

File tree

2 files changed

+157
-0
lines changed

2 files changed

+157
-0
lines changed
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import google.auth
16+
17+
# [START batch_notifications]
18+
from google.cloud import batch_v1
19+
20+
21+
def create_with_pubsub_notification_job(
22+
project_id: str, region: str, job_name: str, topic_name: str
23+
) -> batch_v1.Job:
24+
"""
25+
This method shows how to create a sample Batch Job that will run
26+
a simple command inside a container on Cloud Compute instances.
27+
28+
Args:
29+
project_id: project ID or project number of the Cloud project you want to use.
30+
region: name of the region you want to use to run the job. Regions that are
31+
available for Batch are listed on: https://cloud.google.com/batch/docs/locations
32+
job_name: the name of the job that will be created.
33+
It needs to be unique for each project and region pair.
34+
topic_name: the name of the Pub/Sub topic to which the notification will be sent.
35+
The topic should be created in GCP Pub/Sub before running this method.
36+
The procedure for creating a topic is listed here: https://cloud.google.com/pubsub/docs/create-topic
37+
38+
Returns:
39+
A job object representing the job created.
40+
"""
41+
42+
client = batch_v1.BatchServiceClient()
43+
44+
# Define what will be done as part of the job.
45+
runnable = batch_v1.Runnable()
46+
runnable.container = batch_v1.Runnable.Container()
47+
runnable.container.image_uri = "gcr.io/google-containers/busybox"
48+
runnable.container.entrypoint = "/bin/sh"
49+
runnable.container.commands = [
50+
"-c",
51+
"echo Hello world! This is task ${BATCH_TASK_INDEX}. This job has a total of ${BATCH_TASK_COUNT} tasks.",
52+
]
53+
54+
# Jobs can be divided into tasks. In this case, we have only one task.
55+
task = batch_v1.TaskSpec()
56+
task.runnables = [runnable]
57+
58+
# We can specify what resources are requested by each task.
59+
resources = batch_v1.ComputeResource()
60+
resources.cpu_milli = 2000 # in milliseconds per cpu-second. This means the task requires 2 whole CPUs.
61+
resources.memory_mib = 16 # in MiB
62+
task.compute_resource = resources
63+
64+
task.max_retry_count = 2
65+
task.max_run_duration = "3600s"
66+
67+
# Tasks are grouped inside a job using TaskGroups.
68+
# Currently, it's possible to have only one task group.
69+
group = batch_v1.TaskGroup()
70+
group.task_count = 4
71+
group.task_spec = task
72+
73+
# Policies are used to define on what kind of virtual machines the tasks will run on.
74+
# In this case, we tell the system to use "e2-standard-4" machine type.
75+
# Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
76+
policy = batch_v1.AllocationPolicy.InstancePolicy()
77+
policy.machine_type = "e2-standard-4"
78+
instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
79+
instances.policy = policy
80+
allocation_policy = batch_v1.AllocationPolicy()
81+
allocation_policy.instances = [instances]
82+
83+
job = batch_v1.Job()
84+
job.task_groups = [group]
85+
job.allocation_policy = allocation_policy
86+
job.labels = {"env": "testing", "type": "container"}
87+
# We use Cloud Logging as it's an out of the box available option
88+
job.logs_policy = batch_v1.LogsPolicy()
89+
job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
90+
91+
# Configuring the first notification
92+
notification1 = batch_v1.JobNotification()
93+
notification1.pubsub_topic = f"projects/{project_id}/topics/{topic_name}"
94+
# Define the message that will be sent to the topic
95+
first_massage = batch_v1.JobNotification.Message()
96+
# Specify the new job state that will trigger the notification
97+
# In this case, the notification is triggered when the job state changes to SUCCEEDED
98+
first_massage.type_ = batch_v1.JobNotification.Type.JOB_STATE_CHANGED
99+
first_massage.new_job_state = batch_v1.JobStatus.State.SUCCEEDED
100+
# Assign the message to the notification
101+
notification1.message = first_massage
102+
103+
# Configuring the second notification
104+
notification2 = batch_v1.JobNotification()
105+
notification2.pubsub_topic = f"projects/{project_id}/topics/{topic_name}"
106+
second_message = batch_v1.JobNotification.Message()
107+
second_message.type_ = batch_v1.JobNotification.Type.TASK_STATE_CHANGED
108+
second_message.new_task_state = batch_v1.TaskStatus.State.FAILED
109+
notification2.message = second_message
110+
111+
# Assign a list of notifications to the job.
112+
job.notifications = [notification1, notification2]
113+
114+
create_request = batch_v1.CreateJobRequest()
115+
create_request.job = job
116+
create_request.job_id = job_name
117+
# The job's parent is the region in which the job will run
118+
create_request.parent = f"projects/{project_id}/locations/{region}"
119+
return client.create_job(create_request)
120+
121+
122+
# [END batch_notifications]
123+
124+
if __name__ == "__main__":
125+
PROJECT_ID = google.auth.default()[1]
126+
REGION = "europe-west4"
127+
job_name = "your-job-name"
128+
# The topic should be created in GCP Pub/Sub
129+
existing_topic_name = "your-existing-topic-name"
130+
job = create_with_pubsub_notification_job(
131+
PROJECT_ID, REGION, job_name, existing_topic_name
132+
)
133+
print(job)

batch/tests/test_basics.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
from ..create.create_with_custom_status_events import create_job_with_status_events
2929
from ..create.create_with_gpu_no_mounting import create_gpu_job
3030
from ..create.create_with_persistent_disk import create_with_pd_job
31+
from ..create.create_with_pubsub_notifications import (
32+
create_with_pubsub_notification_job,
33+
)
3134
from ..create.create_with_script_no_mounting import create_script_job
3235
from ..create.create_with_secret_manager import create_with_secret_manager
3336
from ..create.create_with_service_account import create_with_custom_service_account_job
@@ -138,6 +141,20 @@ def _check_secret_set(job: batch_v1.Job, secret_name: str):
138141
assert secret_name in job.task_groups[0].task_spec.environment.secret_variables
139142

140143

144+
def _check_notification(job, test_topic):
145+
notification_found = sum(
146+
1
147+
for notif in job.notifications
148+
if notif.message.new_task_state == batch_v1.TaskStatus.State.FAILED
149+
or notif.message.new_job_state == batch_v1.JobStatus.State.SUCCEEDED
150+
)
151+
assert (
152+
job.notifications[0].pubsub_topic == f"projects/{PROJECT}/topics/{test_topic}"
153+
)
154+
assert notification_found == len(job.notifications)
155+
assert len(job.notifications) == 2
156+
157+
141158
def _check_custom_events(job: batch_v1.Job):
142159
display_names = ["Script 1", "Barrier 1", "Script 2"]
143160
custom_event_found = False
@@ -221,3 +238,10 @@ def test_pd_job(job_name, disk_name):
221238
def test_create_job_with_custom_events(job_name):
222239
job = create_job_with_status_events(PROJECT, REGION, job_name)
223240
_test_body(job, additional_test=lambda: _check_custom_events(job))
241+
242+
243+
@flaky(max_runs=3, min_passes=1)
244+
def test_check_notification_job(job_name):
245+
test_topic = "test_topic"
246+
job = create_with_pubsub_notification_job(PROJECT, REGION, job_name, test_topic)
247+
_test_body(job, additional_test=lambda: _check_notification(job, test_topic))

0 commit comments

Comments
 (0)