Skip to content

Commit 558453a

Browse files
Oleksii Moskalenko and a co-author authored
Add IngestionId & EventTimestamp to FeatureRowBatch to calculate lag metric correctly (#874)
* ingestionId & eventTimestamp in FeatureRowBatch
* refactor idx operations using schema
* dummy

Co-authored-by: Oleksii Moskalenko <oleksii.moskalenko@go-jek.com>
1 parent 25ff687 commit 558453a

2 files changed

Lines changed: 78 additions & 20 deletions

File tree

storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/compression/FeatureRowsBatch.java

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static feast.proto.types.ValueProto.Value.ValCase.*;
2020
import static feast.storage.connectors.bigquery.common.TypeUtil.*;
2121

22+
import com.google.common.collect.ImmutableList;
23+
import com.google.protobuf.Timestamp;
2224
import feast.proto.types.FeatureRowProto;
2325
import feast.proto.types.FieldProto;
2426
import feast.proto.types.ValueProto;
@@ -40,6 +42,8 @@
4042
* <p>getFeatureRows provides reverse transformation
4143
*/
4244
public class FeatureRowsBatch implements Serializable {
45+
public static final ImmutableList<String> SERVICE_FIELDS =
46+
ImmutableList.of("eventTimestamp", "ingestionId");
4347
private final Schema schema;
4448
private String featureSetReference;
4549
private List<Object> values = new ArrayList<>();
@@ -118,6 +122,12 @@ private Schema inferCommonSchema(Iterable<FeatureRowProto.FeatureRow> featureRow
118122
featureSetReference = row.getFeatureSet();
119123
}
120124
}));
125+
126+
fieldsInOrder.add(
127+
Schema.Field.of("eventTimestamp", Schema.FieldType.array(Schema.FieldType.INT64)));
128+
fieldsInOrder.add(
129+
Schema.Field.of("ingestionId", Schema.FieldType.array(Schema.FieldType.STRING)));
130+
121131
Schema schema = Schema.builder().addFields(fieldsInOrder).build();
122132
schema.setUUID(UUID.randomUUID());
123133
return schema;
@@ -132,16 +142,33 @@ private void initValues() {
132142
}
133143

134144
private void toColumnar(Iterable<FeatureRowProto.FeatureRow> featureRows) {
145+
int timestampColumnIdx = schema.indexOf("eventTimestamp");
146+
int ingestionIdColumnIdx = schema.indexOf("ingestionId");
147+
135148
featureRows.forEach(
136149
row -> {
137-
Map<String, ValueProto.Value> rowValues =
138-
row.getFieldsList().stream()
139-
.collect(Collectors.toMap(FieldProto.Field::getName, FieldProto.Field::getValue));
150+
Map<String, ValueProto.Value> rowValues;
151+
try {
152+
rowValues =
153+
row.getFieldsList().stream()
154+
.collect(
155+
Collectors.toMap(FieldProto.Field::getName, FieldProto.Field::getValue));
156+
} catch (IllegalStateException e) {
157+
// row contains feature duplicates
158+
// omitting for now
159+
return;
160+
}
140161

141-
IntStream.range(0, schema.getFieldCount())
162+
schema
163+
.getFieldNames()
142164
.forEach(
143-
idx -> {
144-
Schema.Field field = schema.getField(idx);
165+
fieldName -> {
166+
if (SERVICE_FIELDS.contains(fieldName)) {
167+
return;
168+
}
169+
Schema.Field field = schema.getField(fieldName);
170+
int idx = schema.indexOf(fieldName);
171+
145172
if (rowValues.containsKey(field.getName())) {
146173
Object o = protoValueToObject(rowValues.get(field.getName()));
147174
if (o != null) {
@@ -152,6 +179,10 @@ private void toColumnar(Iterable<FeatureRowProto.FeatureRow> featureRows) {
152179

153180
((List<Object>) values.get(idx)).add(defaultValues.get(field.getName()));
154181
});
182+
183+
// adding service fields
184+
((List<Object>) values.get(timestampColumnIdx)).add(row.getEventTimestamp().getSeconds());
185+
((List<Object>) values.get(ingestionIdColumnIdx)).add(row.getIngestionId());
155186
});
156187
}
157188

@@ -177,27 +208,45 @@ public static FeatureRowsBatch fromRow(Row row) {
177208
}
178209

179210
public Iterator<FeatureRowProto.FeatureRow> getFeatureRows() {
211+
int timestampColumnIdx = schema.indexOf("eventTimestamp");
212+
int ingestionIdColumnIdx = schema.indexOf("ingestionId");
213+
180214
return IntStream.range(0, ((List<Object>) values.get(0)).size())
181215
.parallel()
182216
.mapToObj(
183217
rowIdx ->
184218
FeatureRowProto.FeatureRow.newBuilder()
185219
.setFeatureSet(getFeatureSetReference())
220+
.setEventTimestamp(
221+
Timestamp.newBuilder()
222+
.setSeconds(
223+
(long)
224+
(((List<Object>) values.get(timestampColumnIdx)).get(rowIdx)))
225+
.build())
226+
.setIngestionId(
227+
(String) (((List<Object>) values.get(ingestionIdColumnIdx)).get(rowIdx)))
186228
.addAllFields(
187-
IntStream.range(0, schema.getFieldCount())
188-
.mapToObj(
189-
fieldIdx ->
190-
FieldProto.Field.newBuilder()
191-
.setName(schema.getField(fieldIdx).getName())
192-
.setValue(
193-
objectToProtoValue(
194-
((List<Object>) values.get(fieldIdx)).get(rowIdx),
195-
schemaToProtoTypes.get(
196-
schema
197-
.getField(fieldIdx)
198-
.getType()
199-
.getCollectionElementType())))
200-
.build())
229+
schema.getFieldNames().stream()
230+
.map(
231+
fieldName -> {
232+
if (SERVICE_FIELDS.contains(fieldName)) {
233+
return null;
234+
}
235+
int fieldIdx = schema.indexOf(fieldName);
236+
237+
return FieldProto.Field.newBuilder()
238+
.setName(schema.getField(fieldIdx).getName())
239+
.setValue(
240+
objectToProtoValue(
241+
((List<Object>) values.get(fieldIdx)).get(rowIdx),
242+
schemaToProtoTypes.get(
243+
schema
244+
.getField(fieldIdx)
245+
.getType()
246+
.getCollectionElementType())))
247+
.build();
248+
})
249+
.filter(Objects::nonNull)
201250
.collect(Collectors.toList()))
202251
.build())
203252
.iterator();

storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ private FeatureRow generateRow(String featureSet) {
124124
FeatureRow.Builder row =
125125
FeatureRow.newBuilder()
126126
.setFeatureSet(featureSet)
127+
.setEventTimestamp(
128+
com.google.protobuf.Timestamp.newBuilder()
129+
.setSeconds(System.currentTimeMillis() / 1000)
130+
.build())
131+
.setIngestionId("ingestion-id")
127132
.addFields(field("entity", rd.nextInt(), ValueProto.ValueType.Enum.INT64))
128133
.addFields(FieldProto.Field.newBuilder().setName("null_value").build());
129134

@@ -499,6 +504,8 @@ private List<FeatureRow> dropNullFeature(List<FeatureRow> input) {
499504
r ->
500505
FeatureRow.newBuilder()
501506
.setFeatureSet(r.getFeatureSet())
507+
.setIngestionId(r.getIngestionId())
508+
.setEventTimestamp(r.getEventTimestamp())
502509
.addAllFields(copyFieldsWithout(r, "null_value"))
503510
.build())
504511
.collect(Collectors.toList());
@@ -520,6 +527,8 @@ public static List<FeatureRow> sortFeaturesByName(List<FeatureRow> rows) {
520527

521528
return FeatureRow.newBuilder()
522529
.setFeatureSet(row.getFeatureSet())
530+
.setEventTimestamp(row.getEventTimestamp())
531+
.setIngestionId(row.getIngestionId())
523532
.addAllFields(fieldsList)
524533
.build();
525534
})

0 commit comments

Comments (0)