Skip to content

Commit 5a6edc4

Browse files
timsfeast-ci-bot
authored and committed
Coalesce rows (feast-dev#89)
* ptransform to coalesce feature rows in batch and streaming * add coalesce feature rows transform, to build state of latest row per key in the global window * coalesce for serving on entity name and key only, preparing for removing history from serving stores * change to rounding granularities after feature row coalesce * rename importSpec.options to be importspec.sourceOptions and add jobOptions * only write out the latest row for bigtable and remove postgres feature store * do not write historical values to redis serving store * formatting and fix test * core importspec validator test * fix typo on option name * use Timestamps.compare * remove redudant proto message * default coalescerows to true * Update go protos and cli tests * Update python sdk for new import spec
1 parent ca2ee77 commit 5a6edc4

57 files changed

Lines changed: 2218 additions & 1100 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

cli/feast/pkg/parse/yaml.go

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@ package parse
1616

1717
import (
1818
"encoding/json"
19-
"fmt"
2019
"time"
2120

2221
"github.com/golang/protobuf/ptypes/timestamp"
@@ -98,11 +97,6 @@ func YamlToImportSpec(in []byte) (*specs.ImportSpec, error) {
9897
return nil, err
9998
}
10099

101-
// schema must be available for 'file' or 'bigquery'
102-
if (ymlMap["type"] == "file" || ymlMap["type"] == "bigquery") && ymlMap["schema"] == nil {
103-
return nil, fmt.Errorf("Schema must be specified for importing data from file or BigQuery")
104-
}
105-
106100
// either timestampValue or timestampColumn
107101
var timestampValue *timestamp.Timestamp
108102
var timestampColumn string

cli/feast/pkg/parse/yaml_test.go

Lines changed: 7 additions & 48 deletions
Original file line number | Diff line number | Diff line change
@@ -243,7 +243,9 @@ func TestYamlToImportSpec(t *testing.T) {
243243
{
244244
name: "valid yaml",
245245
input: []byte(`type: file
246-
options:
246+
jobOptions:
247+
coalesceRows.enabled: "true"
248+
sourceOptions:
247249
format: csv
248250
path: jaeger_last_opportunity_sample.csv
249251
entities:
@@ -258,7 +260,10 @@ schema:
258260
featureId: driver.none.last_opportunity`),
259261
expected: &specs.ImportSpec{
260262
Type: "file",
261-
Options: map[string]string{
263+
JobOptions: map[string]string{
264+
"coalesceRows.enabled": "true",
265+
},
266+
SourceOptions: map[string]string{
262267
"format": "csv",
263268
"path": "jaeger_last_opportunity_sample.csv",
264269
},
@@ -299,49 +304,3 @@ schema:
299304
})
300305
}
301306
}
302-
303-
func TestYamlToImportSpecNoSchema(t *testing.T) {
304-
tt := []struct {
305-
name string
306-
input []byte
307-
expected *specs.ImportSpec
308-
err error
309-
}{
310-
{
311-
name: "valid yaml",
312-
input: []byte(`type: pubsub
313-
options:
314-
topic: projects/your-gcp-project/topics/feast-test
315-
entities:
316-
- testentity`),
317-
expected: &specs.ImportSpec{
318-
Type: "pubsub",
319-
Options: map[string]string{
320-
"topic": "projects/your-gcp-project/topics/feast-test",
321-
},
322-
Entities: []string{"testentity"},
323-
},
324-
err: nil,
325-
},
326-
}
327-
328-
for _, tc := range tt {
329-
t.Run(tc.name, func(t *testing.T) {
330-
spec, err := YamlToImportSpec(tc.input)
331-
if tc.err == nil {
332-
if err != nil {
333-
t.Error(err)
334-
} else if !cmp.Equal(spec, tc.expected) {
335-
t.Errorf("Expected %s, got %s", tc.expected, spec)
336-
}
337-
} else {
338-
// we expect an error
339-
if err == nil {
340-
t.Error(err)
341-
} else if err.Error() != tc.err.Error() {
342-
t.Errorf("Expected error %v, got %v", err.Error(), tc.err.Error())
343-
}
344-
}
345-
})
346-
}
347-
}

core/src/main/java/feast/core/model/JobInfo.java

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -55,8 +55,12 @@ public class JobInfo extends AbstractTimestampEntity {
5555
private String runner;
5656

5757
// Job options. Stored as a json string as it is specific to the runner.
58-
@Column(name = "options")
59-
private String options;
58+
@Column(name = "source_options")
59+
private String sourceOptions;
60+
61+
// Job options. Stored as a json string as it is specific to the runner.
62+
@Column(name = "job_options")
63+
private String jobOptions;
6064

6165
// Entities populated by the job
6266
@ManyToMany
@@ -99,7 +103,8 @@ public JobInfo(
99103
this.extId = extId;
100104
this.type = importSpec.getType();
101105
this.runner = runner;
102-
this.options = TypeConversion.convertMapToJsonString(importSpec.getOptionsMap());
106+
this.sourceOptions = TypeConversion.convertMapToJsonString(importSpec.getSourceOptionsMap());
107+
this.jobOptions = TypeConversion.convertMapToJsonString(importSpec.getJobOptionsMap());
103108
this.entities = new ArrayList<>();
104109
for (String entity : importSpec.getEntitiesList()) {
105110
EntityInfo entityInfo = new EntityInfo();

core/src/main/java/feast/core/validators/SpecValidator.java

Lines changed: 26 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import static com.google.common.base.Preconditions.checkNotNull;
2222
import static feast.core.validators.Matchers.checkLowerSnakeCase;
2323

24+
import com.google.common.base.Preconditions;
2425
import com.google.common.base.Strings;
2526
import com.google.common.collect.Lists;
2627
import feast.core.dao.EntityInfoRepository;
@@ -40,6 +41,8 @@
4041
import feast.specs.ImportSpecProto.ImportSpec;
4142
import feast.specs.StorageSpecProto.StorageSpec;
4243
import java.util.Arrays;
44+
import java.util.List;
45+
import java.util.Map;
4346
import java.util.Optional;
4447
import java.util.stream.Collectors;
4548
import java.util.stream.Stream;
@@ -256,6 +259,19 @@ public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException
256259
entityInfoRepository.existsById(name),
257260
Strings.lenientFormat("Entity %s not registered", name));
258261
}
262+
Map<String, String> jobOptions = spec.getJobOptionsMap();
263+
if (jobOptions.size() > 0) {
264+
List<String> opts = Lists.newArrayList(
265+
"sample.limit",
266+
"coalesceRows.enabled",
267+
"coalesceRows.delaySeconds",
268+
"coalesceRows.timeoutSeconds"
269+
);
270+
for (String key : jobOptions.keySet()) {
271+
Preconditions.checkArgument(opts.contains(key),
272+
Strings.lenientFormat("Option %s is not a valid jobOption", key));
273+
}
274+
}
259275
} catch (NullPointerException | IllegalArgumentException e) {
260276
throw new IllegalArgumentException(
261277
Strings.lenientFormat("Validation for import spec failed: %s", e.getMessage()));
@@ -264,8 +280,8 @@ public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException
264280

265281
private void checkKafkaImportSpecOption(ImportSpec spec) {
266282
try {
267-
String topics = spec.getOptionsOrDefault("topics", "");
268-
String server = spec.getOptionsOrDefault("server", "");
283+
String topics = spec.getSourceOptionsOrDefault("topics", "");
284+
String server = spec.getSourceOptionsOrDefault("server", "");
269285
if (topics.equals("") && server.equals("")) {
270286
throw new IllegalArgumentException(
271287
"Kafka ingestion requires either topics or servers");
@@ -278,7 +294,8 @@ private void checkKafkaImportSpecOption(ImportSpec spec) {
278294

279295
private void checkFileImportSpecOption(ImportSpec spec) throws IllegalArgumentException {
280296
try {
281-
checkArgument(!spec.getOptionsOrDefault("path", "").equals(""), "File path cannot be empty");
297+
checkArgument(!spec.getSourceOptionsOrDefault("path", "").equals(""),
298+
"File path cannot be empty");
282299
} catch (NullPointerException | IllegalArgumentException e) {
283300
throw new IllegalArgumentException(
284301
Strings.lenientFormat("Invalid options: %s", e.getMessage()));
@@ -287,8 +304,8 @@ private void checkFileImportSpecOption(ImportSpec spec) throws IllegalArgumentEx
287304

288305
private void checkPubSubImportSpecOption(ImportSpec spec) throws IllegalArgumentException {
289306
try {
290-
String topic = spec.getOptionsOrDefault("topic", "");
291-
String subscription = spec.getOptionsOrDefault("subscription", "");
307+
String topic = spec.getSourceOptionsOrDefault("topic", "");
308+
String subscription = spec.getSourceOptionsOrDefault("subscription", "");
292309
if (topic.equals("") && subscription.equals("")) {
293310
throw new IllegalArgumentException(
294311
"Pubsub ingestion requires either topic or subscription");
@@ -301,11 +318,12 @@ private void checkPubSubImportSpecOption(ImportSpec spec) throws IllegalArgument
301318

302319
private void checkBigqueryImportSpecOption(ImportSpec spec) throws IllegalArgumentException {
303320
try {
304-
checkArgument(!spec.getOptionsOrThrow("project").equals(""),
321+
checkArgument(!spec.getSourceOptionsOrThrow("project").equals(""),
305322
"Bigquery project cannot be empty");
306-
checkArgument(!spec.getOptionsOrThrow("dataset").equals(""),
323+
checkArgument(!spec.getSourceOptionsOrThrow("dataset").equals(""),
307324
"Bigquery dataset cannot be empty");
308-
checkArgument(!spec.getOptionsOrThrow("table").equals(""), "Bigquery table cannot be empty");
325+
checkArgument(!spec.getSourceOptionsOrThrow("table").equals(""),
326+
"Bigquery table cannot be empty");
309327
} catch (NullPointerException | IllegalArgumentException e) {
310328
throw new IllegalArgumentException(
311329
Strings.lenientFormat("Invalid options: %s", e.getMessage()));

core/src/test/java/feast/core/job/ScheduledJobMonitorTest.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -66,6 +66,7 @@ public void getJobStatus_shouldUpdateJobInfoForRunningJob() {
6666
"Streaming",
6767
"DataflowRunner",
6868
"",
69+
"",
6970
Collections.emptyList(),
7071
Collections.emptyList(),
7172
Collections.emptyList(),
@@ -104,6 +105,7 @@ public void getJobMetrics_shouldPushToStatsDMetricPusherAndSaveNewMetricToDb() {
104105
"Streaming",
105106
"DataflowRunner",
106107
"",
108+
"",
107109
Collections.emptyList(),
108110
Collections.emptyList(),
109111
Collections.emptyList(),

core/src/test/java/feast/core/model/JobInfoTest.java

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -41,9 +41,8 @@ public void shouldInitialiseGivenJobIdAndSpec() throws InvalidProtocolBufferExce
4141
.build();
4242

4343
ImportSpecProto.ImportSpec importSpec = ImportSpecProto.ImportSpec.newBuilder()
44-
.setType("file")
45-
.putOptions("format", "csv")
46-
.putOptions("path", "gs://some/path")
44+
.setType("file.csv")
45+
.putSourceOptions("path", "gs://some/path")
4746
.addEntities("entity")
4847
.setSchema(schema)
4948
.build();
@@ -52,9 +51,9 @@ public void shouldInitialiseGivenJobIdAndSpec() throws InvalidProtocolBufferExce
5251
JobInfo expected = new JobInfo();
5352
expected.setId("fake-job-id");
5453
expected.setExtId("fake-ext-id");
55-
expected.setType("file");
54+
expected.setType("file.csv");
5655
expected.setRunner("DataflowRunner");
57-
expected.setOptions(TypeConversion.convertMapToJsonString(importSpec.getOptionsMap()));
56+
expected.setSourceOptions(TypeConversion.convertMapToJsonString(importSpec.getSourceOptionsMap()));
5857

5958
List<EntityInfo> entities = new ArrayList<>();
6059
EntityInfo entityInfo = new EntityInfo();
@@ -75,7 +74,7 @@ public void shouldInitialiseGivenJobIdAndSpec() throws InvalidProtocolBufferExce
7574
assertThat(actual.getRunner(), equalTo(expected.getRunner()));
7675
assertThat(actual.getEntities(), equalTo(expected.getEntities()));
7776
assertThat(actual.getFeatures(), equalTo(expected.getFeatures()));
78-
assertThat(actual.getOptions(), equalTo(expected.getOptions()));
77+
assertThat(actual.getSourceOptions(), equalTo(expected.getSourceOptions()));
7978
assertThat(actual.getRaw(), equalTo(expected.getRaw()));
8079
}
8180
}

core/src/test/java/feast/core/service/JobManagementServiceTest.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -70,6 +70,7 @@ public void shouldListAllJobDetails() {
7070
"",
7171
"",
7272
"",
73+
"",
7374
Collections.emptyList(),
7475
Collections.emptyList(),
7576
Collections.emptyList(),
@@ -84,6 +85,7 @@ public void shouldListAllJobDetails() {
8485
"",
8586
"",
8687
"",
88+
"",
8789
Collections.emptyList(),
8890
Collections.emptyList(),
8991
Collections.emptyList(),
@@ -121,6 +123,7 @@ public void shouldReturnDetailOfRequestedJobId() {
121123
"",
122124
"",
123125
"",
126+
"",
124127
Collections.emptyList(),
125128
Collections.emptyList(),
126129
Collections.emptyList(),

core/src/test/java/feast/core/validators/SpecValidatorTest.java

Lines changed: 63 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -709,7 +709,7 @@ public void fileImportSpecWithoutEntityIdColumnInSchemaShouldThrowIllegalArgumen
709709
ImportSpec input =
710710
ImportSpec.newBuilder()
711711
.setType("file.csv")
712-
.putOptions("path", "gs://asdasd")
712+
.putSourceOptions("path", "gs://asdasd")
713713
.build();
714714
exception.expect(IllegalArgumentException.class);
715715
exception.expectMessage(
@@ -728,9 +728,9 @@ public void bigQueryImportSpecWithoutEntityIdColumnInSchemaShouldThrowIllegalArg
728728
ImportSpec input =
729729
ImportSpec.newBuilder()
730730
.setType("bigquery")
731-
.putOptions("project", "my-google-project")
732-
.putOptions("dataset", "feast")
733-
.putOptions("table", "feast")
731+
.putSourceOptions("project", "my-google-project")
732+
.putSourceOptions("dataset", "feast")
733+
.putSourceOptions("table", "feast")
734734
.build();
735735
exception.expect(IllegalArgumentException.class);
736736
exception.expectMessage(
@@ -750,7 +750,7 @@ public void importSpecWithoutValidEntityShouldThrowIllegalArgumentException() {
750750
ImportSpec input =
751751
ImportSpec.newBuilder()
752752
.setType("pubsub")
753-
.putOptions("topic", "my/pubsub/topic")
753+
.putSourceOptions("topic", "my/pubsub/topic")
754754
.addEntities("someEntity")
755755
.build();
756756
exception.expect(IllegalArgumentException.class);
@@ -775,7 +775,7 @@ public void importSpecWithUnregisteredFeaturesShouldThrowIllegalArgumentExceptio
775775
ImportSpec input =
776776
ImportSpec.newBuilder()
777777
.setType("pubsub")
778-
.putOptions("topic", "my/pubsub/topic")
778+
.putSourceOptions("topic", "my/pubsub/topic")
779779
.setSchema(schema)
780780
.addEntities("someEntity")
781781
.build();
@@ -802,8 +802,63 @@ public void importSpecWithKafkaSourceAndCorrectOptionsShouldPassValidation() {
802802
ImportSpec input =
803803
ImportSpec.newBuilder()
804804
.setType("kafka")
805-
.putOptions("topics", "my-kafka-topic")
806-
.putOptions("server", "localhost:54321")
805+
.putSourceOptions("topics", "my-kafka-topic")
806+
.putSourceOptions("server", "localhost:54321")
807+
.setSchema(schema)
808+
.addEntities("someEntity")
809+
.build();
810+
validator.validateImportSpec(input);
811+
}
812+
813+
@Test
814+
public void importSpecWithCoalesceJobOptionsShouldPassValidation() {
815+
SpecValidator validator =
816+
new SpecValidator(
817+
storageInfoRepository,
818+
entityInfoRepository,
819+
featureGroupInfoRepository,
820+
featureInfoRepository);
821+
when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true);
822+
when(entityInfoRepository.existsById("someEntity")).thenReturn(true);
823+
Schema schema =
824+
Schema.newBuilder()
825+
.addFields(Field.newBuilder().setFeatureId("some_existing_feature").build())
826+
.build();
827+
ImportSpec input =
828+
ImportSpec.newBuilder()
829+
.setType("kafka")
830+
.putSourceOptions("topics", "my-kafka-topic")
831+
.putSourceOptions("server", "localhost:54321")
832+
.putJobOptions("coalesceRows.enabled", "true")
833+
.putJobOptions("coalesceRows.delaySeconds", "10000")
834+
.putJobOptions("coalesceRows.timeoutSeconds", "20000")
835+
.putJobOptions("sample.limit", "1000")
836+
.setSchema(schema)
837+
.addEntities("someEntity")
838+
.build();
839+
validator.validateImportSpec(input);
840+
}
841+
842+
@Test
843+
public void importSpecWithLimitJobOptionsShouldPassValidation() {
844+
SpecValidator validator =
845+
new SpecValidator(
846+
storageInfoRepository,
847+
entityInfoRepository,
848+
featureGroupInfoRepository,
849+
featureInfoRepository);
850+
when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true);
851+
when(entityInfoRepository.existsById("someEntity")).thenReturn(true);
852+
Schema schema =
853+
Schema.newBuilder()
854+
.addFields(Field.newBuilder().setFeatureId("some_existing_feature").build())
855+
.build();
856+
ImportSpec input =
857+
ImportSpec.newBuilder()
858+
.setType("kafka")
859+
.putSourceOptions("topics", "my-kafka-topic")
860+
.putSourceOptions("server", "localhost:54321")
861+
.putJobOptions("sample.limit", "1000")
807862
.setSchema(schema)
808863
.addEntities("someEntity")
809864
.build();

0 commit comments

Comments (0)