|
15 | 15 | import org.apache.commons.lang3.exception.ExceptionUtils; |
16 | 16 |
|
17 | 17 | @AutoValue |
18 | | -public abstract class KafkaRecordToFeatureRowDoFn extends DoFn<KafkaRecord<byte[], byte[]>, FeatureRow> { |
| 18 | +public abstract class KafkaRecordToFeatureRowDoFn extends |
| 19 | + DoFn<KafkaRecord<byte[], byte[]>, FeatureRow> { |
19 | 20 |
|
20 | 21 | public abstract String getFeatureSetName(); |
21 | 22 |
|
@@ -70,30 +71,38 @@ public void processElement(ProcessContext context) { |
70 | 71 | // If FeatureRow contains field names that do not exist as EntitySpec |
71 | 72 | // or FeatureSpec in FeatureSetSpec, mark the FeatureRow as FailedElement. |
72 | 73 | String error = null; |
73 | | - for (FieldProto.Field field : featureRow.getFieldsList()) { |
74 | | - if (!getFieldByName().containsKey(field.getName())) { |
75 | | - error = |
76 | | - String.format( |
77 | | - "FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.", |
78 | | - field.getName(), getFeatureSetName(), getFeatureSetVersion()); |
79 | | - break; |
80 | | - } |
81 | | - // If value is set in the FeatureRow, make sure the value type matches |
82 | | - // that defined in FeatureSetSpec |
83 | | - if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) { |
84 | | - int expectedTypeFieldNumber = |
85 | | - getFieldByName().get(field.getName()).getType().getNumber(); |
86 | | - int actualTypeFieldNumber = field.getValue().getValCase().getNumber(); |
87 | | - if (expectedTypeFieldNumber != actualTypeFieldNumber) { |
| 74 | + String featureSetId = String.format("%s:%d", getFeatureSetName(), getFeatureSetVersion()); |
| 75 | + if (featureRow.getFeatureSet().equals(featureSetId)) { |
| 76 | + |
| 77 | + for (FieldProto.Field field : featureRow.getFieldsList()) { |
| 78 | + if (!getFieldByName().containsKey(field.getName())) { |
88 | 79 | error = |
89 | 80 | String.format( |
90 | | - "FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.", |
91 | | - field.getName(), |
92 | | - field.getValue().getValCase(), |
93 | | - getFieldByName().get(field.getName()).getType()); |
| 81 | + "FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.", |
| 82 | + field.getName(), getFeatureSetName(), getFeatureSetVersion()); |
94 | 83 | break; |
95 | 84 | } |
| 85 | + // If value is set in the FeatureRow, make sure the value type matches |
| 86 | + // that defined in FeatureSetSpec |
| 87 | + if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) { |
| 88 | + int expectedTypeFieldNumber = |
| 89 | + getFieldByName().get(field.getName()).getType().getNumber(); |
| 90 | + int actualTypeFieldNumber = field.getValue().getValCase().getNumber(); |
| 91 | + if (expectedTypeFieldNumber != actualTypeFieldNumber) { |
| 92 | + error = |
| 93 | + String.format( |
| 94 | + "FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.", |
| 95 | + field.getName(), |
| 96 | + field.getValue().getValCase(), |
| 97 | + getFieldByName().get(field.getName()).getType()); |
| 98 | + break; |
| 99 | + } |
| 100 | + } |
96 | 101 | } |
| 102 | + } else { |
| 103 | + error = String.format( |
| 104 | + "FeatureRow contains invalid feature set id %s. Please check that the feature rows are being published to the correct topic on the feature stream.", |
| 105 | + featureSetId); |
97 | 106 | } |
98 | 107 |
|
99 | 108 | if (error != null) { |
|
0 commit comments