Skip to content

Commit a0111bb

Browse files
pradithyafeast-ci-bot
authored andcommitted
Prevent throwing RuntimeException when invalid proto is received (#166)
1 parent 5b6c7db commit a0111bb

1 file changed

Lines changed: 19 additions & 13 deletions

File tree

ingestion/src/main/java/feast/store/errors/json/JsonFileErrorsWrite.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,24 @@ public class JsonFileErrorsWrite extends FeatureStoreWrite {
3939

4040
@Override
4141
public PDone expand(PCollection<FeatureRowExtended> input) {
42-
return input.apply("Map to strings", MapElements.into(kvs(strings(), strings())).via(
43-
(rowExtended) -> {
44-
try {
45-
return KV.of(
46-
rowExtended.getRow().getEntityName(),
47-
JsonFormat.printer().omittingInsignificantWhitespace()
48-
.print(rowExtended)
49-
);
50-
} catch (InvalidProtocolBufferException e) {
51-
throw new RuntimeException(e);
52-
}
53-
}
54-
)).apply("Write Error Json Files", new TextFileDynamicIO.Write(options, ".json"));
42+
return input
43+
.apply(
44+
"Map to strings",
45+
MapElements.into(kvs(strings(), strings()))
46+
.via(
47+
(rowExtended) -> {
48+
try {
49+
return KV.of(
50+
rowExtended.getRow().getEntityName(),
51+
JsonFormat.printer()
52+
.omittingInsignificantWhitespace()
53+
.print(rowExtended));
54+
} catch (InvalidProtocolBufferException e) {
55+
return KV.of(
56+
rowExtended.getRow().getEntityName(),
57+
e.toString());
58+
}
59+
}))
60+
.apply("Write Error Json Files", new TextFileDynamicIO.Write(options, ".json"));
5561
}
5662
}

0 commit comments

Comments
 (0)