Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Cleanup Steps for Test Resources
Signed-off-by: Elliot Scribner <elliot.scribner@couchbase.com>
  • Loading branch information
ejscribner committed Feb 27, 2025
commit 37791a87bc0c0b91af1a79edb9dc7d720027cd0a
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import atexit
import json
import os
import signal
import threading
import uuid
from datetime import timedelta
from typing import Dict, List, Optional
Expand Down Expand Up @@ -29,10 +32,31 @@


class CouchbaseColumnarDataSourceCreator(DataSourceCreator):
collections: List[str] = []
_shutting_down = False
_cluster = None
_cluster_lock = threading.Lock()

@classmethod
def get_cluster(cls):
    """Return the shared Columnar cluster handle, creating it on first use.

    The instance is built lazily from the COUCHBASE_COLUMNAR_* environment
    variables; the class-level lock makes the lazy initialization safe when
    multiple test threads race to the first call.
    """
    with cls._cluster_lock:
        if cls._cluster is not None:
            return cls._cluster
        credential = Credential.from_username_and_password(
            os.environ["COUCHBASE_COLUMNAR_USER"],
            os.environ["COUCHBASE_COLUMNAR_PASSWORD"],
        )
        # Generous dispatch timeout: Columnar DDL/DML in CI can be slow.
        options = ClusterOptions(
            timeout_options=TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
        )
        cls._cluster = Cluster.create_instance(
            os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"],
            credential,
            options,
        )
        return cls._cluster

def __init__(self, project_name: str, *args, **kwargs):
def __init__(self, project_name: str, **kwargs):
super().__init__(project_name)
self.project_name = project_name
self.collections: List[str] = []

self.offline_store_config = CouchbaseColumnarOfflineStoreConfig(
type="couchbase",
connection_string=os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"],
Expand Down Expand Up @@ -64,18 +88,8 @@ def format_row(row):

collection_name = self.get_prefixed_collection_name(destination_name)

cred = Credential.from_username_and_password(
self.offline_store_config.user, self.offline_store_config.password
)
timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
cluster = Cluster.create_instance(
self.offline_store_config.connection_string,
cred,
ClusterOptions(timeout_options=timeout_opts),
)

create_cluster_query = f"CREATE ANALYTICS COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.{collection_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;"
cluster.execute_query(
self.get_cluster().execute_query(
create_cluster_query,
QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)),
)
Expand All @@ -88,7 +102,7 @@ def format_row(row):
{values_clause}
])
"""
cluster.execute_query(
self.get_cluster().execute_query(
insert_query,
QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)),
)
Expand Down Expand Up @@ -126,22 +140,52 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
def get_prefixed_collection_name(self, suffix: str) -> str:
return f"{self.project_name}_{suffix}"

def teardown(self):
cred = Credential.from_username_and_password(
self.offline_store_config.user, self.offline_store_config.password
)

timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
cluster = Cluster.create_instance(
self.offline_store_config.connection_string,
cred,
ClusterOptions(timeout_options=timeout_opts),
)
@classmethod
def get_dangling_collections(cls) -> List[str]:
    """List fully-qualified test collections left behind by earlier runs.

    Queries the Columnar metadata for datasets whose names match the
    integration-test naming patterns. Best-effort: on any query failure
    the error is logged and an empty list is returned.
    """
    query = """
    SELECT VALUE d.DatabaseName || '.' || d.DataverseName || '.' || d.DatasetName
    FROM System.Metadata.`Dataset` d
    WHERE d.DataverseName <> "Metadata"
    AND (REGEXP_CONTAINS(d.DatasetName, "integration_test_.*")
    OR REGEXP_CONTAINS(d.DatasetName, "feast_entity_df_.*"));
    """
    try:
        return cls.get_cluster().execute_query(query).get_all_rows()
    except Exception as exc:
        print(f"Error fetching collections: {exc}")
        return []

@classmethod
def cleanup_all(cls):
    """Drop every dangling test collection found on the cluster.

    Guarded by ``_shutting_down`` so the signal handlers and the atexit
    hook cannot start a second sweep while one is in flight; the flag is
    cleared again once the sweep finishes so later hooks still run.
    Each drop is independent: a failure on one collection is logged and
    the sweep continues.
    """
    if cls._shutting_down:
        return
    cls._shutting_down = True
    try:
        targets = cls.get_dangling_collections()
        if not targets:
            print("No collections to clean up.")
            return

        print(f"Found {len(targets)} collections to clean up.")
        if len(targets) > 5:
            print("This may take a few minutes...")
        for name in targets:
            try:
                cls.get_cluster().execute_query(f"DROP COLLECTION {name} IF EXISTS;")
            except Exception as exc:
                print(f"Error dropping collection {name}: {exc}")
            else:
                print(f"Dropped collection: {name}")
    finally:
        # Runs on every exit path, including the empty-list early return.
        print("Cleanup complete.")
        cls._shutting_down = False

def teardown(self):
for collection in self.collections:
query = f"DROP COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.`{collection}` IF EXISTS;"
try:
cluster.execute_query(
self.get_cluster().execute_query(
query,
QueryOptions(
timeout=timedelta(seconds=self.offline_store_config.timeout)
Expand All @@ -150,3 +194,20 @@ def teardown(self):
print(f"Successfully dropped collection: {collection}")
except Exception as e:
print(f"Error dropping collection {collection}: {e}")


def cleanup_handler(signum, frame):
    """Signal handler: sweep dangling test collections, then terminate.

    Installed for both SIGINT and SIGTERM. After a best-effort cleanup it
    re-delivers the signal via its conventional disposition so the process
    exits with the correct status for the signal actually received.

    Args:
        signum: Signal number delivered by the OS.
        frame: Current stack frame at delivery time (unused).
    """
    print("\nCleaning up dangling resources...")
    try:
        CouchbaseColumnarDataSourceCreator.cleanup_all()
    except Exception as e:
        print(f"Error during cleanup: {e}")
    finally:
        if signum == signal.SIGINT:
            # Raises KeyboardInterrupt — the Python default for SIGINT —
            # which lets the test runner unwind normally.
            signal.default_int_handler(signum, frame)
        else:
            # BUG FIX: the previous code called default_int_handler here
            # too, turning SIGTERM into a KeyboardInterrupt and masking the
            # real signal. Restore the OS default disposition and re-deliver
            # so the exit status reflects termination by SIGTERM.
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)


# Register both SIGINT and SIGTERM handlers
# NOTE(review): signal.signal() raises ValueError if called from a
# non-main thread — assumes this module is only imported on the main
# thread of the test runner; confirm.
signal.signal(signal.SIGINT, cleanup_handler)
signal.signal(signal.SIGTERM, cleanup_handler)
# Also sweep on normal interpreter exit; cleanup_all's _shutting_down
# guard prevents a signal-triggered sweep and this hook from overlapping.
atexit.register(CouchbaseColumnarDataSourceCreator.cleanup_all)