|
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 | import itertools |
| 15 | +import logging |
15 | 16 | from datetime import datetime |
16 | 17 | from multiprocessing.pool import ThreadPool |
| 18 | +from queue import Queue |
| 19 | +from threading import Lock, Thread |
17 | 20 | from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple |
18 | 21 |
|
19 | 22 | from pydantic import PositiveInt, StrictStr |
|
33 | 36 | from feast.repo_config import FeastConfigBaseModel, RepoConfig |
34 | 37 | from feast.usage import log_exceptions_and_usage, tracing_span |
35 | 38 |
|
| 39 | +LOGGER = logging.getLogger(__name__) |
| 40 | + |
36 | 41 | try: |
37 | 42 | from google.auth.exceptions import DefaultCredentialsError |
38 | 43 | from google.cloud import datastore |
@@ -262,15 +267,46 @@ def online_read( |
262 | 267 | def _delete_all_values(client, key): |
263 | 268 | """ |
264 | 269 | Delete all data under the key path in datastore. |
| 270 | +
|
| 271 | + Creates and uses a queue of lists of entity keys, which are batch deleted |
| 272 | + by multiple threads. |
265 | 273 | """ |
| 274 | + |
| 275 | + class AtomicCounter(object): |
| 276 | + # for tracking how many deletions have already occurred; not used outside this method |
| 277 | + def __init__(self): |
| 278 | + self.value = 0 |
| 279 | + self.lock = Lock() |
| 280 | + |
| 281 | + def increment(self): |
| 282 | + with self.lock: |
| 283 | + self.value += 1 |
| 284 | + |
| 285 | + BATCH_SIZE = 500 # Dec 2021: delete_multi has a max size of 500: https://cloud.google.com/datastore/docs/concepts/limits |
| 286 | + NUM_THREADS = 3 |
| 287 | + deletion_queue = Queue() |
| 288 | + status_info_counter = AtomicCounter() |
| 289 | + |
| 290 | + def worker(shared_counter): |
| 291 | + while True: |
| 292 | + client.delete_multi(deletion_queue.get()) |
| 293 | + shared_counter.increment() |
| 294 | + LOGGER.debug( |
| 295 | + f"batch deletions completed: {shared_counter.value} ({shared_counter.value * BATCH_SIZE} total entries) & outstanding queue size: {deletion_queue.qsize()}" |
| 296 | + ) |
| 297 | + deletion_queue.task_done() |
| 298 | + |
| 299 | + for _ in range(NUM_THREADS): |
| 300 | + Thread(target=worker, args=(status_info_counter,), daemon=True).start() |
| 301 | + |
| 302 | + query = client.query(kind="Row", ancestor=key) |
266 | 303 | while True: |
267 | | - query = client.query(kind="Row", ancestor=key) |
268 | | - entities = list(query.fetch(limit=1000)) |
| 304 | + entities = list(query.fetch(limit=BATCH_SIZE)) |
269 | 305 | if not entities: |
270 | | - return |
| 306 | + break |
| 307 | + deletion_queue.put([entity.key for entity in entities]) |
271 | 308 |
|
272 | | - for entity in entities: |
273 | | - client.delete(entity.key) |
| 309 | + deletion_queue.join() |
274 | 310 |
|
275 | 311 |
|
276 | 312 | def _initialize_client( |
|
0 commit comments