Skip to content

Commit 18cbad5

Browse files
committed
During job expand, prefetch all entity and feature specs listed in importSpec and fetch their relevant storage spec, then validate
1 parent fb7a27d commit 18cbad5

File tree

15 files changed

+471
-407
lines changed

15 files changed

+471
-407
lines changed

ingestion/src/main/java/feast/ingestion/ImportJob.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,21 @@
2828
import feast.ingestion.config.ImportSpecSupplier;
2929
import feast.ingestion.model.Specs;
3030
import feast.ingestion.options.ImportJobOptions;
31-
import feast.ingestion.transform.*;
31+
import feast.ingestion.transform.ErrorsStoreTransform;
32+
import feast.ingestion.transform.ReadFeaturesTransform;
33+
import feast.ingestion.transform.ServingStoreTransform;
34+
import feast.ingestion.transform.ToFeatureRowExtended;
35+
import feast.ingestion.transform.ValidateTransform;
36+
import feast.ingestion.transform.WarehouseStoreTransform;
3237
import feast.ingestion.transform.fn.ConvertTypesDoFn;
3338
import feast.ingestion.transform.fn.LoggerDoFn;
3439
import feast.ingestion.transform.fn.RoundEventTimestampsDoFn;
3540
import feast.ingestion.values.PFeatureRows;
3641
import feast.specs.ImportSpecProto.ImportSpec;
3742
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
3843
import feast.types.FeatureRowProto.FeatureRow;
44+
import java.util.Arrays;
45+
import java.util.Random;
3946
import lombok.extern.slf4j.Slf4j;
4047
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
4148
import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -60,9 +67,6 @@
6067
import org.joda.time.Duration;
6168
import org.slf4j.event.Level;
6269

63-
import java.util.Arrays;
64-
import java.util.Random;
65-
6670
@Slf4j
6771
public class ImportJob {
6872
private static Random random = new Random(System.currentTimeMillis());
@@ -120,6 +124,13 @@ public static PipelineResult mainWithResult(String[] args) {
120124
return job.run();
121125
}
122126

127+
private static String generateName() {
128+
byte[] bytes = new byte[7];
129+
random.nextBytes(bytes);
130+
String randomHex = DigestUtils.sha1Hex(bytes).substring(0, 7);
131+
return String.format("feast-importjob-%s-%s", DateTime.now().getMillis(), randomHex);
132+
}
133+
123134
public void expand() {
124135
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
125136
coderRegistry.registerCoderForType(
@@ -134,6 +145,8 @@ public void expand() {
134145
// pass
135146
}
136147

148+
specs.validate();
149+
137150
PCollection<FeatureRow> features = pipeline.apply("Read", readFeaturesTransform);
138151
if (options.getLimit() != null && options.getLimit() > 0) {
139152
features = features.apply(Sample.any(options.getLimit()));
@@ -195,13 +208,6 @@ public void logNRows(PFeatureRows pFeatureRows, String name, int limit) {
195208
.apply("Log errors sample", ParDo.of(new LoggerDoFn(Level.ERROR, name + " ERRORS ")));
196209
}
197210

198-
private static String generateName() {
199-
byte[] bytes = new byte[7];
200-
random.nextBytes(bytes);
201-
String randomHex = DigestUtils.sha1Hex(bytes).substring(0, 7);
202-
return String.format("feast-importjob-%s-%s", DateTime.now().getMillis(), randomHex);
203-
}
204-
205211
private String retrieveId(PipelineResult result) {
206212
Class<? extends PipelineRunner<?>> runner = options.getRunner();
207213
if (runner.isAssignableFrom(DataflowRunner.class)) {

ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,11 @@
2020
import com.google.inject.AbstractModule;
2121
import com.google.inject.Provides;
2222
import com.google.inject.Singleton;
23-
import java.util.List;
24-
import org.apache.beam.sdk.options.PipelineOptions;
2523
import feast.ingestion.model.Specs;
26-
import feast.ingestion.model.SpecsImpl;
2724
import feast.ingestion.options.ImportJobOptions;
28-
import feast.ingestion.service.CachedSpecService;
2925
import feast.ingestion.service.CoreSpecService;
3026
import feast.ingestion.service.FileSpecService;
27+
import feast.ingestion.service.SpecService;
3128
import feast.ingestion.service.SpecService.Builder;
3229
import feast.ingestion.service.SpecService.UnsupportedBuilder;
3330
import feast.specs.ImportSpecProto.ImportSpec;
@@ -37,6 +34,8 @@
3734
import feast.storage.service.ErrorsStoreService;
3835
import feast.storage.service.ServingStoreService;
3936
import feast.storage.service.WarehouseStoreService;
37+
import java.util.List;
38+
import org.apache.beam.sdk.options.PipelineOptions;
4039

4140
/** An ImportJobModule is a Guice module for creating dependency injection bindings. */
4241
public class ImportJobModule extends AbstractModule {
@@ -54,23 +53,27 @@ protected void configure() {
5453
bind(ImportJobOptions.class).toInstance(options);
5554
bind(PipelineOptions.class).toInstance(options);
5655
bind(ImportSpec.class).toInstance(importSpec);
57-
bind(Specs.class).to(SpecsImpl.class);
5856
}
5957

6058
@Provides
6159
@Singleton
6260
Builder provideSpecService(ImportJobOptions options) {
6361
if (options.getCoreApiUri() != null) {
64-
return new CachedSpecService.Builder(new CoreSpecService.Builder(options.getCoreApiUri()));
62+
return new CoreSpecService.Builder(options.getCoreApiUri());
6563
} else if (options.getCoreApiSpecPath() != null) {
66-
return new CachedSpecService.Builder(
67-
new FileSpecService.Builder(options.getCoreApiSpecPath()));
64+
return new FileSpecService.Builder(options.getCoreApiSpecPath());
6865
} else {
6966
return new UnsupportedBuilder(
7067
"Cannot initialise spec service as coreApiHost or specPath was not set.");
7168
}
7269
}
7370

71+
@Provides
72+
@Singleton
73+
Specs provideSpecs(SpecService.Builder specService) {
74+
return Specs.of(options.getJobName(), importSpec, specService.build());
75+
}
76+
7477
@Provides
7578
@Singleton
7679
List<WarehouseStore> provideWarehouseStores() {

ingestion/src/main/java/feast/ingestion/model/Specs.java

Lines changed: 112 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,127 @@
1717

1818
package feast.ingestion.model;
1919

20-
import java.io.Serializable;
21-
import java.util.List;
22-
import java.util.Map;
23-
import feast.ingestion.service.SpecRetrievalException;
20+
import com.google.common.base.Preconditions;
21+
import feast.ingestion.service.SpecService;
2422
import feast.specs.EntitySpecProto.EntitySpec;
2523
import feast.specs.FeatureSpecProto.FeatureSpec;
24+
import feast.specs.ImportSpecProto.Field;
2625
import feast.specs.ImportSpecProto.ImportSpec;
2726
import feast.specs.StorageSpecProto.StorageSpec;
27+
import java.io.Serializable;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Map.Entry;
32+
import lombok.Builder;
33+
import lombok.Getter;
34+
35+
@Builder
36+
@Getter
37+
public class Specs implements Serializable {
38+
private String jobName;
39+
private ImportSpec importSpec;
40+
private Map<String, EntitySpec> entitySpecs;
41+
private Map<String, FeatureSpec> featureSpecs;
42+
private Map<String, StorageSpec> storageSpecs;
43+
private transient SpecService specService;
44+
private RuntimeException error;
45+
46+
public static Specs of(String jobName, ImportSpec importSpec, SpecService specService) {
47+
try {
48+
Specs.SpecsBuilder specsBuilder = Specs.builder().jobName(jobName).importSpec(importSpec);
49+
50+
List<Field> fields = importSpec.getSchema().getFieldsList();
51+
List<String> featureIds = new ArrayList<>();
52+
for (Field field : fields) {
53+
if (!field.getFeatureId().isEmpty()) {
54+
featureIds.add(field.getFeatureId());
55+
}
56+
}
57+
specsBuilder.featureSpecs(specService.getFeatureSpecs(featureIds));
58+
59+
List<String> entityNames = importSpec.getEntitiesList();
60+
for (FeatureSpec featureSpec : specsBuilder.featureSpecs.values()) {
61+
Preconditions.checkArgument(
62+
entityNames.contains(featureSpec.getEntity()),
63+
"Feature has entity not listed in import spec featureSpec=" + featureSpec.toString());
64+
}
65+
specsBuilder.entitySpecs(specService.getEntitySpecs(entityNames));
66+
67+
specsBuilder.storageSpecs(specService.getAllStorageSpecs());
68+
69+
return specsBuilder.build();
70+
} catch (RuntimeException e) {
71+
return Specs.builder().error(e).build();
72+
}
73+
}
2874

29-
public interface Specs extends Serializable {
30-
FeatureSpec getFeatureSpec(String featureId);
75+
public void validate() {
76+
if (error != null) {
77+
throw error;
78+
}
3179

32-
List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) throws SpecRetrievalException;
80+
// Sanity checks that our maps are built correctly
81+
for (Entry<String, FeatureSpec> entry : featureSpecs.entrySet()) {
82+
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
83+
}
84+
for (Entry<String, EntitySpec> entry : entitySpecs.entrySet()) {
85+
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getName()));
86+
}
87+
for (Entry<String, StorageSpec> entry : storageSpecs.entrySet()) {
88+
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
89+
}
3390

34-
EntitySpec getEntitySpec(String entityName) throws SpecRetrievalException;
91+
for (FeatureSpec featureSpec : featureSpecs.values()) {
92+
// Check that feature has a matching entity
93+
Preconditions.checkArgument(
94+
entitySpecs.containsKey(featureSpec.getEntity()),
95+
String.format(
96+
"Feature %s references unknown entity %s",
97+
featureSpec.getId(), featureSpec.getEntity()));
98+
// Check that feature has a matching serving store
99+
Preconditions.checkArgument(
100+
storageSpecs.containsKey(featureSpec.getDataStores().getServing().getId()),
101+
String.format(
102+
"Feature %s references unknown serving store %s",
103+
featureSpec.getId(), featureSpec.getDataStores().getServing().getId()));
104+
// Check that feature has a matching warehouse store
105+
Preconditions.checkArgument(
106+
storageSpecs.containsKey(featureSpec.getDataStores().getWarehouse().getId()),
107+
String.format(
108+
"Feature %s references unknown warehouse store %s",
109+
featureSpec.getId(), featureSpec.getDataStores().getWarehouse().getId()));
110+
}
111+
}
35112

36-
ImportSpec getImportSpec() throws SpecRetrievalException;
113+
public EntitySpec getEntitySpec(String entityName) {
114+
Preconditions.checkArgument(
115+
entitySpecs.containsKey(entityName),
116+
String.format("Unknown entity %s, spec was not initialized", entityName));
117+
return entitySpecs.get(entityName);
118+
}
37119

38-
Map<String, StorageSpec> getStorageSpecs() throws SpecRetrievalException;
120+
public FeatureSpec getFeatureSpec(String featureId) {
121+
Preconditions.checkArgument(
122+
featureSpecs.containsKey(featureId),
123+
String.format("Unknown feature %s, spec was not initialized", featureId));
124+
return featureSpecs.get(featureId);
125+
}
39126

40-
StorageSpec getStorageSpec(String storeId);
127+
public List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) {
128+
List<FeatureSpec> out = new ArrayList<>();
129+
for (FeatureSpec featureSpec : featureSpecs.values()) {
130+
if (featureSpec.getDataStores().getServing().getId().equals(storeId)) {
131+
out.add(featureSpec);
132+
}
133+
}
134+
return out;
135+
}
41136

42-
String getJobName();
137+
public StorageSpec getStorageSpec(String storeId) {
138+
Preconditions.checkArgument(
139+
storageSpecs.containsKey(storeId),
140+
String.format("Unknown store %s, spec was not initialized", storeId));
141+
return storageSpecs.get(storeId);
142+
}
43143
}

ingestion/src/main/java/feast/ingestion/model/SpecsImpl.java

Lines changed: 0 additions & 103 deletions
This file was deleted.

0 commit comments

Comments
 (0)