Skip to content

Commit 29071f1

Browse files
authored
KE-908 Store dead-letter messages (feast-dev#68)
Closes KE-908: store dead-letter messages (rows that fail validation) in Delta format on DBFS.
1 parent 86e8a6b commit 29071f1

10 files changed

Lines changed: 135 additions & 30 deletions

File tree

core/src/main/java/feast/core/job/databricks/DatabricksJobManager.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class DatabricksJobManager implements JobManager {
5757
private final String checkpointLocation;
5858
private final String jarFile;
5959
private final DatabricksRunnerConfigOptions.DatabricksNewClusterOptions newClusterConfigOptions;
60+
private final String deadLetterPath;
6061
private final HttpClient httpClient;
6162
private static final ObjectMapper mapper = ObjectMapperFactory.createObjectMapper();
6263
private final int timeoutSeconds;
@@ -69,6 +70,7 @@ public DatabricksJobManager(
6970
this.checkpointLocation = runnerConfigOptions.getCheckpointLocation();
7071
this.httpClient = httpClient;
7172
this.newClusterConfigOptions = runnerConfigOptions.getNewCluster();
73+
this.deadLetterPath = runnerConfigOptions.getDeadLetterPath();
7274
this.jarFile = runnerConfigOptions.getJarFile();
7375
this.timeoutSeconds = runnerConfigOptions.getTimeoutSeconds();
7476
}
@@ -222,7 +224,12 @@ private long createDatabricksRun(Job job) {
222224

223225
List<String> params =
224226
Arrays.asList(
225-
job.getId(), checkpointLocation, defaultFeastProject, featureSetsJson, storesJson);
227+
job.getId(),
228+
checkpointLocation,
229+
defaultFeastProject,
230+
deadLetterPath,
231+
featureSetsJson,
232+
storesJson);
226233
RunsSubmitRequest runRequest = getJobRequest(jobName, params);
227234

228235
try {

core/src/main/resources/application.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,14 @@ feast:
6161
host: databricks_host
6262
# Token for authentication
6363
token: ${DATABRICKS_TOKEN}
64-
# Path to store Spark job checkpoints
64+
# Path to store Spark job checkpoints
6565
checkpointLocation: dbfs:/checkpoints/feast
6666
# Path to the Jar file on the databricks file system
6767
jarFile: jar_file
6868
# Timeout in seconds for Databricks jobs, or -1 for unlimited
6969
timeoutSeconds: -1
70+
# Path to store dead letter data
71+
deadLetterPath: dbfs:/feast/deadletter
7072
newCluster:
7173
# The platform version for scala and spark (e.g. 6.5.x-scala2.11)
7274
sparkVersion: your_spark_version

infra/docker-compose/core/databricks.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ feast:
1010
host: http://databricks-emulator:8080
1111
token: unused
1212
checkpointLocation: /tmp/checkpoints/feast
13+
deadLetterPath: dbfs:/feast/deadletter
1314
jarFile: /opt/sparkjars/spark-ingestion-job.jar
1415
timeoutSeconds: -1
1516
newCluster:

infra/terraform/app/main.tf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ feast-core:
176176
host: "${var.databricks_workspace_url}"
177177
token: "${databricks_token.feast.token_value}"
178178
checkpointLocation: dbfs:/checkpoints/feast
179+
deadLetterPath: dbfs:/feast/deadletter
179180
jarFile: "${local.databricks_dbfs_jar_folder}/sparkjars/spark-ingestion-job.jar"
180181
timeoutSeconds: 1200
181182
newCluster:

protos/feast/core/Runner.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,6 @@ message DatabricksRunnerConfigOptions {
103103
string checkpointLocation = 5;
104104

105105
DatabricksNewClusterOptions newCluster = 6;
106+
107+
string deadLetterPath = 7;
106108
}

spark/spark-ingestion-job/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
*/
1717
package feast.ingestion.transform;
1818

19-
import feast.ingestion.enums.ValidationStatus;
2019
import feast.ingestion.transform.fn.ProcessFeatureRowDoFn;
2120
import feast.ingestion.transform.fn.ValidateFeatureRowDoFn;
2221
import feast.ingestion.values.FeatureSet;
23-
import feast.proto.types.FeatureRowProto;
22+
import feast.proto.types.FeatureRowProto.FeatureRow;
2423
import feast.spark.ingestion.RowWithValidationResult;
2524
import java.util.HashMap;
2625
import org.apache.spark.sql.Dataset;
@@ -35,7 +34,7 @@ public ProcessAndValidateFeatureRows(String defaultFeastProject) {
3534
this.defaultFeastProject = defaultFeastProject;
3635
}
3736

38-
public Dataset<byte[]> processDataset(
37+
public Dataset<RowWithValidationResult> processDataset(
3938
Dataset<Row> input, HashMap<String, FeatureSet> featureSets) {
4039
ValidateFeatureRowDoFn validFeat = new ValidateFeatureRowDoFn(featureSets);
4140

@@ -46,20 +45,11 @@ public Dataset<byte[]> processDataset(
4645
.select("value")
4746
.map(
4847
r -> {
49-
return validFeat.validateElement((byte[]) r.getAs(0));
48+
FeatureRow featureRow = FeatureRow.parseFrom((byte[]) r.getAs(0));
49+
FeatureRow el = procFeat.processElement(featureRow);
50+
return validFeat.validateElement(el);
5051
},
5152
Encoders.kryo(RowWithValidationResult.class));
52-
53-
Dataset<RowWithValidationResult> validRows =
54-
rowsWithValidationResult.filter(
55-
row -> row.getValidationStatus().equals(ValidationStatus.SUCCESS));
56-
57-
return validRows.map(
58-
r -> {
59-
FeatureRowProto.FeatureRow featureRow =
60-
FeatureRowProto.FeatureRow.parseFrom(r.getFeatureRow());
61-
return procFeat.processElement(featureRow).toByteArray();
62-
},
63-
Encoders.BINARY());
53+
return rowsWithValidationResult;
6454
}
6555
}

spark/spark-ingestion-job/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ public ValidateFeatureRowDoFn(HashMap<String, FeatureSet> featureSets) {
3838
this.featureSets = featureSets;
3939
}
4040

41-
public RowWithValidationResult validateElement(byte[] featureRowBytes)
41+
public RowWithValidationResult validateElement(FeatureRow featureRow)
4242
throws InvalidProtocolBufferException {
4343
// TODO: Abstract duplicated validation logic into shared module.
4444
String error = null;
45-
FeatureRow featureRow = FeatureRow.parseFrom(featureRowBytes);
45+
byte[] featureRowBytes = featureRow.toByteArray();
4646
FeatureSet featureSet = featureSets.get(featureRow.getFeatureSet());
4747
List<FieldProto.Field> fields = new ArrayList<>();
4848
if (featureSet != null) {

spark/spark-ingestion-job/src/main/java/feast/spark/ingestion/SparkIngestion.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static feast.ingestion.utils.SpecUtil.getFeatureSetReference;
2020

2121
import com.google.protobuf.InvalidProtocolBufferException;
22+
import feast.ingestion.enums.ValidationStatus;
2223
import feast.ingestion.transform.ProcessAndValidateFeatureRows;
2324
import feast.ingestion.transform.ReadFromSource;
2425
import feast.ingestion.utils.SpecUtil;
@@ -28,11 +29,15 @@
2829
import feast.proto.core.StoreProto.Store;
2930
import feast.spark.ingestion.delta.SparkDeltaSink;
3031
import feast.spark.ingestion.redis.SparkRedisSink;
32+
import feast.storage.api.writer.FailedElement;
3133
import java.util.*;
3234
import java.util.stream.Collectors;
3335
import org.apache.spark.api.java.function.VoidFunction2;
3436
import org.apache.spark.sql.*;
37+
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
3538
import org.apache.spark.sql.streaming.StreamingQuery;
39+
import org.apache.spark.sql.types.DataTypes;
40+
import org.apache.spark.sql.types.StructType;
3641
import org.slf4j.Logger;
3742
import org.slf4j.LoggerFactory;
3843

@@ -44,8 +49,21 @@ public class SparkIngestion {
4449
private final String jobId;
4550
private final String checkpointLocation;
4651
private final String defaultFeastProject;
52+
private final String deadLetterPath;
4753
private final List<FeatureSet> featureSets;
4854
private final List<Store> stores;
55+
private static final StructType deadLetterType;
56+
57+
static {
58+
StructType schema = new StructType();
59+
schema = schema.add("timestamp", DataTypes.TimestampType, false);
60+
schema = schema.add("job_name", DataTypes.StringType, true);
61+
schema = schema.add("transform_name", DataTypes.StringType, true);
62+
schema = schema.add("payload", DataTypes.StringType, true);
63+
schema = schema.add("error_message", DataTypes.StringType, true);
64+
schema = schema.add("stack_trace", DataTypes.StringType, true);
65+
deadLetterType = schema;
66+
}
4967

5068
/**
5169
* Run a Spark ingestion job.
@@ -70,7 +88,7 @@ public static void main(String[] args) throws Exception {
7088
* @throws InvalidProtocolBufferException
7189
*/
7290
public SparkIngestion(String[] args) throws InvalidProtocolBufferException {
73-
int numArgs = 5;
91+
int numArgs = 6;
7492
if (args.length != numArgs) {
7593
throw new IllegalArgumentException("Expecting " + numArgs + " arguments");
7694
}
@@ -79,6 +97,7 @@ public SparkIngestion(String[] args) throws InvalidProtocolBufferException {
7997
jobId = args[index++];
8098
checkpointLocation = args[index++];
8199
defaultFeastProject = args[index++];
100+
deadLetterPath = args[index++];
82101
String featureSetSpecsJson = args[index++];
83102
String storesJson = args[index++];
84103

@@ -182,7 +201,7 @@ public StreamingQuery createQuery() {
182201
ProcessAndValidateFeatureRows processAndValidateFeatureRows =
183202
new ProcessAndValidateFeatureRows(defaultFeastProject);
184203

185-
Dataset<byte[]> processedRows =
204+
Dataset<RowWithValidationResult> processedRows =
186205
processAndValidateFeatureRows.processDataset(input, featureSets);
187206

188207
// Start running the query that writes the data to sink
@@ -195,17 +214,50 @@ public StreamingQuery createQuery() {
195214
.foreachBatch(
196215
(batchDF, batchId) -> {
197216
batchDF.persist();
217+
Dataset<byte[]> validRows =
218+
batchDF
219+
.filter(row -> row.getValidationStatus().equals(ValidationStatus.SUCCESS))
220+
.map(RowWithValidationResult::getFeatureRow, Encoders.BINARY());
221+
222+
validRows.persist();
198223
consumerSinks.forEach(
199224
c -> {
200225
try {
201-
c.call(batchDF, batchId);
226+
c.call(validRows, batchId);
202227
} catch (Exception e) {
203228
log.error("Error invoking sink", e);
204229
throw new RuntimeException(e);
205230
}
206231
});
232+
validRows.unpersist();
233+
234+
storeDeadLetter(batchDF);
235+
207236
batchDF.unpersist();
208237
})
209238
.start();
210239
}
240+
241+
private void storeDeadLetter(Dataset<RowWithValidationResult> batchDF) {
242+
Dataset<RowWithValidationResult> invalidRows =
243+
batchDF.filter(row -> row.getValidationStatus().equals(ValidationStatus.FAILURE));
244+
245+
invalidRows
246+
.map(
247+
e -> {
248+
FailedElement element = e.getFailedElement();
249+
return RowFactory.create(
250+
new java.sql.Timestamp(element.getTimestamp().getMillis()),
251+
element.getJobName(),
252+
element.getTransformName(),
253+
element.getPayload(),
254+
element.getErrorMessage(),
255+
element.getStackTrace());
256+
},
257+
RowEncoder.apply(deadLetterType))
258+
.write()
259+
.format("delta")
260+
.mode("append")
261+
.save(deadLetterPath);
262+
}
211263
}

spark/spark-ingestion-job/src/test/java/feast/spark/ingestion/SparkIngestionTest.java

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ public class SparkIngestionTest {
107107

108108
@Rule public TemporaryFolder checkpointFolder = new TemporaryFolder();
109109

110+
@Rule public TemporaryFolder deadLetterFolder = new TemporaryFolder();
111+
110112
@Rule public final SparkSessionRule spark = new SparkSessionRule();
111113

112114
@BeforeClass
@@ -146,11 +148,20 @@ public void streamingQueryShouldWriteKafkaPayloadAsDeltaLakeAndRedis() throws Ex
146148
FeatureSet featureSetForDelta = TestUtil.createFeatureSetForDelta(kafka);
147149
FeatureSetSpec specForRedis = featureSetForRedis.getSpec();
148150
FeatureSetSpec specForDelta = featureSetForDelta.getSpec();
151+
FeatureSetSpec invalidSpec =
152+
FeatureSetSpec.newBuilder(specForDelta).setProject("invalid_project").build();
149153

150154
List<FeatureRow> inputForRedis =
151155
TestUtil.generateTestData(specForRedis, IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE);
152156
List<FeatureRow> inputForDelta =
153157
TestUtil.generateTestData(specForDelta, IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE);
158+
List<FeatureRow> invalidInput =
159+
TestUtil.generateTestData(invalidSpec, IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE);
160+
List<FeatureRow> allInputs =
161+
Stream.concat(
162+
Stream.concat(inputForRedis.stream(), invalidInput.stream()),
163+
inputForDelta.stream())
164+
.collect(Collectors.toList());
154165

155166
LOGGER.info("Starting Import Job");
156167

@@ -173,22 +184,26 @@ public void streamingQueryShouldWriteKafkaPayloadAsDeltaLakeAndRedis() throws Ex
173184
Dataset<Row> data = null;
174185

175186
String checkpointDir = checkpointFolder.getRoot().getAbsolutePath();
187+
String deadLetterDir = deadLetterFolder.getRoot().getAbsolutePath();
176188

177189
SparkIngestion ingestion =
178190
new SparkIngestion(
179191
new String[] {
180-
TEST_JOB_ID, checkpointDir, "myDefaultFeastProject", featureSetsJson, storesJson
192+
TEST_JOB_ID,
193+
checkpointDir,
194+
"myDefaultFeastProject",
195+
deadLetterDir,
196+
featureSetsJson,
197+
storesJson
181198
});
182199

183200
StreamingQuery query = ingestion.createQuery();
184201

185-
LOGGER.info(
186-
"Publishing {} Feature Row messages to Kafka ...",
187-
inputForRedis.size() + inputForDelta.size());
202+
LOGGER.info("Publishing {} Feature Row messages to Kafka ...", allInputs.size());
188203
TestUtil.publishFeatureRowsToKafka(
189204
KAFKA_BOOTSTRAP_SERVERS,
190205
KAFKA_TOPIC,
191-
Stream.concat(inputForRedis.stream(), inputForDelta.stream()).collect(Collectors.toList()),
206+
allInputs,
192207
ByteArraySerializer.class,
193208
KAFKA_PUBLISH_TIMEOUT_SEC);
194209

@@ -219,6 +234,8 @@ public void streamingQueryShouldWriteKafkaPayloadAsDeltaLakeAndRedis() throws Ex
219234
TestUtil.validateRedis(featureSetForRedis, inputForRedis, redisConfig, TEST_JOB_ID);
220235

221236
validateDelta(featureSetForDelta, inputForDelta, data);
237+
238+
validateDeadLetter(invalidInput);
222239
}
223240

224241
private <T extends MessageOrBuilder> String toJsonLines(Collection<T> items) {
@@ -254,6 +271,39 @@ public static void validateDelta(
254271
assertEquals(new HashSet<>(input), delta);
255272
}
256273

274+
private void validateDeadLetter(List<FeatureRow> invalidInput) throws Exception {
275+
String deadLetterDir = deadLetterFolder.getRoot().getAbsolutePath();
276+
for (int i = 0; i < 60; i++) {
277+
278+
Dataset<Row> data = spark.session.read().format("delta").load(deadLetterDir.toString());
279+
long count = data.count();
280+
assertThat(count, is((long) IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE));
281+
Row f = data.first();
282+
if (f.length() > 0) {
283+
break;
284+
} else {
285+
LOGGER.info("Delta directory not yet created.");
286+
}
287+
Thread.sleep(1000);
288+
}
289+
290+
Dataset<Row> data = spark.session.read().format("delta").load(deadLetterDir.toString());
291+
long count = data.count();
292+
assertThat(count, is((long) IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE));
293+
Row f = data.first();
294+
assertThat(f.length(), is(6));
295+
int i = 0;
296+
assertThat("timestamp", f.get(i++), instanceOf(java.sql.Timestamp.class));
297+
assertThat("jobName", (String) f.getAs(i++), equalTo(""));
298+
assertThat("transformName", (String) f.getAs(i++), is("ValidateFeatureRow"));
299+
assertThat("payload", (String) f.getAs(i++), startsWith("fields"));
300+
assertThat(
301+
"errorMessage",
302+
(String) f.getAs(i++),
303+
containsString("FeatureRow contains invalid feature set id"));
304+
assertThat("stackTrace", (String) f.getAs(i++), equalTo(null));
305+
}
306+
257307
public static FeatureRow sparkRowToFeatureRow(FeatureSetSpec featureSetSpec, Row row) {
258308
java.sql.Timestamp ts = row.getAs(FeatureRowToSparkRow.EVENT_TIMESTAMP_COLUMN);
259309
Builder builder =

spark/spark-ingestion-job/src/test/java/feast/spark/ingestion/transform/fn/ValidateFeatureRowDoFnTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void shouldOutputFailedElementOnFailedValidation() throws Exception {
7777

7878
ValidateFeatureRowDoFn validFeat = new ValidateFeatureRowDoFn(featureSets);
7979

80-
RowWithValidationResult result = validFeat.validateElement(invalidRow.toByteArray());
80+
RowWithValidationResult result = validFeat.validateElement(invalidRow);
8181

8282
assertThat(result.getValidationStatus(), equalTo(ValidationStatus.FAILURE));
8383
}
@@ -124,7 +124,7 @@ public void shouldOutputSuccessStatusOnSuccessfulValidation() throws Exception {
124124

125125
FeatureRowProto.FeatureRow randomRow = TestUtil.createRandomFeatureRow(fs1);
126126

127-
RowWithValidationResult result = validFeat.validateElement(randomRow.toByteArray());
127+
RowWithValidationResult result = validFeat.validateElement(randomRow);
128128

129129
assertThat(result.getValidationStatus(), equalTo(ValidationStatus.SUCCESS));
130130
}

0 commit comments

Comments
 (0)