Skip to content

Commit e8e4972

Browse files
authored
Speed up Datastore deletes by batching deletions with multithreading (feast-dev#2182)
* Speed up Datastore deletes by batch deletions with multithreading Signed-off-by: Pamela Toman <ptoman@paloaltonetworks.com> * Linted Signed-off-by: Pamela Toman <ptoman@paloaltonetworks.com> * Move AtomicCounter inside _delete_all_values Signed-off-by: Pamela Toman <ptoman@paloaltonetworks.com> * Add link to datastore limits Signed-off-by: Pamela Toman <ptoman@paloaltonetworks.com>
1 parent 4b16ca6 commit e8e4972

1 file changed

Lines changed: 41 additions & 5 deletions

File tree

sdk/python/feast/infra/online_stores/datastore.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import itertools
15+
import logging
1516
from datetime import datetime
1617
from multiprocessing.pool import ThreadPool
18+
from queue import Queue
19+
from threading import Lock, Thread
1720
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple
1821

1922
from pydantic import PositiveInt, StrictStr
@@ -33,6 +36,8 @@
3336
from feast.repo_config import FeastConfigBaseModel, RepoConfig
3437
from feast.usage import log_exceptions_and_usage, tracing_span
3538

39+
LOGGER = logging.getLogger(__name__)
40+
3641
try:
3742
from google.auth.exceptions import DefaultCredentialsError
3843
from google.cloud import datastore
@@ -262,15 +267,46 @@ def online_read(
262267
def _delete_all_values(client, key):
263268
"""
264269
Delete all data under the key path in datastore.
270+
271+
Creates and uses a queue of lists of entity keys, which are batch deleted
272+
by multiple threads.
265273
"""
274+
275+
class AtomicCounter(object):
276+
# for tracking how many deletions have already occurred; not used outside this method
277+
def __init__(self):
278+
self.value = 0
279+
self.lock = Lock()
280+
281+
def increment(self):
282+
with self.lock:
283+
self.value += 1
284+
285+
BATCH_SIZE = 500 # Dec 2021: delete_multi has a max size of 500: https://cloud.google.com/datastore/docs/concepts/limits
286+
NUM_THREADS = 3
287+
deletion_queue = Queue()
288+
status_info_counter = AtomicCounter()
289+
290+
def worker(shared_counter):
291+
while True:
292+
client.delete_multi(deletion_queue.get())
293+
shared_counter.increment()
294+
LOGGER.debug(
295+
f"batch deletions completed: {shared_counter.value} ({shared_counter.value * BATCH_SIZE} total entries) & outstanding queue size: {deletion_queue.qsize()}"
296+
)
297+
deletion_queue.task_done()
298+
299+
for _ in range(NUM_THREADS):
300+
Thread(target=worker, args=(status_info_counter,), daemon=True).start()
301+
302+
query = client.query(kind="Row", ancestor=key)
266303
while True:
267-
query = client.query(kind="Row", ancestor=key)
268-
entities = list(query.fetch(limit=1000))
304+
entities = list(query.fetch(limit=BATCH_SIZE))
269305
if not entities:
270-
return
306+
break
307+
deletion_queue.put([entity.key for entity in entities])
271308

272-
for entity in entities:
273-
client.delete(entity.key)
309+
deletion_queue.join()
274310

275311

276312
def _initialize_client(

0 commit comments

Comments
 (0)