Skip to content

Commit 2729f17

Browse files
authored
Support of GC and S3 storages for registry in Java Feature Server (#2043)
* gs and s3 storages for registry Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * some cleanup Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
1 parent 73af2fa commit 2729f17

23 files changed

+788
-270
lines changed

java/serving/pom.xml

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,31 @@
211211
<version>1.6.6</version>
212212
</dependency>
213213

214+
<dependency>
215+
<groupId>com.google.cloud</groupId>
216+
<artifactId>google-cloud-storage</artifactId>
217+
<version>1.118.0</version>
218+
</dependency>
219+
220+
<dependency>
221+
<groupId>com.google.cloud</groupId>
222+
<artifactId>google-cloud-nio</artifactId>
223+
<version>0.123.10</version>
224+
<scope>test</scope>
225+
</dependency>
226+
227+
<dependency>
228+
<groupId>com.amazonaws</groupId>
229+
<artifactId>aws-java-sdk-s3</artifactId>
230+
<version>1.12.110</version>
231+
</dependency>
232+
233+
<dependency>
234+
<groupId>com.adobe.testing</groupId>
235+
<artifactId>s3mock-testcontainers</artifactId>
236+
<version>2.2.3</version>
237+
<scope>test</scope>
238+
</dependency>
214239

215240
<!--testCompile "io.grpc:grpc-testing:${grpc.version}"-->
216241
<dependency>
@@ -310,7 +335,7 @@
310335
<dependency>
311336
<groupId>org.awaitility</groupId>
312337
<artifactId>awaitility</artifactId>
313-
<version>3.0.0</version>
338+
<version>4.1.1</version>
314339
<scope>test</scope>
315340
</dependency>
316341
<dependency>

java/serving/src/main/java/feast/serving/config/FeastProperties.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,36 @@ public void setRegistry(final String registry) {
6868
this.registry = registry;
6969
}
7070

71+
private int registryRefreshInterval;
72+
73+
public int getRegistryRefreshInterval() {
74+
return registryRefreshInterval;
75+
}
76+
77+
public void setRegistryRefreshInterval(final int registryRefreshInterval) {
78+
this.registryRefreshInterval = registryRefreshInterval;
79+
}
80+
81+
private String gcpProject;
82+
83+
public String getGcpProject() {
84+
return gcpProject;
85+
}
86+
87+
public void setGcpProject(final String gcpProject) {
88+
this.gcpProject = gcpProject;
89+
}
90+
91+
private String awsRegion;
92+
93+
public String getAwsRegion() {
94+
return awsRegion;
95+
}
96+
97+
public void setAwsRegion(final String awsRegion) {
98+
this.awsRegion = awsRegion;
99+
}
100+
71101
private String transformationServiceEndpoint;
72102

73103
public String getTransformationServiceEndpoint() {
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2021 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.serving.config;
18+
19+
import com.amazonaws.services.s3.AmazonS3;
20+
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
21+
import com.google.cloud.storage.Storage;
22+
import com.google.cloud.storage.StorageOptions;
23+
import feast.serving.registry.*;
24+
import java.net.URI;
25+
import java.nio.file.Paths;
26+
import java.util.Optional;
27+
import org.springframework.context.ApplicationContext;
28+
import org.springframework.context.annotation.Bean;
29+
import org.springframework.context.annotation.Configuration;
30+
import org.springframework.context.annotation.Lazy;
31+
32+
@Configuration
33+
public class RegistryConfig {
34+
@Bean
35+
@Lazy
36+
Storage googleStorage(FeastProperties feastProperties) {
37+
return StorageOptions.newBuilder()
38+
.setProjectId(feastProperties.getGcpProject())
39+
.build()
40+
.getService();
41+
}
42+
43+
@Bean
44+
@Lazy
45+
AmazonS3 awsStorage(FeastProperties feastProperties) {
46+
return AmazonS3ClientBuilder.standard().withRegion(feastProperties.getAwsRegion()).build();
47+
}
48+
49+
@Bean
50+
RegistryFile registryFile(FeastProperties feastProperties, ApplicationContext context) {
51+
52+
String registryPath = feastProperties.getRegistry();
53+
Optional<String> scheme = Optional.ofNullable(URI.create(registryPath).getScheme());
54+
55+
switch (scheme.orElseGet(() -> "")) {
56+
case "gs":
57+
return new GSRegistryFile(context.getBean(Storage.class), registryPath);
58+
case "s3":
59+
return new S3RegistryFile(context.getBean(AmazonS3.class), registryPath);
60+
case "":
61+
case "file":
62+
return new LocalRegistryFile(Paths.get(registryPath));
63+
default:
64+
throw new RuntimeException("Registry storage %s is unsupported");
65+
}
66+
}
67+
68+
@Bean
69+
RegistryRepository registryRepository(
70+
RegistryFile registryFile, FeastProperties feastProperties) {
71+
return new RegistryRepository(registryFile, feastProperties.getRegistryRefreshInterval());
72+
}
73+
}

java/serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,13 @@
1616
*/
1717
package feast.serving.config;
1818

19-
import feast.serving.registry.LocalRegistryRepo;
19+
import feast.serving.registry.*;
2020
import feast.serving.service.OnlineServingServiceV2;
2121
import feast.serving.service.OnlineTransformationService;
2222
import feast.serving.service.ServingServiceV2;
23-
import feast.serving.specs.FeatureSpecRetriever;
24-
import feast.serving.specs.RegistryFeatureSpecRetriever;
2523
import feast.storage.api.retriever.OnlineRetrieverV2;
2624
import feast.storage.connectors.redis.retriever.*;
2725
import io.opentracing.Tracer;
28-
import java.nio.file.Paths;
2926
import org.slf4j.Logger;
3027
import org.springframework.context.annotation.Bean;
3128
import org.springframework.context.annotation.Configuration;
@@ -36,7 +33,7 @@ public class ServingServiceConfigV2 {
3633

3734
@Bean
3835
public ServingServiceV2 registryBasedServingServiceV2(
39-
FeastProperties feastProperties, Tracer tracer) {
36+
FeastProperties feastProperties, RegistryRepository registryRepository, Tracer tracer) {
4037
final ServingServiceV2 servingService;
4138
final FeastProperties.Store store = feastProperties.getActiveStore();
4239

@@ -56,23 +53,19 @@ public ServingServiceV2 registryBasedServingServiceV2(
5653
default:
5754
throw new RuntimeException(
5855
String.format(
59-
"Unable to identify online store type: %s for Regsitry Backed Serving Service",
56+
"Unable to identify online store type: %s for Registry Backed Serving Service",
6057
store.getType()));
6158
}
6259

63-
final FeatureSpecRetriever featureSpecRetriever;
64-
log.info("Created RegistryFeatureSpecRetriever");
6560
log.info("Working Directory = " + System.getProperty("user.dir"));
66-
final LocalRegistryRepo repo = new LocalRegistryRepo(Paths.get(feastProperties.getRegistry()));
67-
featureSpecRetriever = new RegistryFeatureSpecRetriever(repo);
6861

6962
final String transformationServiceEndpoint = feastProperties.getTransformationServiceEndpoint();
7063
final OnlineTransformationService onlineTransformationService =
71-
new OnlineTransformationService(transformationServiceEndpoint, featureSpecRetriever);
64+
new OnlineTransformationService(transformationServiceEndpoint, registryRepository);
7265

7366
servingService =
7467
new OnlineServingServiceV2(
75-
retrieverV2, tracer, featureSpecRetriever, onlineTransformationService);
68+
retrieverV2, tracer, registryRepository, onlineTransformationService);
7669

7770
return servingService;
7871
}

java/serving/src/main/java/feast/serving/controller/ServingServiceGRpcController.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import net.devh.boot.grpc.server.service.GrpcService;
3737
import org.slf4j.Logger;
3838
import org.springframework.beans.factory.annotation.Autowired;
39-
import org.springframework.security.access.AccessDeniedException;
4039

4140
@GrpcService(
4241
interceptors = {
@@ -89,19 +88,13 @@ public void getOnlineFeaturesV2(
8988
responseObserver.onNext(onlineFeatures);
9089
responseObserver.onCompleted();
9190
} catch (SpecRetrievalException e) {
92-
log.error("Failed to retrieve specs in SpecService", e);
91+
log.error("Failed to retrieve specs from Registry", e);
9392
responseObserver.onError(
9493
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException());
95-
} catch (AccessDeniedException e) {
96-
log.info(String.format("User prevented from accessing one of the projects in request"));
97-
responseObserver.onError(
98-
Status.PERMISSION_DENIED
99-
.withDescription(e.getMessage())
100-
.withCause(e)
101-
.asRuntimeException());
10294
} catch (Exception e) {
10395
log.warn("Failed to get Online Features", e);
104-
responseObserver.onError(e);
96+
responseObserver.onError(
97+
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
10598
}
10699
}
107100
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2021 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.serving.registry;
18+
19+
import com.google.cloud.storage.*;
20+
import com.google.protobuf.InvalidProtocolBufferException;
21+
import feast.proto.core.RegistryProto;
22+
import java.util.Optional;
23+
24+
public class GSRegistryFile implements RegistryFile {
25+
private Blob blob;
26+
27+
public GSRegistryFile(Storage storage, String url) {
28+
blob = storage.get(BlobId.fromGsUtilUri(url));
29+
if (blob == null) {
30+
throw new RuntimeException(String.format("Registry file %s was not found", url));
31+
}
32+
}
33+
34+
public RegistryProto.Registry getContent() {
35+
try {
36+
return RegistryProto.Registry.parseFrom(blob.getContent());
37+
} catch (InvalidProtocolBufferException e) {
38+
throw new RuntimeException(
39+
String.format(
40+
"Couldn't read remote registry: %s. Error: %s",
41+
blob.getBlobId().toGsUtilUri(), e.getMessage()));
42+
}
43+
}
44+
45+
public Optional<RegistryProto.Registry> getContentIfModified() {
46+
try {
47+
this.blob = blob.reload(Blob.BlobSourceOption.generationNotMatch());
48+
} catch (StorageException e) {
49+
if (e.getCode() == 304) {
50+
// Content not modified
51+
return Optional.empty();
52+
} else {
53+
throw new RuntimeException(
54+
String.format(
55+
"Couldn't read remote registry: %s. Error: %s",
56+
blob.getBlobId().toGsUtilUri(), e.getMessage()));
57+
}
58+
}
59+
60+
return Optional.of(this.getContent());
61+
}
62+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2021 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.serving.registry;
18+
19+
import com.google.protobuf.InvalidProtocolBufferException;
20+
import feast.proto.core.RegistryProto;
21+
import java.io.IOException;
22+
import java.nio.file.Files;
23+
import java.nio.file.Paths;
24+
import java.util.Optional;
25+
26+
public class LocalRegistryFile implements RegistryFile {
27+
private RegistryProto.Registry cachedRegistry;
28+
29+
public LocalRegistryFile(String path) {
30+
try {
31+
cachedRegistry = RegistryProto.Registry.parseFrom(Files.readAllBytes(Paths.get(path)));
32+
} catch (InvalidProtocolBufferException e) {
33+
throw new RuntimeException(
34+
String.format(
35+
"Couldn't read local registry: %s. Protobuf is invalid: %s", path, e.getMessage()));
36+
} catch (IOException e) {
37+
throw new RuntimeException(
38+
String.format("Couldn't read local registry file: %s. Error: %s", path, e.getMessage()));
39+
}
40+
}
41+
42+
@Override
43+
public RegistryProto.Registry getContent() {
44+
return this.cachedRegistry;
45+
}
46+
47+
@Override
48+
public Optional<RegistryProto.Registry> getContentIfModified() {
49+
return Optional.empty();
50+
}
51+
}

0 commit comments

Comments
 (0)