1313# limitations under the License.
1414
1515import os
16- import re
1716import shutil
1817import tempfile
1918import uuid
2019from datetime import datetime
2120from typing import List , Optional , Tuple , Union
22- from urllib .parse import ParseResult , urlparse
21+ from urllib .parse import urlparse
2322
2423import pandas as pd
25- from google .cloud import storage
2624from pandavro import to_avro
2725
26+ from feast .staging .storage_client import get_staging_client
27+
2828
2929def export_source_to_staging_location (
3030 source : Union [pd .DataFrame , str ], staging_location_uri : str
@@ -44,12 +44,14 @@ def export_source_to_staging_location(
4444 * Pandas DataFrame
4545 * Local Avro file
4646 * GCS Avro file
47+ * S3 Avro file
4748
4849
4950 staging_location_uri (str):
5051 Remote staging location where DataFrame should be written.
5152 Examples:
5253 * gs://bucket/path/
54+ * s3://bucket/path/
5355 * file:///data/subfolder/
5456
5557 Returns:
@@ -66,52 +68,37 @@ def export_source_to_staging_location(
6668 uri_path = None # type: Optional[str]
6769 if uri .scheme == "file" :
6870 uri_path = uri .path
69-
7071 # Remote gs staging location provided by serving
7172 dir_path , file_name , source_path = export_dataframe_to_local (
7273 df = source , dir_path = uri_path
7374 )
74- elif urlparse (source ).scheme in ["" , "file" ]:
75- # Local file provided as a source
76- dir_path = ""
77- file_name = os .path .basename (source )
78- source_path = os .path .abspath (
79- os .path .join (urlparse (source ).netloc , urlparse (source ).path )
80- )
81- elif urlparse (source ).scheme == "gs" :
82- # Google Cloud Storage path provided
83- input_source_uri = urlparse (source )
84- if "*" in source :
85- # Wildcard path
86- return _get_files (
87- bucket = str (input_source_uri .hostname ), uri = input_source_uri
75+ elif isinstance (source , str ):
76+ source_uri = urlparse (source )
77+ if source_uri .scheme in ["" , "file" ]:
78+ # Local file provided as a source
79+ dir_path = ""
80+ file_name = os .path .basename (source )
81+ source_path = os .path .abspath (
82+ os .path .join (source_uri .netloc , source_uri .path )
8883 )
8984 else :
90- return [source ]
85+ # gs, s3 file provided as a source.
86+ return get_staging_client (source_uri .scheme ).list_files (
87+ bucket = source_uri .hostname , path = source_uri .path
88+ )
9189 else :
9290 raise Exception (
9391 f"Only string and DataFrame types are allowed as a "
9492 f"source, { type (source )} was provided."
9593 )
9694
9795 # Push data to required staging location
98- if uri .scheme == "gs" :
99- # Staging location is a Google Cloud Storage path
100- upload_file_to_gcs (
101- source_path , str (uri .hostname ), str (uri .path ).strip ("/" ) + "/" + file_name
102- )
103- elif uri .scheme == "file" :
104- # Staging location is a file path
105- # Used for end-to-end test
106- pass
107- else :
108- raise Exception (
109- f"Staging location { staging_location_uri } does not have a "
110- f"valid URI. Only gs:// and file:// uri scheme are supported."
111- )
96+ get_staging_client (uri .scheme ).upload_file (
97+ source_path , uri .hostname , str (uri .path ).strip ("/" ) + "/" + file_name ,
98+ )
11299
113100 # Clean up, remove local staging file
114- if isinstance (source , pd .DataFrame ) and len (str ( dir_path ) ) > 4 :
101+ if dir_path and isinstance (source , pd .DataFrame ) and len (dir_path ) > 4 :
115102 shutil .rmtree (dir_path )
116103
117104 return [staging_location_uri .rstrip ("/" ) + "/" + file_name ]
@@ -162,70 +149,6 @@ def export_dataframe_to_local(
162149 return dir_path , file_name , dest_path
163150
164151
165- def upload_file_to_gcs (local_path : str , bucket : str , remote_path : str ) -> None :
166- """
167- Upload a file from the local file system to Google Cloud Storage (GCS).
168-
169- Args:
170- local_path (str):
171- Local filesystem path of file to upload.
172-
173- bucket (str):
174- GCS bucket destination to upload to.
175-
176- remote_path (str):
177- Path within GCS bucket to upload file to, includes file name.
178-
179- Returns:
180- None:
181- None
182- """
183-
184- storage_client = storage .Client (project = None )
185- bucket_storage = storage_client .get_bucket (bucket )
186- blob = bucket_storage .blob (remote_path )
187- blob .upload_from_filename (local_path )
188-
189-
190- def _get_files (bucket : str , uri : ParseResult ) -> List [str ]:
191- """
192- List all available files within a Google storage bucket that matches a wild
193- card path.
194-
195- Args:
196- bucket (str):
197- Google Storage bucket to reference.
198-
199- uri (urllib.parse.ParseResult):
200- Wild card uri path containing the "*" character.
201- Example:
202- * gs://feast/staging_location/*
203- * gs://feast/staging_location/file_*.avro
204-
205- Returns:
206- List[str]:
207- List of all available files matching the wildcard path.
208- """
209-
210- storage_client = storage .Client (project = None )
211- bucket_storage = storage_client .get_bucket (bucket )
212- path = uri .path
213-
214- if "*" in path :
215- regex = re .compile (path .replace ("*" , ".*?" ).strip ("/" ))
216- blob_list = bucket_storage .list_blobs (
217- prefix = path .strip ("/" ).split ("*" )[0 ], delimiter = "/"
218- )
219- # File path should not be in path (file path must be longer than path)
220- return [
221- f"{ uri .scheme } ://{ uri .hostname } /{ file } "
222- for file in [x .name for x in blob_list ]
223- if re .match (regex , file ) and file not in path
224- ]
225- else :
226- raise Exception (f"{ path } is not a wildcard path" )
227-
228-
229152def _get_file_name () -> str :
230153 """
231154 Create a random file name.
0 commit comments