Skip to content

Commit 25ff687

Browse files
author
Oleksii Moskalenko
authored
Improve parallelization in Redis Sink (#866)
* replace group with batch * featureReference as key * configurable flush frequency in redis sink * pull config to RedisSink
1 parent b0fb5bb commit 25ff687

File tree

8 files changed

+57
-41
lines changed

8 files changed

+57
-41
lines changed

infra/scripts/setup-common-functions.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ start_feast_serving() {
9898

9999
if [ -n "$1" ]; then
100100
echo "Custom Spring application.yml location provided: $1"
101-
export CONFIG_ARG="--spring.config.location=file://$1"
101+
export CONFIG_ARG="--spring.config.location=classpath:/application.yml,file://$1"
102102
fi
103103

104104
nohup java -jar serving/target/feast-serving-$FEAST_BUILD_VERSION.jar $CONFIG_ARG &>/var/log/feast-serving-online.log &

infra/scripts/test-end-to-end-redis-cluster.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ feast:
5656
config:
5757
# Connection string specifies the IP and ports of Redis instances in Redis cluster
5858
connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
59+
flush_frequency_seconds: 1
5960
# Subscriptions indicate which feature sets needs to be retrieved and used to populate this store
6061
subscriptions:
6162
# Wildcards match all options. No filtering is done.

infra/scripts/test-end-to-end.sh

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,21 @@ if [[ ${ENABLE_AUTH} = "True" ]];
7474
start_feast_core
7575
fi
7676

77-
start_feast_serving
77+
cat <<EOF > /tmp/serving.warehouse.application.yml
78+
feast:
79+
stores:
80+
- name: online
81+
type: REDIS
82+
config:
83+
host: localhost
84+
port: 6379
85+
flush_frequency_seconds: 1
86+
subscriptions:
87+
- name: "*"
88+
project: "*"
89+
EOF
90+
91+
start_feast_serving /tmp/serving.warehouse.application.yml
7892
install_python_with_miniconda_and_feast_sdk
7993

8094
print_banner "Running end-to-end tests with pytest at 'tests/e2e'"

protos/feast/core/Store.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ message Store {
108108
int32 initial_backoff_ms = 3;
109109
// Optional. Maximum total number of retries for connecting to Redis. Default to zero retries.
110110
int32 max_retries = 4;
111+
// Optional. How often to flush data to Redis.
112+
int32 flush_frequency_seconds = 5;
111113
}
112114

113115
message BigQueryConfig {
@@ -129,6 +131,8 @@ message Store {
129131
string connection_string = 1;
130132
int32 initial_backoff_ms = 2;
131133
int32 max_retries = 3;
134+
// Optional. How often to flush data to Redis.
135+
int32 flush_frequency_seconds = 4;
132136
}
133137

134138
message Subscription {

storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,12 @@
4040
import org.apache.beam.sdk.values.*;
4141
import org.apache.commons.lang3.exception.ExceptionUtils;
4242
import org.apache.commons.lang3.tuple.ImmutablePair;
43+
import org.joda.time.Duration;
4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
4546

4647
public class RedisCustomIO {
4748

48-
private static final int DEFAULT_BATCH_SIZE = 1000;
49-
private static final int DEFAULT_TIMEOUT = 2000;
50-
5149
private static TupleTag<FeatureRow> successfulInsertsTag =
5250
new TupleTag<FeatureRow>("successfulInserts") {};
5351
private static TupleTag<FailedElement> failedInsertsTupleTag =
@@ -69,7 +67,7 @@ public static class Write extends PTransform<PCollection<FeatureRow>, WriteResul
6967
private PCollectionView<Map<String, Iterable<FeatureSetSpec>>> featureSetSpecs;
7068
private RedisIngestionClient redisIngestionClient;
7169
private int batchSize;
72-
private int timeout;
70+
private Duration flushFrequency;
7371

7472
public Write(
7573
RedisIngestionClient redisIngestionClient,
@@ -83,23 +81,28 @@ public Write withBatchSize(int batchSize) {
8381
return this;
8482
}
8583

86-
public Write withTimeout(int timeout) {
87-
this.timeout = timeout;
84+
public Write withFlushFrequency(Duration frequency) {
85+
this.flushFrequency = frequency;
8886
return this;
8987
}
9088

9189
@Override
9290
public WriteResult expand(PCollection<FeatureRow> input) {
9391
PCollectionTuple redisWrite =
9492
input
93+
.apply("FixedFlushWindow", Window.<FeatureRow>into(FixedWindows.of(flushFrequency)))
9594
.apply(
96-
"CollectBatchBeforeWrite",
97-
Window.<FeatureRow>into(new GlobalWindows())
98-
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(batchSize)))
99-
.discardingFiredPanes())
100-
.apply("AttachSingletonKey", WithKeys.of((Void) null))
101-
.apply("GroupOntoSingleton", GroupByKey.create())
95+
"AttachFeatureReferenceKey",
96+
ParDo.of(
97+
new DoFn<FeatureRow, KV<String, FeatureRow>>() {
98+
@ProcessElement
99+
public void process(ProcessContext c) {
100+
c.output(KV.of(c.element().getFeatureSet(), c.element()));
101+
}
102+
}))
103+
.apply("IntoBatches", GroupIntoBatches.ofSize(batchSize))
102104
.apply("ExtractResultValues", Values.create())
105+
.apply("GlobalWindow", Window.<Iterable<FeatureRow>>into(new GlobalWindows()))
103106
.apply(
104107
ParDo.of(new WriteDoFn(redisIngestionClient, featureSetSpecs))
105108
.withOutputTags(successfulInsertsTag, TupleTagList.of(failedInsertsTupleTag))
@@ -112,8 +115,6 @@ public WriteResult expand(PCollection<FeatureRow> input) {
112115

113116
public static class WriteDoFn extends DoFn<Iterable<FeatureRow>, FeatureRow> {
114117
private PCollectionView<Map<String, Iterable<FeatureSetSpec>>> featureSetSpecsView;
115-
private int batchSize = DEFAULT_BATCH_SIZE;
116-
private int timeout = DEFAULT_TIMEOUT;
117118
private RedisIngestionClient redisIngestionClient;
118119

119120
WriteDoFn(
@@ -124,20 +125,6 @@ public static class WriteDoFn extends DoFn<Iterable<FeatureRow>, FeatureRow> {
124125
this.featureSetSpecsView = featureSetSpecsView;
125126
}
126127

127-
public WriteDoFn withBatchSize(int batchSize) {
128-
if (batchSize > 0) {
129-
this.batchSize = batchSize;
130-
}
131-
return this;
132-
}
133-
134-
public WriteDoFn withTimeout(int timeout) {
135-
if (timeout > 0) {
136-
this.timeout = timeout;
137-
}
138-
return this;
139-
}
140-
141128
@Setup
142129
public void setup() {
143130
this.redisIngestionClient.setup();

storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisFeatureSink.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,12 @@
3535
import org.apache.beam.sdk.values.KV;
3636
import org.apache.beam.sdk.values.PCollection;
3737
import org.apache.beam.sdk.values.PCollectionView;
38+
import org.joda.time.Duration;
3839

3940
@AutoValue
4041
public abstract class RedisFeatureSink implements FeatureSink {
42+
private static final int DEFAULT_BATCH_SIZE = 10000;
43+
private static final int DEFAULT_FREQUENCY_SECONDS = 30;
4144

4245
/**
4346
* Initialize a {@link RedisFeatureSink.Builder} from a {@link StoreProto.Store.RedisConfig}.
@@ -112,12 +115,28 @@ public PCollection<FeatureSetReference> prepareWrite(
112115

113116
@Override
114117
public PTransform<PCollection<FeatureRow>, WriteResult> writer() {
118+
int flushFrequencySeconds = DEFAULT_FREQUENCY_SECONDS;
119+
115120
if (getRedisClusterConfig() != null) {
121+
122+
if (getRedisClusterConfig().getFlushFrequencySeconds() > 0) {
123+
flushFrequencySeconds = getRedisClusterConfig().getFlushFrequencySeconds();
124+
}
125+
116126
return new RedisCustomIO.Write(
117-
new RedisClusterIngestionClient(getRedisClusterConfig()), getSpecsView());
127+
new RedisClusterIngestionClient(getRedisClusterConfig()), getSpecsView())
128+
.withFlushFrequency(Duration.standardSeconds(flushFrequencySeconds))
129+
.withBatchSize(DEFAULT_BATCH_SIZE);
130+
118131
} else if (getRedisConfig() != null) {
132+
if (getRedisConfig().getFlushFrequencySeconds() > 0) {
133+
flushFrequencySeconds = getRedisConfig().getFlushFrequencySeconds();
134+
}
135+
119136
return new RedisCustomIO.Write(
120-
new RedisStandaloneIngestionClient(getRedisConfig()), getSpecsView());
137+
new RedisStandaloneIngestionClient(getRedisConfig()), getSpecsView())
138+
.withFlushFrequency(Duration.standardSeconds(flushFrequencySeconds))
139+
.withBatchSize(DEFAULT_BATCH_SIZE);
121140
} else {
122141
throw new RuntimeException(
123142
"At least one RedisConfig or RedisClusterConfig must be provided to Redis Sink");

storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisClusterFeatureSinkTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import feast.proto.types.FieldProto.Field;
3434
import feast.proto.types.ValueProto.Value;
3535
import feast.proto.types.ValueProto.ValueType.Enum;
36-
import feast.storage.api.writer.FailedElement;
3736
import io.lettuce.core.RedisURI;
3837
import io.lettuce.core.cluster.RedisClusterClient;
3938
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
@@ -221,7 +220,6 @@ public void shouldRetryFailConnection() throws InterruptedException {
221220
p.apply(Create.of(featureRows))
222221
.apply(redisClusterFeatureSink.writer())
223222
.getFailedInserts()
224-
.apply(Window.<FailedElement>into(new GlobalWindows()).triggering(Never.ever()))
225223
.apply(Count.globally());
226224

227225
redisCluster.stop();
@@ -283,7 +281,6 @@ public void shouldProduceFailedElementIfRetryExceeded() {
283281
p.apply(Create.of(featureRows))
284282
.apply("modifiedSink", redisClusterFeatureSink.writer())
285283
.getFailedInserts()
286-
.apply(Window.<FailedElement>into(new GlobalWindows()).triggering(Never.ever()))
287284
.apply(Count.globally());
288285

289286
PAssert.that(failedElementCount).containsInAnyOrder(1L);

storage/connectors/redis/src/test/java/feast/storage/connectors/redis/writer/RedisFeatureSinkTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import feast.proto.types.FieldProto.Field;
3535
import feast.proto.types.ValueProto.Value;
3636
import feast.proto.types.ValueProto.ValueType.Enum;
37-
import feast.storage.api.writer.FailedElement;
3837
import io.lettuce.core.RedisClient;
3938
import io.lettuce.core.RedisURI;
4039
import io.lettuce.core.api.StatefulRedisConnection;
@@ -49,9 +48,6 @@
4948
import org.apache.beam.sdk.testing.TestPipeline;
5049
import org.apache.beam.sdk.transforms.Count;
5150
import org.apache.beam.sdk.transforms.Create;
52-
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
53-
import org.apache.beam.sdk.transforms.windowing.Never;
54-
import org.apache.beam.sdk.transforms.windowing.Window;
5551
import org.apache.beam.sdk.values.PCollection;
5652
import org.junit.After;
5753
import org.junit.Before;
@@ -212,7 +208,6 @@ public void shouldRetryFailConnection() throws InterruptedException {
212208
p.apply(Create.of(featureRows))
213209
.apply(redisFeatureSink.writer())
214210
.getFailedInserts()
215-
.apply(Window.<FailedElement>into(new GlobalWindows()).triggering(Never.ever()))
216211
.apply(Count.globally());
217212

218213
redis.stop();
@@ -271,7 +266,6 @@ public void shouldProduceFailedElementIfRetryExceeded() {
271266
p.apply(Create.of(featureRows))
272267
.apply(redisFeatureSink.writer())
273268
.getFailedInserts()
274-
.apply(Window.<FailedElement>into(new GlobalWindows()).triggering(Never.ever()))
275269
.apply(Count.globally());
276270

277271
redis.stop();

0 commit comments

Comments
 (0)