Skip to content

Commit e4f8fe9

Browse files
author
Chen Zhiling
authored
Add general storage API and refactor existing store implementations (#567)
* Add storage interfaces, basic file structure (#529)
* Add storage interfaces, basic file structure
* Apply spotless, add comments
* Move parseResponse and isEmpty to response object
* Make changes to write interface to be more beam-like
* Pass feature specs to the retriever
* Pass feature specs to online retriever
* Add FeatureSetRequest
* Add mistakenly removed TestUtil
* Add mistakenly removed TestUtil
* Add BigQuery storage (#546)
* Add Redis storage implementation (#547)
* Add Redis storage
* Remove staleness check; can be checked at the service level
* Remove staleness related tests
* Add dependencies to top level pom
* Clean up code
* Change serving and ingestion to use storage API (#553)
* Change serving and ingestion to use storage API
* Remove extra exclusion clause
* Storage refactor API and docstring tweaks (#569)
* API and docstring tweaks
* Fix javadoc linting errors
* Apply spotless
* Fix javadoc formatting
* Drop result from HistoricalRetrievalResult constructors
* Change pipeline to use DeadletterSink API (#586)
* Add better code docs to storage refactor (#601)
* Add better code documentation, make GetFeastServingInfo independent of retriever
* Make getStagingLocation method of historical retriever
* Apply spotless
* Clean up dependencies, remove exclusions at serving (#607)
* Clean up OnlineServingService code (#605)
* Clean up OnlineServingService code to be more readable
* Revert Metrics
* Rename storage API packages to nouns
1 parent 9139fe3 commit e4f8fe9

71 files changed

Lines changed: 3711 additions & 2711 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

ingestion/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,24 @@
101101
<version>${project.version}</version>
102102
</dependency>
103103

104+
<dependency>
105+
<groupId>dev.feast</groupId>
106+
<artifactId>feast-storage-api</artifactId>
107+
<version>${project.version}</version>
108+
</dependency>
109+
110+
<dependency>
111+
<groupId>dev.feast</groupId>
112+
<artifactId>feast-storage-connector-redis</artifactId>
113+
<version>${project.version}</version>
114+
</dependency>
115+
116+
<dependency>
117+
<groupId>dev.feast</groupId>
118+
<artifactId>feast-storage-connector-bigquery</artifactId>
119+
<version>${project.version}</version>
120+
</dependency>
121+
104122
<dependency>
105123
<groupId>com.google.auto.value</groupId>
106124
<artifactId>auto-value-annotations</artifactId>

ingestion/src/main/java/feast/ingestion/ImportJob.java

Lines changed: 39 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -17,23 +17,26 @@
1717
package feast.ingestion;
1818

1919
import static feast.ingestion.utils.SpecUtil.getFeatureSetReference;
20+
import static feast.ingestion.utils.StoreUtil.getFeatureSink;
2021

2122
import com.google.protobuf.InvalidProtocolBufferException;
2223
import feast.core.FeatureSetProto.FeatureSet;
24+
import feast.core.FeatureSetProto.FeatureSetSpec;
2325
import feast.core.SourceProto.Source;
2426
import feast.core.StoreProto.Store;
2527
import feast.ingestion.options.BZip2Decompressor;
2628
import feast.ingestion.options.ImportOptions;
2729
import feast.ingestion.options.StringListStreamConverter;
2830
import feast.ingestion.transform.ReadFromSource;
2931
import feast.ingestion.transform.ValidateFeatureRows;
30-
import feast.ingestion.transform.WriteFailedElementToBigQuery;
31-
import feast.ingestion.transform.WriteToStore;
32-
import feast.ingestion.transform.metrics.WriteMetricsTransform;
33-
import feast.ingestion.utils.ResourceUtil;
32+
import feast.ingestion.transform.metrics.WriteFailureMetricsTransform;
33+
import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform;
3434
import feast.ingestion.utils.SpecUtil;
35-
import feast.ingestion.utils.StoreUtil;
36-
import feast.ingestion.values.FailedElement;
35+
import feast.storage.api.writer.DeadletterSink;
36+
import feast.storage.api.writer.FailedElement;
37+
import feast.storage.api.writer.FeatureSink;
38+
import feast.storage.api.writer.WriteResult;
39+
import feast.storage.connectors.bigquery.writer.BigQueryDeadletterSink;
3740
import feast.types.FeatureRowProto.FeatureRow;
3841
import java.io.IOException;
3942
import java.util.HashMap;
@@ -93,17 +96,24 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
9396
SpecUtil.getSubscribedFeatureSets(store.getSubscriptionsList(), featureSets);
9497

9598
// Generate tags by key
96-
Map<String, FeatureSet> featureSetsByKey = new HashMap<>();
99+
Map<String, FeatureSetSpec> featureSetSpecsByKey = new HashMap<>();
97100
subscribedFeatureSets.stream()
98101
.forEach(
99102
fs -> {
100-
String ref = getFeatureSetReference(fs);
101-
featureSetsByKey.put(ref, fs);
103+
String ref = getFeatureSetReference(fs.getSpec());
104+
featureSetSpecsByKey.put(ref, fs.getSpec());
102105
});
103106

107+
FeatureSink featureSink = getFeatureSink(store, featureSetSpecsByKey);
108+
104109
// TODO: make the source part of the job initialisation options
105110
Source source = subscribedFeatureSets.get(0).getSpec().getSource();
106111

112+
for (FeatureSet featureSet : subscribedFeatureSets) {
113+
// Ensure Store has valid configuration and Feast can access it.
114+
featureSink.prepareWrite(featureSet);
115+
}
116+
107117
// Step 1. Read messages from Feast Source as FeatureRow.
108118
PCollectionTuple convertedFeatureRows =
109119
pipeline.apply(
@@ -114,58 +124,48 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
114124
.setFailureTag(DEADLETTER_OUT)
115125
.build());
116126

117-
for (FeatureSet featureSet : subscribedFeatureSets) {
118-
// Ensure Store has valid configuration and Feast can access it.
119-
StoreUtil.setupStore(store, featureSet);
120-
}
121-
122127
// Step 2. Validate incoming FeatureRows
123128
PCollectionTuple validatedRows =
124129
convertedFeatureRows
125130
.get(FEATURE_ROW_OUT)
126131
.apply(
127132
ValidateFeatureRows.newBuilder()
128-
.setFeatureSets(featureSetsByKey)
133+
.setFeatureSetSpecs(featureSetSpecsByKey)
129134
.setSuccessTag(FEATURE_ROW_OUT)
130135
.setFailureTag(DEADLETTER_OUT)
131136
.build());
132137

133138
// Step 3. Write FeatureRow to the corresponding Store.
134-
validatedRows
135-
.get(FEATURE_ROW_OUT)
136-
.apply(
137-
"WriteFeatureRowToStore",
138-
WriteToStore.newBuilder().setFeatureSets(featureSetsByKey).setStore(store).build());
139+
WriteResult writeFeatureRows =
140+
validatedRows.get(FEATURE_ROW_OUT).apply("WriteFeatureRowToStore", featureSink.writer());
139141

140142
// Step 4. Write FailedElements to a dead letter table in BigQuery.
141143
if (options.getDeadLetterTableSpec() != null) {
144+
// TODO: make deadletter destination type configurable
145+
DeadletterSink deadletterSink =
146+
new BigQueryDeadletterSink(options.getDeadLetterTableSpec());
147+
142148
convertedFeatureRows
143149
.get(DEADLETTER_OUT)
144-
.apply(
145-
"WriteFailedElements_ReadFromSource",
146-
WriteFailedElementToBigQuery.newBuilder()
147-
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
148-
.setTableSpec(options.getDeadLetterTableSpec())
149-
.build());
150+
.apply("WriteFailedElements_ReadFromSource", deadletterSink.write());
150151

151152
validatedRows
152153
.get(DEADLETTER_OUT)
153-
.apply(
154-
"WriteFailedElements_ValidateRows",
155-
WriteFailedElementToBigQuery.newBuilder()
156-
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
157-
.setTableSpec(options.getDeadLetterTableSpec())
158-
.build());
154+
.apply("WriteFailedElements_ValidateRows", deadletterSink.write());
155+
156+
writeFeatureRows
157+
.getFailedInserts()
158+
.apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write());
159159
}
160160

161161
// Step 5. Write metrics to a metrics sink.
162-
validatedRows.apply(
163-
"WriteMetrics",
164-
WriteMetricsTransform.newBuilder()
165-
.setStoreName(store.getName())
166-
.setSuccessTag(FEATURE_ROW_OUT)
167-
.setFailureTag(DEADLETTER_OUT)
168-
.build());
162+
writeFeatureRows
163+
.getSuccessfulInserts()
164+
.apply("WriteSuccessMetrics", WriteSuccessMetricsTransform.create(store.getName()));
165+
166+
writeFeatureRows
167+
.getFailedInserts()
168+
.apply("WriteFailureMetrics", WriteFailureMetricsTransform.create(store.getName()));
169169
}
170170

171171
return pipeline.run();

ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@
2121
import feast.core.SourceProto.Source;
2222
import feast.core.SourceProto.SourceType;
2323
import feast.ingestion.transform.fn.KafkaRecordToFeatureRowDoFn;
24-
import feast.ingestion.values.FailedElement;
24+
import feast.storage.api.writer.FailedElement;
2525
import feast.types.FeatureRowProto.FeatureRow;
2626
import org.apache.beam.sdk.io.kafka.KafkaIO;
2727
import org.apache.beam.sdk.transforms.PTransform;

ingestion/src/main/java/feast/ingestion/transform/ValidateFeatureRows.java

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,8 +19,8 @@
1919
import com.google.auto.value.AutoValue;
2020
import feast.core.FeatureSetProto;
2121
import feast.ingestion.transform.fn.ValidateFeatureRowDoFn;
22-
import feast.ingestion.values.FailedElement;
2322
import feast.ingestion.values.FeatureSet;
23+
import feast.storage.api.writer.FailedElement;
2424
import feast.types.FeatureRowProto.FeatureRow;
2525
import java.util.Map;
2626
import java.util.stream.Collectors;
@@ -36,7 +36,7 @@
3636
public abstract class ValidateFeatureRows
3737
extends PTransform<PCollection<FeatureRow>, PCollectionTuple> {
3838

39-
public abstract Map<String, FeatureSetProto.FeatureSet> getFeatureSets();
39+
public abstract Map<String, FeatureSetProto.FeatureSetSpec> getFeatureSetSpecs();
4040

4141
public abstract TupleTag<FeatureRow> getSuccessTag();
4242

@@ -49,7 +49,8 @@ public static Builder newBuilder() {
4949
@AutoValue.Builder
5050
public abstract static class Builder {
5151

52-
public abstract Builder setFeatureSets(Map<String, FeatureSetProto.FeatureSet> featureSets);
52+
public abstract Builder setFeatureSetSpecs(
53+
Map<String, FeatureSetProto.FeatureSetSpec> featureSets);
5354

5455
public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
5556

@@ -62,7 +63,7 @@ public abstract static class Builder {
6263
public PCollectionTuple expand(PCollection<FeatureRow> input) {
6364

6465
Map<String, FeatureSet> featureSets =
65-
getFeatureSets().entrySet().stream()
66+
getFeatureSetSpecs().entrySet().stream()
6667
.map(e -> Pair.of(e.getKey(), new FeatureSet(e.getValue())))
6768
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
6869

ingestion/src/main/java/feast/ingestion/transform/WriteFailedElementToBigQuery.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818

1919
import com.google.api.services.bigquery.model.TableRow;
2020
import com.google.auto.value.AutoValue;
21-
import feast.ingestion.values.FailedElement;
21+
import feast.storage.api.writer.FailedElement;
2222
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
2323
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
2424
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;

ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java

Lines changed: 0 additions & 168 deletions
This file was deleted.

ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,8 +18,7 @@
1818

1919
import com.google.auto.value.AutoValue;
2020
import com.google.protobuf.InvalidProtocolBufferException;
21-
import feast.ingestion.transform.ReadFromSource.Builder;
22-
import feast.ingestion.values.FailedElement;
21+
import feast.storage.api.writer.FailedElement;
2322
import feast.types.FeatureRowProto.FeatureRow;
2423
import java.util.Base64;
2524
import org.apache.beam.sdk.io.kafka.KafkaRecord;

0 commit comments

Comments
 (0)