Skip to content

Commit b9e6597

Browse files
authored
Replace Data Source specific format options with DataFormat message (feast-dev#1049)
* Proto: Add DataFormat proto and use when specifying data format/options Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Core: Update DataSource model to support new DataFormat proto Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Python SDK: Update SDK to use new data format API. Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Fix python lint Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Fix Python unit tests referencing old argument `class_path` Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Fix DataSource's from_proto() not converting DataFormat protos to native objects. Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Fix python lint Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Core: Add convenience methods to DataGenerator to create Kinesis source, DataFormat specs Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Core: Add DataSourceValidator to validate data source specs in Feature Tables Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * E2E: Fix specifying data format in FeatureTables in e2e tests Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Rebase on master Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * E2E: Fix typo Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Serving: Fix IT TestUtils not producing Protobuf specs inline with the current API Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Fix java lint Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * E2E: Fix another typo Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Proto: Split DataFormat message into StreamFormat and FileFormat messages Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Core: Update Core to support split StreamFormat and FileFormat messages Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Proto: Split data formats from DataSource.proto into new DataFormat.proto Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Python SDK: Update SDK to support split StreamFormat and FileFormat messages Signed-off-by: 
Zhu Zhanyan <program.nom@gmail.com> * Python SDK: Fix python lint Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Python SDK: Fix missing data_format module due to file being untracked by git Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Python SDK: Fix Lint Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Python SDK: Fix data_format specification format Signed-off-by: Zhu Zhanyan <program.nom@gmail.com> * Python SDK: Fix issue where _source_to_argument pass FileFormat class instead of expected str Signed-off-by: Zhu Zhanyan <program.nom@gmail.com>
1 parent d1e7f39 commit b9e6597

43 files changed

Lines changed: 1212 additions & 313 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

common-test/src/main/java/feast/common/it/DataGenerator.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@
2020
import com.google.common.collect.ImmutableMap;
2121
import com.google.protobuf.Duration;
2222
import com.google.protobuf.Timestamp;
23+
import feast.proto.core.DataFormatProto.FileFormat;
24+
import feast.proto.core.DataFormatProto.FileFormat.ParquetFormat;
25+
import feast.proto.core.DataFormatProto.StreamFormat;
26+
import feast.proto.core.DataFormatProto.StreamFormat.AvroFormat;
27+
import feast.proto.core.DataFormatProto.StreamFormat.ProtoFormat;
2328
import feast.proto.core.DataSourceProto.DataSource;
2429
import feast.proto.core.DataSourceProto.DataSource.BigQueryOptions;
2530
import feast.proto.core.DataSourceProto.DataSource.FileOptions;
2631
import feast.proto.core.DataSourceProto.DataSource.KafkaOptions;
32+
import feast.proto.core.DataSourceProto.DataSource.KinesisOptions;
2733
import feast.proto.core.EntityProto;
2834
import feast.proto.core.FeatureProto;
2935
import feast.proto.core.FeatureProto.FeatureSpecV2;
@@ -266,11 +272,14 @@ public static FeatureTableSpec createFeatureTableSpec(
266272
}
267273

268274
public static DataSource createFileDataSourceSpec(
269-
String fileURL, String fileFormat, String timestampColumn, String datePartitionColumn) {
275+
String fileURL, String timestampColumn, String datePartitionColumn) {
270276
return DataSource.newBuilder()
271277
.setType(DataSource.SourceType.BATCH_FILE)
272278
.setFileOptions(
273-
FileOptions.newBuilder().setFileFormat(fileFormat).setFileUrl(fileURL).build())
279+
FileOptions.newBuilder()
280+
.setFileFormat(createParquetFormat())
281+
.setFileUrl(fileURL)
282+
.build())
274283
.setEventTimestampColumn(timestampColumn)
275284
.setDatePartitionColumn(datePartitionColumn)
276285
.build();
@@ -294,7 +303,7 @@ public static DataSource createKafkaDataSourceSpec(
294303
KafkaOptions.newBuilder()
295304
.setTopic(topic)
296305
.setBootstrapServers(servers)
297-
.setClassPath(classPath)
306+
.setMessageFormat(createProtoFormat("class.path"))
298307
.build())
299308
.setEventTimestampColumn(timestampColumn)
300309
.build();
@@ -327,4 +336,34 @@ public static ServingAPIProto.GetOnlineFeaturesRequestV2.EntityRow createEntityR
327336
.putFields(entityName, entityValue)
328337
.build();
329338
}
339+
340+
public static DataSource createKinesisDataSourceSpec(
341+
String region, String streamName, String classPath, String timestampColumn) {
342+
return DataSource.newBuilder()
343+
.setType(DataSource.SourceType.STREAM_KINESIS)
344+
.setKinesisOptions(
345+
KinesisOptions.newBuilder()
346+
.setRegion("ap-nowhere1")
347+
.setStreamName("stream")
348+
.setRecordFormat(createProtoFormat(classPath))
349+
.build())
350+
.setEventTimestampColumn(timestampColumn)
351+
.build();
352+
}
353+
354+
public static FileFormat createParquetFormat() {
355+
return FileFormat.newBuilder().setParquetFormat(ParquetFormat.getDefaultInstance()).build();
356+
}
357+
358+
public static StreamFormat createAvroFormat(String schemaJSON) {
359+
return StreamFormat.newBuilder()
360+
.setAvroFormat(AvroFormat.newBuilder().setSchemaJson(schemaJSON).build())
361+
.build();
362+
}
363+
364+
public static StreamFormat createProtoFormat(String classPath) {
365+
return StreamFormat.newBuilder()
366+
.setProtoFormat(ProtoFormat.newBuilder().setClassPath(classPath).build())
367+
.build();
368+
}
330369
}

core/src/main/java/feast/core/model/DataSource.java

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616
*/
1717
package feast.core.model;
1818

19-
import static feast.proto.core.DataSourceProto.DataSource.SourceType.*;
20-
19+
import com.google.protobuf.InvalidProtocolBufferException;
20+
import com.google.protobuf.Message;
21+
import com.google.protobuf.MessageOrBuilder;
22+
import com.google.protobuf.util.JsonFormat;
2123
import feast.core.util.TypeConversion;
24+
import feast.proto.core.DataFormatProto.FileFormat;
25+
import feast.proto.core.DataFormatProto.StreamFormat;
2226
import feast.proto.core.DataSourceProto;
2327
import feast.proto.core.DataSourceProto.DataSource.BigQueryOptions;
2428
import feast.proto.core.DataSourceProto.DataSource.FileOptions;
@@ -91,20 +95,23 @@ public static DataSource fromProto(DataSourceProto.DataSource spec) {
9195
switch (spec.getType()) {
9296
case BATCH_FILE:
9397
dataSourceConfigMap.put("file_url", spec.getFileOptions().getFileUrl());
94-
dataSourceConfigMap.put("file_format", spec.getFileOptions().getFileFormat());
98+
dataSourceConfigMap.put("file_format", printJSON(spec.getFileOptions().getFileFormat()));
9599
break;
96100
case BATCH_BIGQUERY:
97101
dataSourceConfigMap.put("table_ref", spec.getBigqueryOptions().getTableRef());
98102
break;
99103
case STREAM_KAFKA:
100104
dataSourceConfigMap.put("bootstrap_servers", spec.getKafkaOptions().getBootstrapServers());
101-
dataSourceConfigMap.put("class_path", spec.getKafkaOptions().getClassPath());
105+
dataSourceConfigMap.put(
106+
"message_format", printJSON(spec.getKafkaOptions().getMessageFormat()));
102107
dataSourceConfigMap.put("topic", spec.getKafkaOptions().getTopic());
103108
break;
104109
case STREAM_KINESIS:
105-
dataSourceConfigMap.put("class_path", spec.getKinesisOptions().getClassPath());
110+
dataSourceConfigMap.put(
111+
"record_format", printJSON(spec.getKinesisOptions().getRecordFormat()));
106112
dataSourceConfigMap.put("region", spec.getKinesisOptions().getRegion());
107113
dataSourceConfigMap.put("stream_name", spec.getKinesisOptions().getStreamName());
114+
108115
break;
109116
default:
110117
throw new UnsupportedOperationException(
@@ -137,7 +144,11 @@ public DataSourceProto.DataSource toProto() {
137144
case BATCH_FILE:
138145
FileOptions.Builder fileOptions = FileOptions.newBuilder();
139146
fileOptions.setFileUrl(dataSourceConfigMap.get("file_url"));
140-
fileOptions.setFileFormat(dataSourceConfigMap.get("file_format"));
147+
148+
FileFormat.Builder fileFormat = FileFormat.newBuilder();
149+
parseMessage(dataSourceConfigMap.get("file_format"), fileFormat);
150+
fileOptions.setFileFormat(fileFormat.build());
151+
141152
spec.setFileOptions(fileOptions.build());
142153
break;
143154
case BATCH_BIGQUERY:
@@ -148,15 +159,23 @@ public DataSourceProto.DataSource toProto() {
148159
case STREAM_KAFKA:
149160
KafkaOptions.Builder kafkaOptions = KafkaOptions.newBuilder();
150161
kafkaOptions.setBootstrapServers(dataSourceConfigMap.get("bootstrap_servers"));
151-
kafkaOptions.setClassPath(dataSourceConfigMap.get("class_path"));
152162
kafkaOptions.setTopic(dataSourceConfigMap.get("topic"));
163+
164+
StreamFormat.Builder messageFormat = StreamFormat.newBuilder();
165+
parseMessage(dataSourceConfigMap.get("message_format"), messageFormat);
166+
kafkaOptions.setMessageFormat(messageFormat.build());
167+
153168
spec.setKafkaOptions(kafkaOptions.build());
154169
break;
155170
case STREAM_KINESIS:
156171
KinesisOptions.Builder kinesisOptions = KinesisOptions.newBuilder();
157-
kinesisOptions.setClassPath(dataSourceConfigMap.get("class_path"));
158172
kinesisOptions.setRegion(dataSourceConfigMap.get("region"));
159173
kinesisOptions.setStreamName(dataSourceConfigMap.get("stream_name"));
174+
175+
StreamFormat.Builder recordFormat = StreamFormat.newBuilder();
176+
parseMessage(dataSourceConfigMap.get("record_format"), recordFormat);
177+
kinesisOptions.setRecordFormat(recordFormat.build());
178+
160179
spec.setKinesisOptions(kinesisOptions.build());
161180
break;
162181
default:
@@ -194,4 +213,22 @@ public boolean equals(Object o) {
194213
DataSource other = (DataSource) o;
195214
return this.toProto().equals(other.toProto());
196215
}
216+
217+
/** Print the given Message into its JSON string representation */
218+
private static String printJSON(MessageOrBuilder message) {
219+
try {
220+
return JsonFormat.printer().print(message);
221+
} catch (InvalidProtocolBufferException e) {
222+
throw new RuntimeException("Unexpected exception convering Proto to JSON", e);
223+
}
224+
}
225+
226+
/** Parse the given Message in JSON representation into the given Message Builder */
227+
private static void parseMessage(String json, Message.Builder message) {
228+
try {
229+
JsonFormat.parser().merge(json, message);
230+
} catch (InvalidProtocolBufferException e) {
231+
throw new RuntimeException("Unexpected exception convering JSON to Proto", e);
232+
}
233+
}
197234
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2020 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.core.validators;
18+
19+
import static feast.core.validators.Matchers.*;
20+
import static feast.proto.core.DataSourceProto.DataSource.SourceType.*;
21+
22+
import feast.proto.core.DataFormatProto.FileFormat;
23+
import feast.proto.core.DataFormatProto.StreamFormat;
24+
import feast.proto.core.DataSourceProto.DataSource;
25+
26+
public class DataSourceValidator {
27+
/** Validate if the given DataSource protobuf spec is valid. */
28+
public static void validate(DataSource spec) {
29+
switch (spec.getType()) {
30+
case BATCH_FILE:
31+
FileFormat.FormatCase fileFormat = spec.getFileOptions().getFileFormat().getFormatCase();
32+
switch (fileFormat) {
33+
case PARQUET_FORMAT:
34+
break;
35+
default:
36+
throw new UnsupportedOperationException(
37+
String.format("Unsupported File Format: %s", fileFormat));
38+
}
39+
break;
40+
41+
case BATCH_BIGQUERY:
42+
checkValidBigQueryTableRef(spec.getBigqueryOptions().getTableRef(), "FeatureTable");
43+
break;
44+
45+
case STREAM_KAFKA:
46+
StreamFormat.FormatCase messageFormat =
47+
spec.getKafkaOptions().getMessageFormat().getFormatCase();
48+
switch (messageFormat) {
49+
case PROTO_FORMAT:
50+
checkValidClassPath(
51+
spec.getKafkaOptions().getMessageFormat().getProtoFormat().getClassPath(),
52+
"FeatureTable");
53+
break;
54+
default:
55+
throw new UnsupportedOperationException(
56+
String.format(
57+
"Unsupported Stream Format for Kafka Source Type: %s", messageFormat));
58+
}
59+
break;
60+
61+
case STREAM_KINESIS:
62+
// Verify tht DataFormat is supported by kinesis data source
63+
StreamFormat.FormatCase recordFormat =
64+
spec.getKinesisOptions().getRecordFormat().getFormatCase();
65+
switch (recordFormat) {
66+
case PROTO_FORMAT:
67+
checkValidClassPath(
68+
spec.getKinesisOptions().getRecordFormat().getProtoFormat().getClassPath(),
69+
"FeatureTable");
70+
break;
71+
default:
72+
throw new UnsupportedOperationException(
73+
String.format("Unsupported Stream Format for Kafka Source Type: %s", recordFormat));
74+
}
75+
break;
76+
default:
77+
throw new UnsupportedOperationException(
78+
String.format("Unsupported Feature Store Type: %s", spec.getType()));
79+
}
80+
}
81+
}

core/src/main/java/feast/core/validators/FeatureTableValidator.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static feast.core.validators.Matchers.*;
2020

21+
import feast.proto.core.DataSourceProto.DataSource.SourceType;
2122
import feast.proto.core.FeatureProto.FeatureSpecV2;
2223
import feast.proto.core.FeatureTableProto.FeatureTableSpec;
2324
import java.util.ArrayList;
@@ -49,12 +50,6 @@ public static void validateSpec(FeatureTableSpec spec) {
4950
checkValidCharacters(spec.getName(), "FeatureTable");
5051
spec.getFeaturesList().forEach(FeatureTableValidator::validateFeatureSpec);
5152

52-
// Check that BigQuery reference defined for BigQuery source is valid
53-
if (!spec.getBatchSource().getBigqueryOptions().getTableRef().isEmpty()) {
54-
checkValidBigQueryTableRef(
55-
spec.getBatchSource().getBigqueryOptions().getTableRef(), "FeatureTable");
56-
}
57-
5853
// Check that features and entities defined in FeatureTable do not use reserved names
5954
ArrayList<String> fieldNames = new ArrayList<>(spec.getEntitiesList());
6055
fieldNames.addAll(
@@ -70,6 +65,14 @@ public static void validateSpec(FeatureTableSpec spec) {
7065
throw new IllegalArgumentException(
7166
String.format("Entity and Feature names within a Feature Table should be unique."));
7267
}
68+
69+
// Check that the data sources defined in the feature table are valid
70+
if (!spec.getBatchSource().getType().equals(SourceType.INVALID)) {
71+
DataSourceValidator.validate(spec.getBatchSource());
72+
}
73+
if (!spec.getStreamSource().getType().equals(SourceType.INVALID)) {
74+
DataSourceValidator.validate(spec.getStreamSource());
75+
}
7376
}
7477

7578
private static void validateFeatureSpec(FeatureSpecV2 spec) {

core/src/main/java/feast/core/validators/Matchers.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public class Matchers {
2424

2525
private static Pattern BIGQUERY_TABLE_REF_REGEX =
2626
Pattern.compile("[a-zA-Z0-9-]+[:]+[a-zA-Z0-9_]+[.]+[a-zA-Z0-9_]*");
27+
private static Pattern CLASS_PATH_REGEX =
28+
Pattern.compile("[a-zA-Z_$][a-zA-Z0-9_$]*(\\.[a-zA-Z_$][a-zA-Z0-9_$]*)");
2729
private static Pattern UPPER_SNAKE_CASE_REGEX = Pattern.compile("^[A-Z0-9]+(_[A-Z0-9]+)*$");
2830
private static Pattern LOWER_SNAKE_CASE_REGEX = Pattern.compile("^[a-z0-9]+(_[a-z0-9]+)*$");
2931
private static Pattern VALID_CHARACTERS_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$");
@@ -92,6 +94,14 @@ public static void checkValidBigQueryTableRef(String input, String resource)
9294
}
9395
}
9496

97+
public static void checkValidClassPath(String input, String resource) {
98+
if (!CLASS_PATH_REGEX.matcher(input).matches()) {
99+
throw new IllegalArgumentException(
100+
String.format(
101+
ERROR_MESSAGE_TEMPLATE, resource, input, "argument must be a valid Java Classpath"));
102+
}
103+
}
104+
95105
public static boolean hasDuplicates(Collection<String> strings) {
96106
return (new HashSet<>(strings)).size() < strings.size();
97107
}

core/src/test/java/feast/core/model/DataSourceTest.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,11 @@
1616
*/
1717
package feast.core.model;
1818

19-
import static feast.proto.core.DataSourceProto.DataSource.SourceType.*;
2019
import static org.hamcrest.MatcherAssert.assertThat;
2120
import static org.hamcrest.core.IsEqual.equalTo;
2221

2322
import feast.common.it.DataGenerator;
2423
import feast.proto.core.DataSourceProto;
25-
import feast.proto.core.DataSourceProto.DataSource.BigQueryOptions;
26-
import feast.proto.core.DataSourceProto.DataSource.KinesisOptions;
2724
import java.util.List;
2825
import java.util.Map;
2926
import org.junit.Test;
@@ -55,21 +52,9 @@ public void shouldFromProtoBeReversableWithToProto() {
5552

5653
private List<DataSourceProto.DataSource> getTestSpecs() {
5754
return List.of(
58-
DataGenerator.createFileDataSourceSpec("file:///path/to/file", "parquet", "ts_col", ""),
55+
DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", ""),
5956
DataGenerator.createKafkaDataSourceSpec("localhost:9092", "topic", "class.path", "ts_col"),
60-
DataSourceProto.DataSource.newBuilder()
61-
.setType(BATCH_BIGQUERY)
62-
.setBigqueryOptions(
63-
BigQueryOptions.newBuilder().setTableRef("project:dataset.table").build())
64-
.build(),
65-
DataSourceProto.DataSource.newBuilder()
66-
.setType(STREAM_KINESIS)
67-
.setKinesisOptions(
68-
KinesisOptions.newBuilder()
69-
.setRegion("ap-nowhere1")
70-
.setStreamName("stream")
71-
.setClassPath("class.path")
72-
.build())
73-
.build());
57+
DataGenerator.createBigQueryDataSourceSpec("project:dataset.table", "ts_col", "dt_col"),
58+
DataGenerator.createKinesisDataSourceSpec("ap-nowhere1", "stream", "class.path", "ts_col"));
7459
}
7560
}

0 commit comments

Comments
 (0)