Skip to content

Commit 294faa5

Browse files
author
Oleksii Moskalenko
authored
Dataflow runner options: disk type & streaming engine (#906)
* diskType & streamingEngine * edit infra docs
1 parent f3ab22c commit 294faa5

9 files changed

Lines changed: 37 additions & 15 deletions

File tree

core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class DataflowRunnerConfig extends RunnerConfig {
3232
public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
3333
this.project = runnerConfigOptions.getProject();
3434
this.region = runnerConfigOptions.getRegion();
35-
this.zone = runnerConfigOptions.getZone();
35+
this.workerZone = runnerConfigOptions.getWorkerZone();
3636
this.serviceAccount = runnerConfigOptions.getServiceAccount();
3737
this.network = runnerConfigOptions.getNetwork();
3838
this.subnetwork = runnerConfigOptions.getSubnetwork();
@@ -44,6 +44,8 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
4444
this.deadLetterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
4545
this.diskSizeGb = runnerConfigOptions.getDiskSizeGb();
4646
this.labels = runnerConfigOptions.getLabelsMap();
47+
this.enableStreamingEngine = runnerConfigOptions.getEnableStreamingEngine();
48+
this.workerDiskType = runnerConfigOptions.getWorkerDiskType();
4749
validate();
4850
}
4951

@@ -54,7 +56,7 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
5456
@NotBlank public String region;
5557

5658
/* GCP availability zone for operations. */
57-
@NotBlank public String zone;
59+
@NotBlank public String workerZone;
5860

5961
/* Run the job as a specific service account, instead of the default GCE robot. */
6062
public String serviceAccount;
@@ -91,6 +93,12 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
9193

9294
public Map<String, String> labels;
9395

96+
/* If true job will be run on StreamingEngine instead of VMs */
97+
public Boolean enableStreamingEngine;
98+
99+
/* Type of persistent disk to be used by workers */
100+
public String workerDiskType;
101+
94102
/** Validates Dataflow runner configuration options */
95103
public void validate() {
96104
ValidatorFactory factory = Validation.buildDefaultValidatorFactory();

core/src/main/resources/application.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@ feast:
4242
options:
4343
project: my_gcp_project
4444
region: asia-east1
45-
zone: asia-east1-a
45+
workerZone: asia-east1-a
4646
tempLocation: gs://bucket/tempLocation
4747
network: default
4848
subnetwork: regions/asia-east1/subnetworks/mysubnetwork
4949
maxNumWorkers: 1
50+
enableStreamingEngine: false
51+
workerDiskType: compute.googleapis.com/projects/asia-east1-a/diskTypes/pd-ssd
5052
autoscalingAlgorithm: THROUGHPUT_BASED
5153
usePublicIps: false
5254
workerMachineType: n1-standard-1

core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void setUp() {
8181
Builder optionsBuilder = DataflowRunnerConfigOptions.newBuilder();
8282
optionsBuilder.setProject("project");
8383
optionsBuilder.setRegion("region");
84-
optionsBuilder.setZone("zone");
84+
optionsBuilder.setWorkerZone("zone");
8585
optionsBuilder.setTempLocation("tempLocation");
8686
optionsBuilder.setNetwork("network");
8787
optionsBuilder.setSubnetwork("subnetwork");

core/src/test/java/feast/core/job/dataflow/DataflowRunnerConfigTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
3333
DataflowRunnerConfigOptions.newBuilder()
3434
.setProject("my-project")
3535
.setRegion("asia-east1")
36-
.setZone("asia-east1-a")
36+
.setWorkerZone("asia-east1-a")
37+
.setEnableStreamingEngine(true)
38+
.setWorkerDiskType("pd-ssd")
3739
.setTempLocation("gs://bucket/tempLocation")
3840
.setNetwork("default")
3941
.setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork")
@@ -52,7 +54,7 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
5254
Arrays.asList(
5355
"--project=my-project",
5456
"--region=asia-east1",
55-
"--zone=asia-east1-a",
57+
"--workerZone=asia-east1-a",
5658
"--tempLocation=gs://bucket/tempLocation",
5759
"--network=default",
5860
"--subnetwork=regions/asia-east1/subnetworks/mysubnetwork",
@@ -62,7 +64,9 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
6264
"--workerMachineType=n1-standard-1",
6365
"--deadLetterTableSpec=project_id:dataset_id.table_id",
6466
"--diskSizeGb=100",
65-
"--labels={\"key\":\"value\"}")
67+
"--labels={\"key\":\"value\"}",
68+
"--enableStreamingEngine=true",
69+
"--workerDiskType=pd-ssd")
6670
.toArray(String[]::new);
6771
assertThat(args.size(), equalTo(expectedArgs.length));
6872
assertThat(args, containsInAnyOrder(expectedArgs));
@@ -74,7 +78,7 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException {
7478
DataflowRunnerConfigOptions.newBuilder()
7579
.setProject("my-project")
7680
.setRegion("asia-east1")
77-
.setZone("asia-east1-a")
81+
.setWorkerZone("asia-east1-a")
7882
.setTempLocation("gs://bucket/tempLocation")
7983
.setNetwork("default")
8084
.setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork")
@@ -90,15 +94,16 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException {
9094
Arrays.asList(
9195
"--project=my-project",
9296
"--region=asia-east1",
93-
"--zone=asia-east1-a",
97+
"--workerZone=asia-east1-a",
9498
"--tempLocation=gs://bucket/tempLocation",
9599
"--network=default",
96100
"--subnetwork=regions/asia-east1/subnetworks/mysubnetwork",
97101
"--maxNumWorkers=1",
98102
"--autoscalingAlgorithm=THROUGHPUT_BASED",
99103
"--usePublicIps=false",
100104
"--workerMachineType=n1-standard-1",
101-
"--labels={}")
105+
"--labels={}",
106+
"--enableStreamingEngine=false")
102107
.toArray(String[]::new);
103108
assertThat(args.size(), equalTo(expectedArgs.length));
104109
assertThat(args, containsInAnyOrder(expectedArgs));

infra/charts/feast/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ feast-core:
241241
options:
242242
project: <google_project_id>
243243
region: <dataflow_regional_endpoint e.g. asia-east1>
244-
zone: <google_zone e.g. asia-east1-a>
244+
workerZone: <google_zone e.g. asia-east1-a>
245245
tempLocation: <gcs_path_for_temp_files e.g. gs://bucket/tempLocation>
246246
network: <google_cloud_network_name>
247247
subnetwork: <google_cloud_subnetwork_path e.g. regions/asia-east1/subnetworks/mysubnetwork>

infra/charts/feast/README.md.gotmpl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ feast-core:
214214
options:
215215
project: <google_project_id>
216216
region: <dataflow_regional_endpoint e.g. asia-east1>
217-
zone: <google_zone e.g. asia-east1-a>
217+
workerZone: <google_zone e.g. asia-east1-a>
218218
tempLocation: <gcs_path_for_temp_files e.g. gs://bucket/tempLocation>
219219
network: <google_cloud_network_name>
220220
subnetwork: <google_cloud_subnetwork_path e.g. regions/asia-east1/subnetworks/mysubnetwork>

infra/charts/feast/values-dataflow-runner.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ feast-core:
1919
options:
2020
project: <google_project_id>
2121
region: <dataflow_regional_endpoint e.g. asia-east1>
22-
zone: <google_zone e.g. asia-east1-a>
22+
workerZone: <google_zone e.g. asia-east1-a>
2323
tempLocation: <gcs_path_for_temp_files e.g. gs://bucket/tempLocation>
2424
network: <google_cloud_network_name>
2525
subnetwork: <google_cloud_subnetwork_path e.g. regions/asia-east1/subnetworks/mysubnetwork>
26+
enableStreamingEngine: false
27+
workerDiskType: <disk_type e.g. compute.googleapis.com/projects/asia-east1-a/diskTypes/pd-ssd>
2628
maxNumWorkers: 1
2729
autoscalingAlgorithm: THROUGHPUT_BASED
2830
usePublicIps: false

infra/scripts/test-templates/values-end-to-end-batch-dataflow.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ feast-core:
2727
options:
2828
project: $GCLOUD_PROJECT
2929
region: $GCLOUD_REGION
30-
zone: $GCLOUD_REGION-a
30+
workerZone: $GCLOUD_REGION-a
3131
tempLocation: gs://$TEMP_BUCKET/tempLocation
3232
network: $GCLOUD_NETWORK
3333
subnetwork: regions/$GCLOUD_REGION/subnetworks/$GCLOUD_SUBNET

protos/feast/core/Runner.proto

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ message DataflowRunnerConfigOptions {
4545
string region = 2;
4646

4747
/* GCP availability zone for operations. */
48-
string zone = 3;
48+
string workerZone = 3;
4949

5050
/* Run the job as a specific service account, instead of the default GCE robot. */
5151
string serviceAccount = 4;
@@ -81,4 +81,9 @@ message DataflowRunnerConfigOptions {
8181
/* Disk size to use on each remote Compute Engine worker instance */
8282
int32 diskSizeGb = 14;
8383

84+
/* Run job on Dataflow Streaming Engine instead of creating worker VMs */
85+
bool enableStreamingEngine = 15;
86+
87+
/* Type of persistent disk to be used by workers */
88+
string workerDiskType = 16;
8489
}

0 commit comments

Comments
 (0)