Skip to content

Commit 4885d8f

Browse files
davidheryanto authored and feast-ci-bot committed
Update Feast SDK to make staging_location optional when starting import job or downloading dataset (feast-dev#221)
* Add GetUploadURL method to CoreService.proto
* Implement CoreService.GetUploadUrl
* Update spring boot version from 2.0.4 to 2.0.9
* Update generated protos for Python
* Throw checked exception for getBucketNameFromWorkspace
  Since there is no guarantee that the passed workspace value is a valid GCS URI, such an error is "expected" and the caller should know how to handle it, e.g. by showing an error message to the user.
* Add path value to GetUploadUrlResponse
* Warn if SchemaManager does not register serving/warehouse store
* Make staging_location optional when starting import job or downloading dataset
* Remove datastore registering capability and references
* Update version of dependencies used in go.mod
* Fix typo
* Update workspace for integration test
* Add google-cloud-bigquery-storage in requirements.txt
* Fix typo
* Update requirements.txt integration test
* Add missing google-cloud-bigquery-storage dependencies in requirements.txt
* Fix typo - missing comma
* Make test bucket configurable, fix typo, reference default expiry in unit test
1 parent ba3dafa commit 4885d8f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1021
-796
lines changed

.prow/config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ presubmits:
142142
os: ubuntu
143143
containers:
144144
- image: google/cloud-sdk
145-
# securityContext and docker socket vol mounts are needed because we are building
145+
# securityContext and docker socket volume mounts are needed because we are building
146146
# Docker images in this job
147147
securityContext:
148148
privileged: true

.prow/scripts/run_unit_test.sh

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
# - downloading maven cache repository
88
# - saving the test output report so it can be viewed with Spyglass in Prow
99

10+
# Bucket in GCS used for running unit tests, when the unit tests need an
11+
# actual running GCS (e.g. because there is no existing mock implementation of the function to test)
12+
TEST_BUCKET=feast-templocation-kf-feast
13+
1014
usage()
1115
{
1216
echo "usage: run_unit_test.sh
@@ -30,7 +34,7 @@ fi
3034
if [[ ${COMPONENT} == "core" ]] || [[ ${COMPONENT} == "ingestion" ]] || [[ ${COMPONENT} == "serving" ]]; then
3135

3236
.prow/scripts/prepare_maven_cache.sh --archive-uri gs://feast-templocation-kf-feast/.m2.tar --output-dir /root/
33-
mvn --projects ${COMPONENT} test
37+
# Pass the configurable TEST_BUCKET (defined at the top of this script) to the tests
mvn --projects ${COMPONENT} -Dtestbucket=${TEST_BUCKET} test
3438
TEST_EXIT_CODE=$?
3539
cp -r ${COMPONENT}/target/surefire-reports /logs/artifacts/surefire-reports
3640

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ test-integration:
2525

2626
build-proto:
2727
$(MAKE) -C protos gen-go
28+
$(MAKE) -C protos gen-python
2829

2930
build-cli:
3031
$(MAKE) build-proto

core/src/main/java/feast/core/grpc/CoreServiceImpl.java

Lines changed: 153 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,22 @@
1717

1818
package feast.core.grpc;
1919

20+
import com.google.cloud.storage.BlobInfo;
21+
import com.google.cloud.storage.Storage;
22+
import com.google.cloud.storage.StorageOptions;
2023
import com.google.protobuf.Empty;
24+
import com.google.protobuf.Timestamp;
2125
import com.timgroup.statsd.StatsDClient;
2226
import feast.core.CoreServiceGrpc.CoreServiceImplBase;
23-
import feast.core.CoreServiceProto.CoreServiceTypes.ApplyEntityResponse;
24-
import feast.core.CoreServiceProto.CoreServiceTypes.ApplyFeatureGroupResponse;
25-
import feast.core.CoreServiceProto.CoreServiceTypes.ApplyFeatureResponse;
26-
import feast.core.CoreServiceProto.CoreServiceTypes.GetEntitiesRequest;
27-
import feast.core.CoreServiceProto.CoreServiceTypes.GetEntitiesResponse;
28-
import feast.core.CoreServiceProto.CoreServiceTypes.GetFeaturesRequest;
29-
import feast.core.CoreServiceProto.CoreServiceTypes.GetFeaturesResponse;
30-
import feast.core.CoreServiceProto.CoreServiceTypes.ListEntitiesResponse;
31-
import feast.core.CoreServiceProto.CoreServiceTypes.ListFeaturesResponse;
27+
import feast.core.CoreServiceProto.CoreServiceTypes.*;
28+
import feast.core.CoreServiceProto.CoreServiceTypes.GetUploadUrlResponse.HttpMethod;
3229
import feast.core.config.StorageConfig.StorageSpecs;
3330
import feast.core.exception.RegistrationException;
3431
import feast.core.exception.RetrievalException;
3532
import feast.core.model.EntityInfo;
3633
import feast.core.model.FeatureGroupInfo;
3734
import feast.core.model.FeatureInfo;
35+
import feast.core.service.JobManagementService;
3836
import feast.core.service.SpecService;
3937
import feast.core.validators.SpecValidator;
4038
import feast.specs.EntitySpecProto.EntitySpec;
@@ -43,30 +41,44 @@
4341
import io.grpc.Status;
4442
import io.grpc.StatusRuntimeException;
4543
import io.grpc.stub.StreamObserver;
46-
import java.util.List;
47-
import java.util.stream.Collectors;
4844
import lombok.extern.slf4j.Slf4j;
45+
import org.apache.commons.codec.digest.DigestUtils;
46+
import org.apache.commons.lang3.RandomUtils;
47+
import org.apache.commons.lang3.StringUtils;
48+
import org.apache.http.NameValuePair;
49+
import org.apache.http.client.utils.URLEncodedUtils;
4950
import org.lognet.springboot.grpc.GRpcService;
5051
import org.springframework.beans.factory.annotation.Autowired;
5152

52-
/**
53-
* Implementation of the feast core GRPC service.
54-
*/
53+
import java.net.URISyntaxException;
54+
import java.net.URL;
55+
import java.nio.charset.Charset;
56+
import java.util.List;
57+
import java.util.concurrent.TimeUnit;
58+
import java.util.stream.Collectors;
59+
60+
/** Implementation of the feast core GRPC service. */
5561
@Slf4j
5662
@GRpcService
5763
public class CoreServiceImpl extends CoreServiceImplBase {
5864

59-
@Autowired
60-
private SpecService specService;
65+
private static final String UPLOAD_URL_DIR = "uploads";
66+
private static final int UPLOAD_URL_VALIDITY_IN_MINUTES = 5;
67+
private Storage storage = StorageOptions.getDefaultInstance().getService();
6168

62-
@Autowired
63-
private SpecValidator validator;
69+
@Autowired private SpecService specService;
70+
@Autowired private SpecValidator validator;
71+
@Autowired private StatsDClient statsDClient;
72+
@Autowired private JobManagementService jobManagementService;
73+
@Autowired private StorageSpecs storageSpecs;
6474

65-
@Autowired
66-
private StatsDClient statsDClient;
75+
public static long getUploadUrlValidityInMinutes() {
76+
return UPLOAD_URL_VALIDITY_IN_MINUTES;
77+
}
6778

68-
@Autowired
69-
private StorageSpecs storageSpecs;
79+
public void setStorage(Storage storage) {
80+
this.storage = storage;
81+
}
7082

7183
/**
7284
* Gets specs for all entities requested in the request. If the retrieval of any one of them
@@ -79,9 +91,7 @@ public void getEntities(
7991
statsDClient.increment("get_entities_request_count");
8092
try {
8193
List<EntitySpec> entitySpecs =
82-
specService
83-
.getEntities(request.getIdsList())
84-
.stream()
94+
specService.getEntities(request.getIdsList()).stream()
8595
.map(EntityInfo::getEntitySpec)
8696
.collect(Collectors.toList());
8797
GetEntitiesResponse response =
@@ -99,18 +109,14 @@ public void getEntities(
99109
}
100110
}
101111

102-
/**
103-
* Gets specs for all entities registered in the registry.
104-
*/
112+
/** Gets specs for all entities registered in the registry. */
105113
@Override
106114
public void listEntities(Empty request, StreamObserver<ListEntitiesResponse> responseObserver) {
107115
long now = System.currentTimeMillis();
108116
statsDClient.increment("list_entities_request_count");
109117
try {
110118
List<EntitySpec> entitySpecs =
111-
specService
112-
.listEntities()
113-
.stream()
119+
specService.listEntities().stream()
114120
.map(EntityInfo::getEntitySpec)
115121
.collect(Collectors.toList());
116122
ListEntitiesResponse response =
@@ -139,9 +145,7 @@ public void getFeatures(
139145
statsDClient.increment("get_features_request_count");
140146
try {
141147
List<FeatureSpec> featureSpecs =
142-
specService
143-
.getFeatures(request.getIdsList())
144-
.stream()
148+
specService.getFeatures(request.getIdsList()).stream()
145149
.map(FeatureInfo::getFeatureSpec)
146150
.collect(Collectors.toList());
147151
GetFeaturesResponse response =
@@ -159,18 +163,14 @@ public void getFeatures(
159163
}
160164
}
161165

162-
/**
163-
* Gets specs for all features registered in the registry. TODO: some kind of pagination
164-
*/
166+
/** Gets specs for all features registered in the registry. TODO: some kind of pagination */
165167
@Override
166168
public void listFeatures(Empty request, StreamObserver<ListFeaturesResponse> responseObserver) {
167169
long now = System.currentTimeMillis();
168170
statsDClient.increment("list_features_request_count");
169171
try {
170172
List<FeatureSpec> featureSpecs =
171-
specService
172-
.listFeatures()
173-
.stream()
173+
specService.listFeatures().stream()
174174
.map(FeatureInfo::getFeatureSpec)
175175
.collect(Collectors.toList());
176176
ListFeaturesResponse response =
@@ -261,6 +261,119 @@ public void applyEntity(
261261
}
262262
}
263263

264+
/**
265+
* Get a signed URL where a Feast client can upload a CSV or JSON file by making an HTTP PUT
266+
* request. The signed URL references a bucket and blob in Google Cloud Storage.
267+
*
268+
* @param request
269+
* @param responseObserver
270+
*/
271+
@Override
272+
public void getUploadUrl(
273+
GetUploadUrlRequest request, StreamObserver<GetUploadUrlResponse> responseObserver) {
274+
String bucketName = null;
275+
276+
try {
277+
bucketName = getBucketNameFromWorkspace(jobManagementService.getWorkspace());
278+
} catch (IllegalArgumentException e) {
279+
responseObserver.onError(
280+
Status.FAILED_PRECONDITION
281+
.withDescription("Failed to get upload URL from workspace\n" + e.getMessage())
282+
.asRuntimeException());
283+
}
284+
assert StringUtils.isNotEmpty(bucketName);
285+
286+
// Generated file names are always unique
287+
String fileName =
288+
String.format(
289+
"%s-%s", System.currentTimeMillis(), DigestUtils.sha1Hex(RandomUtils.nextBytes(8)));
290+
291+
BlobInfo blobInfo =
292+
BlobInfo.newBuilder(
293+
bucketName,
294+
String.format(
295+
"%s/%s.%s",
296+
UPLOAD_URL_DIR, fileName, request.getFileType().toString().toLowerCase()))
297+
.build();
298+
299+
URL signedUrl = null;
300+
try {
301+
signedUrl =
302+
storage.signUrl(
303+
blobInfo,
304+
UPLOAD_URL_VALIDITY_IN_MINUTES,
305+
TimeUnit.MINUTES,
306+
Storage.SignUrlOption.httpMethod(com.google.cloud.storage.HttpMethod.PUT));
307+
} catch (Exception e) {
308+
responseObserver.onError(
309+
Status.FAILED_PRECONDITION
310+
.withDescription(
311+
"Failed to create signed URL. Please check your Feast deployment config\n"
312+
+ e.getMessage())
313+
.asRuntimeException());
314+
}
315+
assert signedUrl != null;
316+
long expiryInEpochTime = -1;
317+
318+
// Retrieve the actual expiry timestamp from the created signed URL
319+
try {
320+
List<NameValuePair> params =
321+
URLEncodedUtils.parse(signedUrl.toURI(), Charset.forName("UTF-8"));
322+
for (NameValuePair param : params) {
323+
if (param.getName().equals("Expires")) {
324+
expiryInEpochTime = Long.parseLong(param.getValue());
325+
}
326+
}
327+
} catch (URISyntaxException e) {
328+
responseObserver.onError(
329+
Status.UNKNOWN
330+
.withDescription("Failed to parse signed upload URL\n" + e.getMessage())
331+
.asRuntimeException());
332+
}
333+
334+
GetUploadUrlResponse response =
335+
GetUploadUrlResponse.newBuilder()
336+
.setUrl(signedUrl.toString())
337+
.setPath(signedUrl.getPath().substring(1))
338+
.setHttpMethod(HttpMethod.PUT)
339+
.setExpiration(Timestamp.newBuilder().setSeconds(expiryInEpochTime))
340+
.build();
341+
342+
responseObserver.onNext(response);
343+
responseObserver.onCompleted();
344+
}
345+
346+
/**
347+
* Get Google Cloud Storage (GCS) bucket name from job workspace value
348+
*
349+
* @param workspace job workspace in Feast
350+
* @return bucket name
351+
* @throws IllegalArgumentException if workspace is not a valid GCS URI e.g when workspace is set
352+
* to a local path
353+
*/
354+
static String getBucketNameFromWorkspace(String workspace) throws IllegalArgumentException {
355+
if (StringUtils.isEmpty(workspace)) {
356+
throw new IllegalArgumentException("Workspace cannot be empty");
357+
}
358+
359+
int start = workspace.indexOf("gs://");
360+
if (start < 0 || workspace.trim().length() <= 5) {
361+
throw new IllegalArgumentException(
362+
String.format(
363+
"Cannot get bucket from workspace '%s' because it does not start with gs://[bucket_name]",
364+
workspace));
365+
}
366+
367+
// Find the index where the "bucket name" string ends
368+
// start searching after the string "gs://" (length of 5)
369+
int end = workspace.indexOf("/", 5);
370+
if (end < 0) {
371+
return workspace.substring(5);
372+
} else {
373+
return workspace.substring(5, end);
374+
}
375+
}
376+
264377
private StatusRuntimeException getRuntimeException(Exception e) {
265378
return new StatusRuntimeException(
266379
Status.fromCode(Status.Code.INTERNAL).withDescription(e.getMessage()).withCause(e));

core/src/main/java/feast/core/service/JobManagementService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,4 +312,8 @@ private String createJobId(String namePrefix) {
312312
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
313313
return namePrefix.isEmpty() ? JOB_PREFIX_DEFAULT + dateSuffix : namePrefix + dateSuffix;
314314
}
315+
316+
public String getWorkspace() {
317+
return defaults.getWorkspace();
318+
}
315319
}

core/src/main/java/feast/core/storage/SchemaManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,13 @@ public SchemaManager(BigQueryViewTemplater viewTemplater, StorageSpecs storageSp
3939
this.storageSpecs = storageSpecs;
4040
if (storageSpecs.getServingStorageSpec() != null) {
4141
registerStorage(storageSpecs.getServingStorageSpec());
42+
} else {
43+
log.warn("No serving storage is available from storageSpecs, SchemaManager will skip serving store registration");
4244
}
4345
if (storageSpecs.getWarehouseStorageSpec() != null) {
4446
registerStorage(storageSpecs.getWarehouseStorageSpec());
47+
} else {
48+
log.warn("No warehouse storage is available from storageSpecs, SchemaManager will skip warehouse store registration");
4549
}
4650
}
4751

0 commit comments

Comments
 (0)