|
1 | | -import hashlib |
2 | 1 | import logging |
3 | 2 | import os |
4 | 3 | import random |
5 | 4 | import string |
6 | 5 | import time |
7 | | -from typing import IO, Any, Dict, List, NamedTuple, Optional, Tuple |
| 6 | +from typing import Any, Dict, List, NamedTuple, Optional |
| 7 | +from urllib.parse import urlparse, urlunparse |
8 | 8 |
|
9 | | -import boto3 |
10 | | -import botocore |
11 | 9 | import yaml |
12 | 10 |
|
13 | 11 | __all__ = [ |
|
27 | 25 | "_list_jobs", |
28 | 26 | "_load_new_cluster_template", |
29 | 27 | "_random_string", |
30 | | - "_s3_upload", |
31 | 28 | "_stream_ingestion_step", |
32 | 29 | "_sync_offline_to_online_step", |
33 | 30 | "_upload_jar", |
34 | 31 | "_wait_for_job_state", |
35 | 32 | ] |
| 33 | +from feast.staging.storage_client import get_staging_client |
36 | 34 |
|
37 | 35 | log = logging.getLogger("aws") |
38 | 36 |
|
@@ -77,82 +75,13 @@ def _random_string(length) -> str: |
77 | 75 | return "".join(random.choice(string.ascii_lowercase) for _ in range(length)) |
78 | 76 |
|
79 | 77 |
|
80 | | -def _s3_split_path(path: str) -> Tuple[str, str]: |
81 | | - """ Convert s3:// url to (bucket, key) """ |
82 | | - assert path.startswith("s3://") |
83 | | - _, _, bucket, key = path.split("/", 3) |
84 | | - return bucket, key |
85 | | - |
86 | | - |
87 | | -def _hash_fileobj(fileobj: IO[bytes]) -> str: |
88 | | - """ Compute sha256 hash of a file. File pointer will be reset to 0 on return. """ |
89 | | - fileobj.seek(0) |
90 | | - h = hashlib.sha256() |
91 | | - for block in iter(lambda: fileobj.read(2 ** 20), b""): |
92 | | - h.update(block) |
93 | | - fileobj.seek(0) |
94 | | - return h.hexdigest() |
95 | | - |
96 | | - |
97 | | -def _s3_upload( |
98 | | - fileobj: IO[bytes], |
99 | | - local_path: str, |
100 | | - *, |
101 | | - remote_path: Optional[str] = None, |
102 | | - remote_path_prefix: Optional[str] = None, |
103 | | - remote_path_suffix: Optional[str] = None, |
104 | | -) -> str: |
105 | | - """ |
106 | | - Upload a local file to S3. We store the file sha256 sum in S3 metadata and skip the upload |
107 | | - if the file hasn't changed. |
108 | | -
|
109 | | - You can either specify remote_path or remote_path_prefix+remote_path_suffix. In the latter case, |
110 | | - the remote path will be computed as $remote_path_prefix/$sha256$remote_path_suffix |
111 | | - """ |
112 | | - |
113 | | - assert (remote_path is not None) or ( |
114 | | - remote_path_prefix is not None and remote_path_suffix is not None |
115 | | - ) |
116 | | - |
117 | | - sha256sum = _hash_fileobj(fileobj) |
118 | | - |
119 | | - if remote_path is None: |
120 | | - assert remote_path_prefix is not None |
121 | | - remote_path = os.path.join( |
122 | | - remote_path_prefix, f"{sha256sum}{remote_path_suffix}" |
123 | | - ) |
124 | | - |
125 | | - bucket, key = _s3_split_path(remote_path) |
126 | | - client = boto3.client("s3") |
127 | | - |
128 | | - try: |
129 | | - head_response = client.head_object(Bucket=bucket, Key=key) |
130 | | - if head_response["Metadata"]["sha256sum"] == sha256sum: |
131 | | - # File already exists |
132 | | - return remote_path |
133 | | - else: |
134 | | - log.info("Uploading {local_path} to {remote_path}") |
135 | | - client.upload_fileobj( |
136 | | - fileobj, bucket, key, ExtraArgs={"Metadata": {"sha256sum": sha256sum}}, |
137 | | - ) |
138 | | - return remote_path |
139 | | - except botocore.exceptions.ClientError as e: |
140 | | - if e.response["Error"]["Code"] == "404": |
141 | | - log.info("Uploading {local_path} to {remote_path}") |
142 | | - client.upload_fileobj( |
143 | | - fileobj, bucket, key, ExtraArgs={"Metadata": {"sha256sum": sha256sum}}, |
144 | | - ) |
145 | | - return remote_path |
146 | | - else: |
147 | | - raise |
148 | | - |
149 | | - |
150 | 78 | def _upload_jar(jar_s3_prefix: str, local_path: str) -> str: |
151 | 79 | with open(local_path, "rb") as f: |
152 | | - return _s3_upload( |
153 | | - f, |
154 | | - local_path, |
155 | | - remote_path=os.path.join(jar_s3_prefix, os.path.basename(local_path)), |
| 80 | + uri = urlparse(os.path.join(jar_s3_prefix, os.path.basename(local_path))) |
| 81 | + return urlunparse( |
| 82 | + get_staging_client(uri.scheme).upload_fileobj( |
| 83 | + f, local_path, remote_uri=uri, |
| 84 | + ) |
156 | 85 | ) |
157 | 86 |
|
158 | 87 |
|
|
0 commit comments