Skip to content

Commit a46f7d1

Browse files
terryyylimTerencewoop
authored
Add subscriptions blacklist functionality (#813)
* Add subscriptions blacklist functionality * Resolve PR comments * Change comments on exclude flag Co-authored-by: Terence <terence.limxp@go-jek.com> Co-authored-by: Willem Pienaar <6728866+woop@users.noreply.github.com>
1 parent 46e3e09 commit a46f7d1

File tree

5 files changed

+254
-31
lines changed

5 files changed

+254
-31
lines changed

common/src/main/java/feast/common/models/Store.java

Lines changed: 85 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,47 @@
1717
package feast.common.models;
1818

1919
import feast.proto.core.StoreProto.Store.Subscription;
20+
import java.util.Arrays;
2021
import java.util.List;
2122
import java.util.regex.Pattern;
23+
import java.util.stream.Collectors;
2224

2325
public class Store {
2426

27+
/**
28+
* Accepts a comma-delimited string and converts it to a list of Subscription class objects.
29+
*
30+
* @param subscriptions String formatted Subscriptions, comma delimited.
31+
* @return List of Subscription class objects
32+
*/
33+
public static List<Subscription> parseSubFromStr(String subscriptions) {
34+
List<Subscription> allSubscriptions =
35+
Arrays.stream(subscriptions.split(","))
36+
.map(subscriptionStr -> convertStringToSubscription(subscriptionStr))
37+
.collect(Collectors.toList());
38+
39+
return allSubscriptions;
40+
}
41+
42+
/**
43+
* Accepts a comma-delimited string and converts it to a list of Subscription class objects, with
44+
* exclusions filtered out.
45+
*
46+
* @param subscriptions String formatted Subscriptions, comma delimited.
47+
* @return List of Subscription class objects
48+
*/
49+
public static List<Subscription> parseSubFromStrWithoutExclusions(String subscriptions) {
50+
List<Subscription> allSubscriptions =
51+
Arrays.stream(subscriptions.split(","))
52+
.map(subscriptionStr -> convertStringToSubscription(subscriptionStr))
53+
.collect(Collectors.toList());
54+
55+
allSubscriptions =
56+
allSubscriptions.stream().filter(sub -> !sub.getExclude()).collect(Collectors.toList());
57+
58+
return allSubscriptions;
59+
}
60+
2561
/**
2662
* Accepts a Subscription class object and returns it in string format
2763
*
@@ -34,7 +70,8 @@ public static String parseSubscriptionFrom(Subscription subscription) {
3470
String.format("Missing arguments in subscription string: %s", subscription.toString()));
3571
}
3672

37-
return String.format("%s:%s", subscription.getProject(), subscription.getName());
73+
return String.format(
74+
"%s:%s:%s", subscription.getProject(), subscription.getName(), subscription.getExclude());
3875
}
3976

4077
/**
@@ -48,7 +85,15 @@ public static Subscription convertStringToSubscription(String subscription) {
4885
return Subscription.newBuilder().build();
4986
}
5087
String[] split = subscription.split(":");
51-
return Subscription.newBuilder().setProject(split[0]).setName(split[1]).build();
88+
if (split.length == 2) {
89+
// Backward compatibility check
90+
return Subscription.newBuilder().setProject(split[0]).setName(split[1]).build();
91+
}
92+
return Subscription.newBuilder()
93+
.setProject(split[0])
94+
.setName(split[1])
95+
.setExclude(Boolean.parseBoolean(split[2]))
96+
.build();
5297
}
5398

5499
/**
@@ -62,37 +107,57 @@ public static Subscription convertStringToSubscription(String subscription) {
62107
*/
63108
public static boolean isSubscribedToFeatureSet(
64109
List<Subscription> subscriptions, String projectName, String featureSetName) {
110+
// Case 1: Highest priority check, to exclude all matching subscriptions with excluded flag =
111+
// true
65112
for (Subscription sub : subscriptions) {
66113
// If configuration missing, fail
67114
if (sub.getProject().isEmpty() || sub.getName().isEmpty()) {
68115
throw new IllegalArgumentException(
69116
String.format("Subscription is missing arguments: %s", sub.toString()));
70117
}
118+
// Match feature set name to pattern
119+
Pattern patternName = getNamePattern(sub);
120+
Pattern patternProject = getProjectPattern(sub);
121+
// SubCase: Project name and feature set name matches and excluded flag is true
122+
if (patternProject.matcher(projectName).matches()
123+
&& patternName.matcher(featureSetName).matches()
124+
&& sub.getExclude()) {
125+
return false;
126+
}
127+
}
128+
// Case 2: Featureset is not excluded, check if it is included in the current subscriptions
129+
// filteredSubscriptions only contain subscriptions with excluded flag = false
130+
List<Subscription> filteredSubscriptions =
131+
subscriptions.stream().filter(sub -> !sub.getExclude()).collect(Collectors.toList());
71132

72-
// If all wildcards, subscribe to everything
73-
if (sub.getProject().equals("*") || sub.getName().equals("*")) {
133+
for (Subscription filteredSub : filteredSubscriptions) {
134+
// Match feature set name to pattern
135+
Pattern patternName = getNamePattern(filteredSub);
136+
Pattern patternProject = getProjectPattern(filteredSub);
137+
// SubCase: Project name and feature set name matches
138+
if (patternProject.matcher(projectName).matches()
139+
&& patternName.matcher(featureSetName).matches()) {
74140
return true;
75141
}
142+
}
143+
return false;
144+
}
76145

77-
// Match project name
78-
if (!projectName.equals(sub.getProject())) {
79-
continue;
80-
}
146+
private static Pattern getProjectPattern(Subscription subscription) {
147+
String subProject = subscription.getProject();
148+
if (!subscription.getProject().contains(".*")) {
149+
subProject = subProject.replace("*", ".*");
150+
}
81151

82-
// Convert wildcard to regex
83-
String subName = sub.getName();
84-
if (!sub.getName().contains(".*")) {
85-
subName = subName.replace("*", ".*");
86-
}
152+
return Pattern.compile(subProject);
153+
}
87154

88-
// Match feature set name to pattern
89-
Pattern pattern = Pattern.compile(subName);
90-
if (!pattern.matcher(featureSetName).matches()) {
91-
continue;
92-
}
93-
return true;
155+
private static Pattern getNamePattern(Subscription subscription) {
156+
String subName = subscription.getName();
157+
if (!subscription.getProject().contains(".*")) {
158+
subName = subName.replace("*", ".*");
94159
}
95160

96-
return false;
161+
return Pattern.compile(subName);
97162
}
98163
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2020 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.common.models;
18+
19+
import static org.hamcrest.MatcherAssert.assertThat;
20+
import static org.hamcrest.core.IsEqual.equalTo;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import feast.proto.core.StoreProto.Store.Subscription;
24+
import java.util.Arrays;
25+
import java.util.List;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
29+
public class StoreTest {
30+
31+
private List<Subscription> allSubscriptions;
32+
33+
@Before
34+
public void setUp() {
35+
36+
Subscription emptySubscription = Subscription.newBuilder().build();
37+
Subscription subscription1 = Subscription.newBuilder().setProject("*").setName("*").build();
38+
Subscription subscription2 =
39+
Subscription.newBuilder().setProject("project1").setName("fs_2").build();
40+
Subscription subscription3 =
41+
Subscription.newBuilder().setProject("project1").setName("fs_1").setExclude(true).build();
42+
allSubscriptions =
43+
Arrays.asList(emptySubscription, subscription1, subscription2, subscription3);
44+
}
45+
46+
@Test
47+
public void shouldReturnSubscriptionsBasedOnStr() {
48+
String subscriptions = "project1:fs_1:true,project1:fs_2";
49+
List<Subscription> actual1 = Store.parseSubFromStr(subscriptions);
50+
List<Subscription> expected1 = Arrays.asList(allSubscriptions.get(2), allSubscriptions.get(3));
51+
52+
List<Subscription> actual2 = Store.parseSubFromStrWithoutExclusions(subscriptions);
53+
List<Subscription> expected2 = Arrays.asList(allSubscriptions.get(2));
54+
55+
assertTrue(actual1.containsAll(expected1) && expected1.containsAll(actual1));
56+
assertTrue(actual2.containsAll(expected2) && expected2.containsAll(actual2));
57+
}
58+
59+
@Test
60+
public void shouldReturnStringBasedOnSubscription() {
61+
// Case: default exclude should be false
62+
String actual1 = Store.parseSubscriptionFrom(allSubscriptions.get(2));
63+
Subscription sub1 = allSubscriptions.get(2);
64+
String expected1 = sub1.getProject() + ":" + sub1.getName() + ":" + sub1.getExclude();
65+
66+
// Case: explicit setting of exclude to true
67+
String actual2 = Store.parseSubscriptionFrom(allSubscriptions.get(3));
68+
Subscription sub2 = allSubscriptions.get(3);
69+
String expected2 = sub2.getProject() + ":" + sub2.getName() + ":" + sub2.getExclude();
70+
71+
assertThat(actual1, equalTo(expected1));
72+
assertThat(actual2, equalTo(expected2));
73+
}
74+
75+
@Test
76+
public void shouldSubscribeToFeatureSet() {
77+
allSubscriptions = allSubscriptions.subList(2, 4);
78+
// Case: excluded flag = true
79+
boolean actual1 = Store.isSubscribedToFeatureSet(allSubscriptions, "project1", "fs_1");
80+
boolean expected1 = false;
81+
82+
// Case: excluded flag = false
83+
boolean actual2 = Store.isSubscribedToFeatureSet(allSubscriptions, "project1", "fs_2");
84+
boolean expected2 = true;
85+
86+
// Case: featureset does not exist
87+
boolean actual3 =
88+
Store.isSubscribedToFeatureSet(allSubscriptions, "project1", "fs_nonexistent");
89+
boolean expected3 = false;
90+
91+
assertThat(actual1, equalTo(expected1));
92+
assertThat(actual2, equalTo(expected2));
93+
assertThat(actual3, equalTo(expected3));
94+
}
95+
}

ingestion/src/test/java/feast/ingestion/transform/FeatureRowToStoreAllocatorTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import com.google.common.collect.ImmutableList;
2020
import com.google.common.collect.ImmutableMap;
2121
import feast.proto.core.StoreProto;
22+
import feast.proto.core.StoreProto.Store.Subscription;
2223
import feast.proto.types.FeatureRowProto;
24+
import java.util.Arrays;
25+
import java.util.List;
2326
import java.util.Map;
2427
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
2528
import org.apache.beam.sdk.testing.PAssert;
@@ -41,6 +44,10 @@ private StoreProto.Store newStore(String s) {
4144
.build();
4245
}
4346

47+
private StoreProto.Store newStore(List<StoreProto.Store.Subscription> subscriptionList) {
48+
return StoreProto.Store.newBuilder().addAllSubscriptions(subscriptionList).build();
49+
}
50+
4451
@Test
4552
public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscription() {
4653
StoreProto.Store bqOnlyStore = newStore("bq*");
@@ -96,4 +103,55 @@ public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscription() {
96103

97104
p.run();
98105
}
106+
107+
@Test
108+
public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscriptionBlacklist() {
109+
Subscription subscription1 = Subscription.newBuilder().setProject("*").setName("*").build();
110+
Subscription subscription2 =
111+
Subscription.newBuilder().setProject("project1").setName("fs_2").build();
112+
Subscription subscription3 =
113+
Subscription.newBuilder().setProject("project1").setName("fs_1").setExclude(true).build();
114+
Subscription subscription4 =
115+
Subscription.newBuilder().setProject("project2").setName("*").setExclude(true).build();
116+
117+
List<Subscription> testStoreSubscriptions1 =
118+
Arrays.asList(subscription1, subscription2, subscription3);
119+
StoreProto.Store testStore1 = newStore(testStoreSubscriptions1);
120+
121+
List<Subscription> testStoreSubscriptions2 = Arrays.asList(subscription1, subscription4);
122+
StoreProto.Store testStore2 = newStore(testStoreSubscriptions2);
123+
124+
Map<StoreProto.Store, TupleTag<FeatureRowProto.FeatureRow>> storeTags =
125+
ImmutableMap.of(
126+
testStore1, new TupleTag<>(),
127+
testStore2, new TupleTag<>());
128+
129+
PCollectionTuple allocatedRows =
130+
p.apply(
131+
Create.of(
132+
FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project1/fs_1").build(),
133+
FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project2/fs_1").build(),
134+
FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project2/fs_2").build()))
135+
.apply(
136+
FeatureRowToStoreAllocator.newBuilder()
137+
.setStoreTags(storeTags)
138+
.setStores(ImmutableList.of(testStore1, testStore2))
139+
.build());
140+
141+
PAssert.that(
142+
allocatedRows
143+
.get(storeTags.get(testStore1))
144+
.setCoder(ProtoCoder.of(FeatureRowProto.FeatureRow.class))
145+
.apply("CountStore1", Count.globally()))
146+
.containsInAnyOrder(2L);
147+
148+
PAssert.that(
149+
allocatedRows
150+
.get(storeTags.get(testStore2))
151+
.setCoder(ProtoCoder.of(FeatureRowProto.FeatureRow.class))
152+
.apply("CountStore2", Count.globally()))
153+
.containsInAnyOrder(1L);
154+
155+
p.run();
156+
}
99157
}

protos/feast/core/Store.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ message Store {
149149
// - my-feature-set-6 can be used to select a single feature set
150150
string name = 1;
151151

152+
// All matches with exclude enabled will be filtered out instead of added
153+
bool exclude = 4;
154+
152155
// Feature set version was removed in v0.5.0.
153156
reserved 2;
154157
}

serving/src/main/java/feast/serving/specs/CachedSpecService.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -195,18 +195,20 @@ private Map<String, FeatureSetSpec> getFeatureSetMap() {
195195

196196
for (Subscription subscription : this.store.getSubscriptionsList()) {
197197
try {
198-
ListFeatureSetsResponse featureSetsResponse =
199-
coreService.listFeatureSets(
200-
ListFeatureSetsRequest.newBuilder()
201-
.setFilter(
202-
ListFeatureSetsRequest.Filter.newBuilder()
203-
.setProject(subscription.getProject())
204-
.setFeatureSetName(subscription.getName()))
205-
.build());
198+
if (!subscription.getExclude()) {
199+
ListFeatureSetsResponse featureSetsResponse =
200+
coreService.listFeatureSets(
201+
ListFeatureSetsRequest.newBuilder()
202+
.setFilter(
203+
ListFeatureSetsRequest.Filter.newBuilder()
204+
.setProject(subscription.getProject())
205+
.setFeatureSetName(subscription.getName()))
206+
.build());
206207

207-
for (FeatureSet featureSet : featureSetsResponse.getFeatureSetsList()) {
208-
FeatureSetSpec spec = featureSet.getSpec();
209-
featureSets.put(getFeatureSetStringRef(spec), spec);
208+
for (FeatureSet featureSet : featureSetsResponse.getFeatureSetsList()) {
209+
FeatureSetSpec spec = featureSet.getSpec();
210+
featureSets.put(getFeatureSetStringRef(spec), spec);
211+
}
210212
}
211213
} catch (StatusRuntimeException e) {
212214
throw new RuntimeException(

0 commit comments

Comments
 (0)