Skip to content

Commit 456c56b

Browse files
author
zhilingc
committed
Fix kafka2featureRow dofn test, catch feature rows with invalid feature set ids
1 parent 3082a75 commit 456c56b

2 files changed

Lines changed: 32 additions & 23 deletions

File tree

ingestion/src/main/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFn.java

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
import org.apache.commons.lang3.exception.ExceptionUtils;
1616

1717
@AutoValue
18-
public abstract class KafkaRecordToFeatureRowDoFn extends DoFn<KafkaRecord<byte[], byte[]>, FeatureRow> {
18+
public abstract class KafkaRecordToFeatureRowDoFn extends
19+
DoFn<KafkaRecord<byte[], byte[]>, FeatureRow> {
1920

2021
public abstract String getFeatureSetName();
2122

@@ -70,30 +71,38 @@ public void processElement(ProcessContext context) {
7071
// If FeatureRow contains field names that do not exist as EntitySpec
7172
// or FeatureSpec in FeatureSetSpec, mark the FeatureRow as FailedElement.
7273
String error = null;
73-
for (FieldProto.Field field : featureRow.getFieldsList()) {
74-
if (!getFieldByName().containsKey(field.getName())) {
75-
error =
76-
String.format(
77-
"FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.",
78-
field.getName(), getFeatureSetName(), getFeatureSetVersion());
79-
break;
80-
}
81-
// If value is set in the FeatureRow, make sure the value type matches
82-
// that defined in FeatureSetSpec
83-
if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
84-
int expectedTypeFieldNumber =
85-
getFieldByName().get(field.getName()).getType().getNumber();
86-
int actualTypeFieldNumber = field.getValue().getValCase().getNumber();
87-
if (expectedTypeFieldNumber != actualTypeFieldNumber) {
74+
String featureSetId = String.format("%s:%d", getFeatureSetName(), getFeatureSetVersion());
75+
if (featureRow.getFeatureSet().equals(featureSetId)) {
76+
77+
for (FieldProto.Field field : featureRow.getFieldsList()) {
78+
if (!getFieldByName().containsKey(field.getName())) {
8879
error =
8980
String.format(
90-
"FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.",
91-
field.getName(),
92-
field.getValue().getValCase(),
93-
getFieldByName().get(field.getName()).getType());
81+
"FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.",
82+
field.getName(), getFeatureSetName(), getFeatureSetVersion());
9483
break;
9584
}
85+
// If value is set in the FeatureRow, make sure the value type matches
86+
// that defined in FeatureSetSpec
87+
if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
88+
int expectedTypeFieldNumber =
89+
getFieldByName().get(field.getName()).getType().getNumber();
90+
int actualTypeFieldNumber = field.getValue().getValCase().getNumber();
91+
if (expectedTypeFieldNumber != actualTypeFieldNumber) {
92+
error =
93+
String.format(
94+
"FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.",
95+
field.getName(),
96+
field.getValue().getValCase(),
97+
getFieldByName().get(field.getName()).getType());
98+
break;
99+
}
100+
}
96101
}
102+
} else {
103+
error = String.format(
104+
"FeatureRow contains invalid feature set id %s. Please check that the feature rows are being published to the correct topic on the feature stream.",
105+
featureSetId);
97106
}
98107

99108
if (error != null) {

ingestion/src/test/java/feast/ingestion/transform/fn/KafkaRecordToFeatureRowDoFnTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void shouldOutputInvalidRowsWithFailureTag() {
5050
Values<KafkaRecord<byte[], byte[]>> featureRows = Create
5151
.of(kafkaRecordOf(dummyFeatureRow("invalid:1", "field1", "field2")), // invalid featureset name
5252
kafkaRecordOf(frWithStringVal), // invalid field type
53-
kafkaRecordOf(dummyFeatureRow("valid:1", "field1"))) // invalid fields
53+
kafkaRecordOf(dummyFeatureRow("valid:1", "field1", "field2", "field3"))) // invalid fields
5454
.withCoder(KafkaRecordCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));
5555

5656
HashMap<String, Field> fieldByName = new HashMap<>();
@@ -66,10 +66,10 @@ public void shouldOutputInvalidRowsWithFailureTag() {
6666
.setFeatureSetName("valid")
6767
.setFeatureSetVersion(1)
6868
.build()).withOutputTags(FEATURE_ROW_OUT, TupleTagList.of(DEADLETTER_OUT)));
69-
p.run();
7069

7170
PAssert.that(output.get(FEATURE_ROW_OUT)).empty();
7271
PAssert.thatSingleton(output.get(DEADLETTER_OUT).apply(Count.globally())).isEqualTo(3L);
72+
p.run();
7373
}
7474

7575
@Test
@@ -93,10 +93,10 @@ public void shouldOutputValidRowsWithSuccessTag() {
9393
.setFeatureSetVersion(1)
9494
.build()).withOutputTags(FEATURE_ROW_OUT, TupleTagList.of(DEADLETTER_OUT)));
9595

96-
p.run();
9796

9897
PAssert.that(output.get(DEADLETTER_OUT)).empty();
9998
PAssert.thatSingleton(output.get(FEATURE_ROW_OUT).apply(Count.globally())).isEqualTo(3L);
99+
p.run();
100100
}
101101

102102
private FeatureRow dummyFeatureRow(String featureSet, String... fieldNames) {

0 commit comments

Comments
 (0)