Skip to content

Commit 196935e

Browse files
feat: Batch create job with NFS sample (GoogleCloudPlatform#12088)
* Created sample and test for job with an existing network file system (NFS)
1 parent be7b161 commit 196935e

File tree

2 files changed

+169
-5
lines changed

2 files changed

+169
-5
lines changed

batch/create/create_with_nfs.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import google.auth
16+
17+
# [START batch_create_nfs_job]
18+
from google.cloud import batch_v1
19+
20+
21+
def create_job_with_network_file_system(
    project_id: str,
    region: str,
    job_name: str,
    mount_path: str,
    nfs_ip_address: str,
    nfs_path: str,
) -> batch_v1.Job:
    """
    Creates a Batch job that mounts an existing Network File System (NFS).

    The job's single task runs a small shell script that writes a greeting
    into a file underneath the NFS mount, proving the volume is usable.

    Args:
        project_id (str): project ID or project number of the Cloud project you want to use.
        region (str): name of the region you want to use to run the job. Regions that are
            available for Batch are listed on: https://cloud.google.com/batch/docs/locations
        job_name (str): the name of the job that will be created.
            It needs to be unique for each project and region pair.
        mount_path (str): The mount path that the job's tasks use to access the NFS.
        nfs_ip_address (str): The IP address of the NFS server (e.g., Filestore instance).
            Documentation on how to create a Filestore instance is available here:
            https://cloud.google.com/filestore/docs/create-instance-gcloud
        nfs_path (str): The path of the NFS directory that the job accesses.
            The path must start with a / followed by the root directory of the NFS.

    Returns:
        batch_v1.Job: The created Batch job object containing configuration details.
    """
    client = batch_v1.BatchServiceClient()

    # Create a runnable with a script that writes a message to a file on the NFS.
    # ${BATCH_TASK_INDEX} is substituted by Batch at runtime, hence the escaped braces.
    runnable = batch_v1.Runnable()
    runnable.script = batch_v1.Runnable.Script()
    runnable.script.text = f"echo Hello world from task ${{BATCH_TASK_INDEX}}. >> {mount_path}/output_task_${{BATCH_TASK_INDEX}}.txt"

    # Define a volume that uses NFS
    volume = batch_v1.Volume()
    volume.nfs = batch_v1.NFS(server=nfs_ip_address, remote_path=nfs_path)
    volume.mount_path = mount_path

    # Create a task specification and assign the runnable and volume to it
    task = batch_v1.TaskSpec()
    task.runnables = [runnable]
    task.volumes = [volume]

    # Specify what resources are requested by each task.
    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 2000  # in milli-CPUs: 2000 means the task requires 2 whole CPUs.
    resources.memory_mib = 16  # in MiB
    task.compute_resource = resources

    task.max_retry_count = 2
    task.max_run_duration = "3600s"

    # Create a task group and assign the task specification to it
    group = batch_v1.TaskGroup()
    group.task_count = 1
    group.task_spec = task

    # Policies are used to define on what kind of virtual machines the tasks will run on.
    # In this case, we tell the system to use "e2-standard-4" machine type.
    # Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "e2-standard-4"
    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    # Create the job and assign the task group and allocation policy to it.
    # Label the job as a script job ("type": "container" was a copy-paste error:
    # this job runs a script runnable, not a container).
    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.labels = {"env": "testing", "type": "script"}
    # We use Cloud Logging as it's an out of the box available option
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    # Create the job request and set the job and job ID
    create_request = batch_v1.CreateJobRequest()
    create_request.job = job
    create_request.job_id = job_name
    # The job's parent is the region in which the job will run
    create_request.parent = f"projects/{project_id}/locations/{region}"

    return client.create_job(create_request)
107+
108+
109+
# [END batch_create_nfs_job]
110+
111+
112+
if __name__ == "__main__":
    # Resolve the active project from Application Default Credentials.
    PROJECT_ID = google.auth.default()[1]
    REGION = "us-central1"
    job_name = "your-job-name"
    # The local path on your VM where the NFS is mounted.
    mount_path = "/mnt/disks"
    # IP address of the NFS server, e.g. a Filestore instance.
    # (Renamed from the misspelled "nfc_ip_address".)
    nfs_ip_address = "IP_address_of_your_NFS_server"
    # The path of the NFS directory to access.
    nfs_path = "/your_nfs_path"
    create_job_with_network_file_system(
        PROJECT_ID, REGION, job_name, mount_path, nfs_ip_address, nfs_path
    )

batch/tests/test_basics.py

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from ..create.create_with_container_no_mounting import create_container_job
2828
from ..create.create_with_custom_status_events import create_job_with_status_events
2929
from ..create.create_with_gpu_no_mounting import create_gpu_job
30+
from ..create.create_with_nfs import create_job_with_network_file_system
3031
from ..create.create_with_persistent_disk import create_with_pd_job
3132
from ..create.create_with_pubsub_notifications import (
3233
create_with_pubsub_notification_job,
@@ -79,20 +80,25 @@ def disk_name():
7980
return f"test-disk-{uuid.uuid4().hex[:10]}"
8081

8182

82-
def _test_body(test_job: batch_v1.Job, additional_test: Callable = None, region=REGION):
83+
def _test_body(
84+
test_job: batch_v1.Job,
85+
additional_test: Callable = None,
86+
region=REGION,
87+
project=PROJECT,
88+
):
8389
start_time = time.time()
8490
try:
8591
while test_job.status.state in WAIT_STATES:
8692
if time.time() - start_time > TIMEOUT:
8793
pytest.fail("Timed out while waiting for job to complete!")
8894
test_job = get_job(
89-
PROJECT, region, test_job.name.rsplit("/", maxsplit=1)[1]
95+
project, region, test_job.name.rsplit("/", maxsplit=1)[1]
9096
)
9197
time.sleep(5)
9298

9399
assert test_job.status.state == batch_v1.JobStatus.State.SUCCEEDED
94100

95-
for job in list_jobs(PROJECT, region):
101+
for job in list_jobs(project, region):
96102
if test_job.uid == job.uid:
97103
break
98104
else:
@@ -101,9 +107,9 @@ def _test_body(test_job: batch_v1.Job, additional_test: Callable = None, region=
101107
if additional_test:
102108
additional_test()
103109
finally:
104-
delete_job(PROJECT, region, test_job.name.rsplit("/", maxsplit=1)[1]).result()
110+
delete_job(project, region, test_job.name.rsplit("/", maxsplit=1)[1]).result()
105111

106-
for job in list_jobs(PROJECT, region):
112+
for job in list_jobs(project, region):
107113
if job.uid == test_job.uid:
108114
pytest.fail("The test job should be deleted at this point!")
109115

@@ -173,6 +179,16 @@ def _check_custom_events(job: batch_v1.Job):
173179
assert barrier_name_found
174180

175181

182+
def _check_nfs_mounting(
    job: batch_v1.Job, mount_path: str, nfc_ip_address: str, nfs_path: str
):
    """Assert that the job's NFS volume and script output path match expectations."""
    task_spec = job.task_groups[0].task_spec
    nfs_volume = task_spec.volumes[0]
    assert nfs_volume.nfs.server == nfc_ip_address
    assert nfs_volume.nfs.remote_path == nfs_path
    assert nfs_volume.mount_path == mount_path
    # The sample's script writes its output file under the mount path.
    expected_script_text = f"{mount_path}/output_task_${{BATCH_TASK_INDEX}}.txt"
    assert expected_script_text in task_spec.runnables[0].script.text
190+
191+
176192
@flaky(max_runs=3, min_passes=1)
177193
def test_script_job(job_name, capsys):
178194
job = create_script_job(PROJECT, REGION, job_name)
@@ -245,3 +261,27 @@ def test_check_notification_job(job_name):
245261
test_topic = "test_topic"
246262
job = create_with_pubsub_notification_job(PROJECT, REGION, job_name, test_topic)
247263
_test_body(job, additional_test=lambda: _check_notification(job, test_topic))
264+
265+
266+
@flaky(max_runs=3, min_passes=1)
def test_check_nfs_job(job_name):
    """End-to-end test: create an NFS-mounting job, then verify its configuration."""
    mount_path = "/mnt/nfs"
    # IP of a pre-existing Filestore instance in the dedicated test project.
    # (Local variable renamed from the misspelled "nfc_ip_address".)
    nfs_ip_address = "10.180.103.74"
    nfs_path = "/vol1"
    # This project hosts the Filestore instance that the job mounts.
    project_with_nfs_filestore = "python-docs-samples-tests"
    job = create_job_with_network_file_system(
        project_with_nfs_filestore,
        "us-central1",
        job_name,
        mount_path,
        nfs_ip_address,
        nfs_path,
    )
    _test_body(
        job,
        additional_test=lambda: _check_nfs_mounting(
            job, mount_path, nfs_ip_address, nfs_path
        ),
        region="us-central1",
        project=project_with_nfs_filestore,
    )

0 commit comments

Comments
 (0)