1717
1818package feast .core .grpc ;
1919
20+ import com .google .cloud .storage .BlobInfo ;
21+ import com .google .cloud .storage .Storage ;
22+ import com .google .cloud .storage .StorageOptions ;
2023import com .google .protobuf .Empty ;
24+ import com .google .protobuf .Timestamp ;
2125import com .timgroup .statsd .StatsDClient ;
2226import feast .core .CoreServiceGrpc .CoreServiceImplBase ;
23- import feast .core .CoreServiceProto .CoreServiceTypes .ApplyEntityResponse ;
24- import feast .core .CoreServiceProto .CoreServiceTypes .ApplyFeatureGroupResponse ;
25- import feast .core .CoreServiceProto .CoreServiceTypes .ApplyFeatureResponse ;
26- import feast .core .CoreServiceProto .CoreServiceTypes .GetEntitiesRequest ;
27- import feast .core .CoreServiceProto .CoreServiceTypes .GetEntitiesResponse ;
28- import feast .core .CoreServiceProto .CoreServiceTypes .GetFeaturesRequest ;
29- import feast .core .CoreServiceProto .CoreServiceTypes .GetFeaturesResponse ;
30- import feast .core .CoreServiceProto .CoreServiceTypes .ListEntitiesResponse ;
31- import feast .core .CoreServiceProto .CoreServiceTypes .ListFeaturesResponse ;
27+ import feast .core .CoreServiceProto .CoreServiceTypes .*;
28+ import feast .core .CoreServiceProto .CoreServiceTypes .GetUploadUrlResponse .HttpMethod ;
3229import feast .core .config .StorageConfig .StorageSpecs ;
3330import feast .core .exception .RegistrationException ;
3431import feast .core .exception .RetrievalException ;
3532import feast .core .model .EntityInfo ;
3633import feast .core .model .FeatureGroupInfo ;
3734import feast .core .model .FeatureInfo ;
35+ import feast .core .service .JobManagementService ;
3836import feast .core .service .SpecService ;
3937import feast .core .validators .SpecValidator ;
4038import feast .specs .EntitySpecProto .EntitySpec ;
4341import io .grpc .Status ;
4442import io .grpc .StatusRuntimeException ;
4543import io .grpc .stub .StreamObserver ;
46- import java .util .List ;
47- import java .util .stream .Collectors ;
4844import lombok .extern .slf4j .Slf4j ;
45+ import org .apache .commons .codec .digest .DigestUtils ;
46+ import org .apache .commons .lang3 .RandomUtils ;
47+ import org .apache .commons .lang3 .StringUtils ;
48+ import org .apache .http .NameValuePair ;
49+ import org .apache .http .client .utils .URLEncodedUtils ;
4950import org .lognet .springboot .grpc .GRpcService ;
5051import org .springframework .beans .factory .annotation .Autowired ;
5152
52- /**
53- * Implementation of the feast core GRPC service.
54- */
53+ import java .net .URISyntaxException ;
54+ import java .net .URL ;
55+ import java .nio .charset .Charset ;
56+ import java .util .List ;
57+ import java .util .concurrent .TimeUnit ;
58+ import java .util .stream .Collectors ;
59+
60+ /** Implementation of the feast core GRPC service. */
5561@ Slf4j
5662@ GRpcService
5763public class CoreServiceImpl extends CoreServiceImplBase {
5864
59- @ Autowired
60- private SpecService specService ;
65+ private static final String UPLOAD_URL_DIR = "uploads" ;
66+ private static final int UPLOAD_URL_VALIDITY_IN_MINUTES = 5 ;
67+ private Storage storage = StorageOptions .getDefaultInstance ().getService ();
6168
62- @ Autowired
63- private SpecValidator validator ;
69+ @ Autowired private SpecService specService ;
70+ @ Autowired private SpecValidator validator ;
71+ @ Autowired private StatsDClient statsDClient ;
72+ @ Autowired private JobManagementService jobManagementService ;
73+ @ Autowired private StorageSpecs storageSpecs ;
6474
65- @ Autowired
66- private StatsDClient statsDClient ;
75+ public static long getUploadUrlValidityInMinutes () {
76+ return UPLOAD_URL_VALIDITY_IN_MINUTES ;
77+ }
6778
68- @ Autowired
69- private StorageSpecs storageSpecs ;
79+ public void setStorage (Storage storage ) {
80+ this .storage = storage ;
81+ }
7082
7183 /**
7284 * Gets specs for all entities requested in the request. If the retrieval of any one of them
@@ -79,9 +91,7 @@ public void getEntities(
7991 statsDClient .increment ("get_entities_request_count" );
8092 try {
8193 List <EntitySpec > entitySpecs =
82- specService
83- .getEntities (request .getIdsList ())
84- .stream ()
94+ specService .getEntities (request .getIdsList ()).stream ()
8595 .map (EntityInfo ::getEntitySpec )
8696 .collect (Collectors .toList ());
8797 GetEntitiesResponse response =
@@ -99,18 +109,14 @@ public void getEntities(
99109 }
100110 }
101111
102- /**
103- * Gets specs for all entities registered in the registry.
104- */
112+ /** Gets specs for all entities registered in the registry. */
105113 @ Override
106114 public void listEntities (Empty request , StreamObserver <ListEntitiesResponse > responseObserver ) {
107115 long now = System .currentTimeMillis ();
108116 statsDClient .increment ("list_entities_request_count" );
109117 try {
110118 List <EntitySpec > entitySpecs =
111- specService
112- .listEntities ()
113- .stream ()
119+ specService .listEntities ().stream ()
114120 .map (EntityInfo ::getEntitySpec )
115121 .collect (Collectors .toList ());
116122 ListEntitiesResponse response =
@@ -139,9 +145,7 @@ public void getFeatures(
139145 statsDClient .increment ("get_features_request_count" );
140146 try {
141147 List <FeatureSpec > featureSpecs =
142- specService
143- .getFeatures (request .getIdsList ())
144- .stream ()
148+ specService .getFeatures (request .getIdsList ()).stream ()
145149 .map (FeatureInfo ::getFeatureSpec )
146150 .collect (Collectors .toList ());
147151 GetFeaturesResponse response =
@@ -159,18 +163,14 @@ public void getFeatures(
159163 }
160164 }
161165
162- /**
163- * Gets specs for all features registered in the registry. TODO: some kind of pagination
164- */
166+ /** Gets specs for all features registered in the registry. TODO: some kind of pagination */
165167 @ Override
166168 public void listFeatures (Empty request , StreamObserver <ListFeaturesResponse > responseObserver ) {
167169 long now = System .currentTimeMillis ();
168170 statsDClient .increment ("list_features_request_count" );
169171 try {
170172 List <FeatureSpec > featureSpecs =
171- specService
172- .listFeatures ()
173- .stream ()
173+ specService .listFeatures ().stream ()
174174 .map (FeatureInfo ::getFeatureSpec )
175175 .collect (Collectors .toList ());
176176 ListFeaturesResponse response =
@@ -261,6 +261,119 @@ public void applyEntity(
261261 }
262262 }
263263
264+ /**
265+ * Get a signed URL where a Feast client can upload a CSV or JSON file by making an HTTP PUT
266+ * request. The signed URL references a bucket and blob in Google Cloud Storage.
267+ *
268+ * @param request
269+ * @param responseObserver
270+ */
271+ @ Override
272+ public void getUploadUrl (
273+ GetUploadUrlRequest request , StreamObserver <GetUploadUrlResponse > responseObserver ) {
274+ String bucketName = null ;
275+
276+ try {
277+ bucketName = getBucketNameFromWorkspace (jobManagementService .getWorkspace ());
278+ } catch (IllegalArgumentException e ) {
279+ responseObserver .onError (
280+ Status .FAILED_PRECONDITION
281+ .withDescription ("Failed to get upload URL from workspace\n " + e .getMessage ())
282+ .asRuntimeException ());
283+ }
284+ assert StringUtils .isNotEmpty (bucketName );
285+
286+ // Generated file names are always unique
287+ String fileName =
288+ String .format (
289+ "%s-%s" , System .currentTimeMillis (), DigestUtils .sha1Hex (RandomUtils .nextBytes (8 )));
290+
291+ BlobInfo blobInfo =
292+ BlobInfo .newBuilder (
293+ bucketName ,
294+ String .format (
295+ "%s/%s.%s" ,
296+ UPLOAD_URL_DIR , fileName , request .getFileType ().toString ().toLowerCase ()))
297+ .build ();
298+
299+ URL signedUrl = null ;
300+ try {
301+ signedUrl =
302+ storage .signUrl (
303+ blobInfo ,
304+ UPLOAD_URL_VALIDITY_IN_MINUTES ,
305+ TimeUnit .MINUTES ,
306+ Storage .SignUrlOption .httpMethod (com .google .cloud .storage .HttpMethod .PUT ));
307+ } catch (Exception e ) {
308+ responseObserver .onError (
309+ Status .FAILED_PRECONDITION
310+ .withDescription (
311+ "Failed to create signed URL. Please check your Feast deployment config\n "
312+ + e .getMessage ())
313+ .asRuntimeException ());
314+ }
315+ assert signedUrl != null ;
316+ long expiryInEpochTime = -1 ;
317+
318+ // Retrieve the actual expiry timestamp from the created signed URL
319+ try {
320+ List <NameValuePair > params =
321+ URLEncodedUtils .parse (signedUrl .toURI (), Charset .forName ("UTF-8" ));
322+ for (NameValuePair param : params ) {
323+ if (param .getName ().equals ("Expires" )) {
324+ expiryInEpochTime = Long .parseLong (param .getValue ());
325+ }
326+ }
327+ } catch (URISyntaxException e ) {
328+ responseObserver .onError (
329+ Status .UNKNOWN
330+ .withDescription ("Failed to parse signed upload URL\n " + e .getMessage ())
331+ .asRuntimeException ());
332+ }
333+
334+ GetUploadUrlResponse response =
335+ GetUploadUrlResponse .newBuilder ()
336+ .setUrl (signedUrl .toString ())
337+ .setPath (signedUrl .getPath ().substring (1 ))
338+ .setHttpMethod (HttpMethod .PUT )
339+ .setExpiration (Timestamp .newBuilder ().setSeconds (expiryInEpochTime ))
340+ .build ();
341+
342+ responseObserver .onNext (response );
343+ responseObserver .onCompleted ();
344+ }
345+
346+ /**
347+ * Get Google Cloud Storage (GCS) bucket name from job workspace value
348+ *
349+ * @param workspace job workspace in Feast
350+ * @return bucket name
351+ * @throws IllegalArgumentException if workspace is not a valid GCS URI e.g when workspace is set
352+ * to a local path
353+ */
354+ static String getBucketNameFromWorkspace (String workspace ) throws IllegalArgumentException {
355+ if (StringUtils .isEmpty (workspace )) {
356+ throw new IllegalArgumentException ("Workspace cannot be empty" );
357+ }
358+
359+ int start = workspace .indexOf ("gs://" );
360+ if (start < 0 || workspace .trim ().length () <= 5 ) {
361+ throw new IllegalArgumentException (
362+ String .format (
363+ "Cannot get bucket from workspace '%s' because it does not start with gs://[bucket_name]" ,
364+ workspace ));
365+ }
366+
367+ // Find the index where the "bucket name" string ends
368+ // start searching after the string "gs://" (length of 5)
369+ int end = workspace .indexOf ("/" , 5 );
370+ if (end < 0 ) {
371+ return workspace .substring (5 );
372+ } else {
373+ return workspace .substring (5 , end );
374+ }
375+ }
376+
264377 private StatusRuntimeException getRuntimeException (Exception e ) {
265378 return new StatusRuntimeException (
266379 Status .fromCode (Status .Code .INTERNAL ).withDescription (e .getMessage ()).withCause (e ));
0 commit comments