Skip to content

Commit 2327b29

Browse files
Oleksii Moskalenko and woop authored
BQ sink produces sample of successful inserts (#875)
* feature row batch produces sample
* lint

Co-authored-by: Willem Pienaar <git@willem.co>
1 parent 558453a commit 2327b29

3 files changed

Lines changed: 97 additions & 39 deletions

File tree

storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/compression/FeatureRowsBatch.java

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -207,48 +207,58 @@ public static FeatureRowsBatch fromRow(Row row) {
207207
return new FeatureRowsBatch(row.getSchema(), row.getValues());
208208
}
209209

210-
public Iterator<FeatureRowProto.FeatureRow> getFeatureRows() {
210+
private FeatureRowProto.FeatureRow restoreFeatureRow(int rowIdx) {
211211
int timestampColumnIdx = schema.indexOf("eventTimestamp");
212212
int ingestionIdColumnIdx = schema.indexOf("ingestionId");
213213

214-
return IntStream.range(0, ((List<Object>) values.get(0)).size())
214+
return FeatureRowProto.FeatureRow.newBuilder()
215+
.setFeatureSet(getFeatureSetReference())
216+
.setEventTimestamp(
217+
Timestamp.newBuilder()
218+
.setSeconds((long) (((List<Object>) values.get(timestampColumnIdx)).get(rowIdx)))
219+
.build())
220+
.setIngestionId((String) (((List<Object>) values.get(ingestionIdColumnIdx)).get(rowIdx)))
221+
.addAllFields(
222+
schema.getFieldNames().stream()
223+
.map(
224+
fieldName -> {
225+
if (SERVICE_FIELDS.contains(fieldName)) {
226+
return null;
227+
}
228+
int fieldIdx = schema.indexOf(fieldName);
229+
230+
return FieldProto.Field.newBuilder()
231+
.setName(schema.getField(fieldIdx).getName())
232+
.setValue(
233+
objectToProtoValue(
234+
((List<Object>) values.get(fieldIdx)).get(rowIdx),
235+
schemaToProtoTypes.get(
236+
schema
237+
.getField(fieldIdx)
238+
.getType()
239+
.getCollectionElementType())))
240+
.build();
241+
})
242+
.filter(Objects::nonNull)
243+
.collect(Collectors.toList()))
244+
.build();
245+
}
246+
247+
public Iterator<FeatureRowProto.FeatureRow> getFeatureRows() {
248+
int featureCount = ((List<Object>) values.get(0)).size();
249+
250+
return IntStream.range(0, featureCount).parallel().mapToObj(this::restoreFeatureRow).iterator();
251+
}
252+
253+
public Iterator<FeatureRowProto.FeatureRow> getFeatureRowsSample(int maxCount) {
254+
int featureCount = ((List<Object>) values.get(0)).size();
255+
Random rd = new Random(42);
256+
257+
return IntStream.range(0, featureCount)
258+
.filter(idx -> rd.nextInt(featureCount) < maxCount)
215259
.parallel()
216-
.mapToObj(
217-
rowIdx ->
218-
FeatureRowProto.FeatureRow.newBuilder()
219-
.setFeatureSet(getFeatureSetReference())
220-
.setEventTimestamp(
221-
Timestamp.newBuilder()
222-
.setSeconds(
223-
(long)
224-
(((List<Object>) values.get(timestampColumnIdx)).get(rowIdx)))
225-
.build())
226-
.setIngestionId(
227-
(String) (((List<Object>) values.get(ingestionIdColumnIdx)).get(rowIdx)))
228-
.addAllFields(
229-
schema.getFieldNames().stream()
230-
.map(
231-
fieldName -> {
232-
if (SERVICE_FIELDS.contains(fieldName)) {
233-
return null;
234-
}
235-
int fieldIdx = schema.indexOf(fieldName);
236-
237-
return FieldProto.Field.newBuilder()
238-
.setName(schema.getField(fieldIdx).getName())
239-
.setValue(
240-
objectToProtoValue(
241-
((List<Object>) values.get(fieldIdx)).get(rowIdx),
242-
schemaToProtoTypes.get(
243-
schema
244-
.getField(fieldIdx)
245-
.getType()
246-
.getCollectionElementType())))
247-
.build();
248-
})
249-
.filter(Objects::nonNull)
250-
.collect(Collectors.toList()))
251-
.build())
260+
.mapToObj(this::restoreFeatureRow)
261+
.limit(maxCount)
252262
.iterator();
253263
}
254264

storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryWrite.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class BigQueryWrite extends PTransform<PCollection<FeatureRow>, WriteResu
5555
private static final Duration BIGQUERY_JOB_MAX_EXPECTING_RESULT_TIME = Duration.standardHours(1);
5656
private static final int BIGQUERY_MAX_JOB_RETRIES = 20;
5757
private static final int DEFAULT_COMPACTION_BATCH_SIZE = 10000;
58+
private static final int MAX_SUCCESSFUL_OUTPUTS_PER_DESTINATION = 1000;
5859

5960
private DatasetId destination;
6061
private PCollectionView<Map<String, Iterable<TableSchema>>> schemas;
@@ -63,6 +64,7 @@ public class BigQueryWrite extends PTransform<PCollection<FeatureRow>, WriteResu
6364
private Duration expectingResultTime = BIGQUERY_JOB_MAX_EXPECTING_RESULT_TIME;
6465
private BigQueryServices testServices;
6566
private int compactionBatchSize = DEFAULT_COMPACTION_BATCH_SIZE;
67+
private int maxSuccessfulOutputs = MAX_SUCCESSFUL_OUTPUTS_PER_DESTINATION;
6668

6769
public BigQueryWrite(
6870
DatasetId destination, PCollectionView<Map<String, Iterable<TableSchema>>> schemas) {
@@ -90,6 +92,11 @@ public BigQueryWrite withCompactionBatchSize(int batchSize) {
9092
return this;
9193
}
9294

95+
public BigQueryWrite withMaxSuccessfulOutputs(int maxSuccessfulOutputs) {
96+
this.maxSuccessfulOutputs = maxSuccessfulOutputs;
97+
return this;
98+
}
99+
93100
/**
94101
* BigQuery writer 1. choose destination based on featureSetName {@link
95102
* FeatureDynamicDestinations} 2. dynamically pull destination's schema from schemas' view 3.
@@ -225,7 +232,10 @@ public void process(ProcessContext c) {
225232

226233
result
227234
.getAll(inputTag)
228-
.forEach(rows -> rows.getFeatureRows().forEachRemaining(c::output));
235+
.forEach(
236+
rows ->
237+
rows.getFeatureRowsSample(maxSuccessfulOutputs)
238+
.forEachRemaining(c::output));
229239
}
230240
}));
231241
}

storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static feast.storage.connectors.bigquery.writer.FeatureSetSpecToTableSchema.*;
2222
import static org.hamcrest.CoreMatchers.*;
2323
import static org.hamcrest.Matchers.containsInAnyOrder;
24+
import static org.hamcrest.Matchers.lessThan;
2425
import static org.junit.Assert.*;
2526
import static org.mockito.Mockito.*;
2627
import static org.mockito.MockitoAnnotations.initMocks;
@@ -498,6 +499,30 @@ public void featureRowCompressShouldPackAndUnpackSuccessfully() {
498499
p.run();
499500
}
500501

502+
@Test
503+
public void featureRowBatchShouldSampleOnRestore() {
504+
List<FeatureRow> stream =
505+
IntStream.range(0, 1000)
506+
.mapToObj(i -> generateRow("project/fs"))
507+
.collect(Collectors.toList());
508+
509+
PCollection<Long> result =
510+
p.apply(Create.of(stream))
511+
.apply("KV", ParDo.of(new ExtractKV()))
512+
.apply(new CompactFeatureRows(1000))
513+
.apply(ParDo.of(new FlatMapWithSample(100)))
514+
.apply(Count.globally());
515+
516+
PAssert.that(result)
517+
.satisfies(
518+
r -> {
519+
// sample size is within bound of required size
520+
assertThat(Math.abs(r.iterator().next() - 100), lessThan(5L));
521+
return null;
522+
});
523+
p.run();
524+
}
525+
501526
private List<FeatureRow> dropNullFeature(List<FeatureRow> input) {
502527
return input.stream()
503528
.map(
@@ -549,4 +574,17 @@ public Table answer(InvocationOnMock invocationOnMock) throws Throwable {
549574
return FakeTable.create(mock(BigQuery.class), tableId, tableDefinition);
550575
}
551576
}
577+
578+
private static class FlatMapWithSample extends DoFn<KV<String, FeatureRowsBatch>, FeatureRow> {
579+
private int sampleSize;
580+
581+
FlatMapWithSample(int sampleSize) {
582+
this.sampleSize = sampleSize;
583+
}
584+
585+
@ProcessElement
586+
public void process(ProcessContext c) {
587+
c.element().getValue().getFeatureRowsSample(sampleSize).forEachRemaining(c::output);
588+
}
589+
}
552590
}

0 commit comments

Comments (0)