|
29 | 29 | import feast.ingestion.options.StringListStreamConverter; |
30 | 30 | import feast.ingestion.transform.ReadFromSource; |
31 | 31 | import feast.ingestion.transform.ValidateFeatureRows; |
32 | | -import feast.ingestion.transform.WriteFailedElementToBigQuery; |
33 | 32 | import feast.ingestion.transform.metrics.WriteFailureMetricsTransform; |
34 | 33 | import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform; |
35 | | -import feast.ingestion.utils.ResourceUtil; |
36 | 34 | import feast.ingestion.utils.SpecUtil; |
| 35 | +import feast.storage.api.write.DeadletterSink; |
37 | 36 | import feast.storage.api.write.FailedElement; |
38 | 37 | import feast.storage.api.write.FeatureSink; |
39 | 38 | import feast.storage.api.write.WriteResult; |
| 39 | +import feast.storage.connectors.bigquery.write.BigQueryDeadletterSink; |
40 | 40 | import feast.types.FeatureRowProto.FeatureRow; |
41 | 41 | import java.io.IOException; |
42 | 42 | import java.util.HashMap; |
@@ -141,32 +141,21 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti |
141 | 141 |
|
142 | 142 | // Step 4. Write FailedElements to a dead letter table in BigQuery. |
143 | 143 | if (options.getDeadLetterTableSpec() != null) { |
| 144 | + // TODO: make deadletter destination type configurable |
| 145 | + DeadletterSink deadletterSink = |
| 146 | + new BigQueryDeadletterSink(options.getDeadLetterTableSpec()); |
| 147 | + |
144 | 148 | convertedFeatureRows |
145 | 149 | .get(DEADLETTER_OUT) |
146 | | - .apply( |
147 | | - "WriteFailedElements_ReadFromSource", |
148 | | - WriteFailedElementToBigQuery.newBuilder() |
149 | | - .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson()) |
150 | | - .setTableSpec(options.getDeadLetterTableSpec()) |
151 | | - .build()); |
| 150 | + .apply("WriteFailedElements_ReadFromSource", deadletterSink.write()); |
152 | 151 |
|
153 | 152 | validatedRows |
154 | 153 | .get(DEADLETTER_OUT) |
155 | | - .apply( |
156 | | - "WriteFailedElements_ValidateRows", |
157 | | - WriteFailedElementToBigQuery.newBuilder() |
158 | | - .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson()) |
159 | | - .setTableSpec(options.getDeadLetterTableSpec()) |
160 | | - .build()); |
| 154 | + .apply("WriteFailedElements_ValidateRows", deadletterSink.write()); |
161 | 155 |
|
162 | 156 | writeFeatureRows |
163 | 157 | .getFailedInserts() |
164 | | - .apply( |
165 | | - "WriteFailedElements_WriteFeatureRowToStore", |
166 | | - WriteFailedElementToBigQuery.newBuilder() |
167 | | - .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson()) |
168 | | - .setTableSpec(options.getDeadLetterTableSpec()) |
169 | | - .build()); |
| 158 | + .apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write()); |
170 | 159 | } |
171 | 160 |
|
172 | 161 | // Step 5. Write metrics to a metrics sink. |
|
0 commit comments