Skip to content

Commit 4874725

Browse files
author
Chen Zhiling
committed
Change pipeline to use DeadletterSink API (#586)
1 parent d999a5a commit 4874725

2 files changed

Lines changed: 15 additions & 21 deletions

File tree

ingestion/src/main/java/feast/ingestion/ImportJob.java

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@
2929
import feast.ingestion.options.StringListStreamConverter;
3030
import feast.ingestion.transform.ReadFromSource;
3131
import feast.ingestion.transform.ValidateFeatureRows;
32-
import feast.ingestion.transform.WriteFailedElementToBigQuery;
3332
import feast.ingestion.transform.metrics.WriteFailureMetricsTransform;
3433
import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform;
35-
import feast.ingestion.utils.ResourceUtil;
3634
import feast.ingestion.utils.SpecUtil;
35+
import feast.storage.api.write.DeadletterSink;
3736
import feast.storage.api.write.FailedElement;
3837
import feast.storage.api.write.FeatureSink;
3938
import feast.storage.api.write.WriteResult;
39+
import feast.storage.connectors.bigquery.write.BigQueryDeadletterSink;
4040
import feast.types.FeatureRowProto.FeatureRow;
4141
import java.io.IOException;
4242
import java.util.HashMap;
@@ -141,32 +141,21 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
141141

142142
// Step 4. Write FailedElements to a dead letter table in BigQuery.
143143
if (options.getDeadLetterTableSpec() != null) {
144+
// TODO: make deadletter destination type configurable
145+
DeadletterSink deadletterSink =
146+
new BigQueryDeadletterSink(options.getDeadLetterTableSpec());
147+
144148
convertedFeatureRows
145149
.get(DEADLETTER_OUT)
146-
.apply(
147-
"WriteFailedElements_ReadFromSource",
148-
WriteFailedElementToBigQuery.newBuilder()
149-
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
150-
.setTableSpec(options.getDeadLetterTableSpec())
151-
.build());
150+
.apply("WriteFailedElements_ReadFromSource", deadletterSink.write());
152151

153152
validatedRows
154153
.get(DEADLETTER_OUT)
155-
.apply(
156-
"WriteFailedElements_ValidateRows",
157-
WriteFailedElementToBigQuery.newBuilder()
158-
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
159-
.setTableSpec(options.getDeadLetterTableSpec())
160-
.build());
154+
.apply("WriteFailedElements_ValidateRows", deadletterSink.write());
161155

162156
writeFeatureRows
163157
.getFailedInserts()
164-
.apply(
165-
"WriteFailedElements_WriteFeatureRowToStore",
166-
WriteFailedElementToBigQuery.newBuilder()
167-
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
168-
.setTableSpec(options.getDeadLetterTableSpec())
169-
.build());
158+
.apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write());
170159
}
171160

172161
// Step 5. Write metrics to a metrics sink.

storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryDeadletterSink.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package feast.storage.connectors.bigquery.write;
1818

1919
import com.google.api.services.bigquery.model.TableRow;
20+
import com.google.api.services.bigquery.model.TimePartitioning;
2021
import com.google.auto.value.AutoValue;
2122
import com.google.common.io.Resources;
2223
import feast.storage.api.write.DeadletterSink;
@@ -36,6 +37,7 @@ public class BigQueryDeadletterSink implements DeadletterSink {
3637

3738
private static final String DEADLETTER_SCHEMA_FILE_PATH = "schemas/deadletter_table_schema.json";
3839
private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryDeadletterSink.class);
40+
private static final String TIMESTAMP_COLUMN = "timestamp";
3941

4042
private final String tableSpec;
4143
private String jsonSchema;
@@ -97,13 +99,16 @@ public abstract static class Builder {
9799

98100
@Override
99101
public PDone expand(PCollection<FailedElement> input) {
102+
TimePartitioning partition = new TimePartitioning().setType("DAY");
103+
partition.setField(TIMESTAMP_COLUMN);
100104
input
101105
.apply("FailedElementToTableRow", ParDo.of(new FailedElementToTableRowFn()))
102106
.apply(
103107
"WriteFailedElementsToBigQuery",
104108
BigQueryIO.writeTableRows()
105109
.to(getTableSpec())
106110
.withJsonSchema(getJsonSchema())
111+
.withTimePartitioning(partition)
107112
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
108113
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
109114
return PDone.in(input.getPipeline());
@@ -116,7 +121,7 @@ public void processElement(ProcessContext context) {
116121
final FailedElement element = context.element();
117122
final TableRow tableRow =
118123
new TableRow()
119-
.set("timestamp", element.getTimestamp().toString())
124+
.set(TIMESTAMP_COLUMN, element.getTimestamp().toString())
120125
.set("job_name", element.getJobName())
121126
.set("transform_name", element.getTransformName())
122127
.set("payload", element.getPayload())

0 commit comments

Comments
 (0)