Skip to content

Commit 35e23e9

Browse files
committed
Cleanup Test Resources
Signed-off-by: Elliot Scribner <elliot.scribner@couchbase.com>
1 parent 3b7e870 commit 35e23e9

File tree

1 file changed

+87
-26
lines changed
  • sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests

1 file changed

+87
-26
lines changed

sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py

Lines changed: 87 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import atexit
12
import json
23
import os
4+
import signal
5+
import threading
36
import uuid
47
from datetime import timedelta
58
from typing import Dict, List, Optional
@@ -29,10 +32,31 @@
2932

3033

3134
class CouchbaseColumnarDataSourceCreator(DataSourceCreator):
32-
collections: List[str] = []
35+
_shutting_down = False
36+
_cluster = None
37+
_cluster_lock = threading.Lock()
38+
39+
@classmethod
40+
def get_cluster(cls):
41+
with cls._cluster_lock:
42+
if cls._cluster is None:
43+
cred = Credential.from_username_and_password(
44+
os.environ["COUCHBASE_COLUMNAR_USER"],
45+
os.environ["COUCHBASE_COLUMNAR_PASSWORD"]
46+
)
47+
timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
48+
cls._cluster = Cluster.create_instance(
49+
os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"],
50+
cred,
51+
ClusterOptions(timeout_options=timeout_opts),
52+
)
53+
return cls._cluster
3354

34-
def __init__(self, project_name: str, *args, **kwargs):
55+
def __init__(self, project_name: str, **kwargs):
3556
super().__init__(project_name)
57+
self.project_name = project_name
58+
self.collections: List[str] = []
59+
3660
self.offline_store_config = CouchbaseColumnarOfflineStoreConfig(
3761
type="couchbase",
3862
connection_string=os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"],
@@ -64,18 +88,8 @@ def format_row(row):
6488

6589
collection_name = self.get_prefixed_collection_name(destination_name)
6690

67-
cred = Credential.from_username_and_password(
68-
self.offline_store_config.user, self.offline_store_config.password
69-
)
70-
timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
71-
cluster = Cluster.create_instance(
72-
self.offline_store_config.connection_string,
73-
cred,
74-
ClusterOptions(timeout_options=timeout_opts),
75-
)
76-
7791
create_cluster_query = f"CREATE ANALYTICS COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.{collection_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;"
78-
cluster.execute_query(
92+
self.get_cluster().execute_query(
7993
create_cluster_query,
8094
QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)),
8195
)
@@ -88,7 +102,7 @@ def format_row(row):
88102
{values_clause}
89103
])
90104
"""
91-
cluster.execute_query(
105+
self.get_cluster().execute_query(
92106
insert_query,
93107
QueryOptions(timeout=timedelta(seconds=self.offline_store_config.timeout)),
94108
)
@@ -126,22 +140,52 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
126140
def get_prefixed_collection_name(self, suffix: str) -> str:
127141
return f"{self.project_name}_{suffix}"
128142

129-
def teardown(self):
130-
cred = Credential.from_username_and_password(
131-
self.offline_store_config.user, self.offline_store_config.password
132-
)
133-
134-
timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120))
135-
cluster = Cluster.create_instance(
136-
self.offline_store_config.connection_string,
137-
cred,
138-
ClusterOptions(timeout_options=timeout_opts),
139-
)
143+
@classmethod
144+
def get_dangling_collections(cls) -> List[str]:
145+
query = """
146+
SELECT VALUE d.DatabaseName || '.' || d.DataverseName || '.' || d.DatasetName
147+
FROM System.Metadata.`Dataset` d
148+
WHERE d.DataverseName <> "Metadata"
149+
AND (REGEXP_CONTAINS(d.DatasetName, "integration_test_.*")
150+
OR REGEXP_CONTAINS(d.DatasetName, "feast_entity_df_.*"));
151+
"""
152+
try:
153+
res = cls.get_cluster().execute_query(query)
154+
return res.get_all_rows()
155+
except Exception as e:
156+
print(f"Error fetching collections: {e}")
157+
return []
158+
159+
@classmethod
160+
def cleanup_all(cls):
161+
if cls._shutting_down:
162+
return
163+
cls._shutting_down = True
164+
try:
165+
collections = cls.get_dangling_collections()
166+
if len(collections) == 0:
167+
print("No collections to clean up.")
168+
return
169+
170+
print(f"Found {len(collections)} collections to clean up.")
171+
if len(collections) > 5:
172+
print("This may take a few minutes...")
173+
for collection in collections:
174+
try:
175+
query = f"DROP COLLECTION {collection} IF EXISTS;"
176+
cls.get_cluster().execute_query(query)
177+
print(f"Dropped collection: {collection}")
178+
except Exception as e:
179+
print(f"Error dropping collection {collection}: {e}")
180+
finally:
181+
print("Cleanup complete.")
182+
cls._shutting_down = False
140183

184+
def teardown(self):
141185
for collection in self.collections:
142186
query = f"DROP COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.`{collection}` IF EXISTS;"
143187
try:
144-
cluster.execute_query(
188+
self.get_cluster().execute_query(
145189
query,
146190
QueryOptions(
147191
timeout=timedelta(seconds=self.offline_store_config.timeout)
@@ -150,3 +194,20 @@ def teardown(self):
150194
print(f"Successfully dropped collection: {collection}")
151195
except Exception as e:
152196
print(f"Error dropping collection {collection}: {e}")
197+
198+
199+
def cleanup_handler(signum, frame):
200+
print("\nCleaning up dangling resources...")
201+
try:
202+
CouchbaseColumnarDataSourceCreator.cleanup_all()
203+
except Exception as e:
204+
print(f"Error during cleanup: {e}")
205+
finally:
206+
# Re-raise the signal to properly exit
207+
signal.default_int_handler(signum, frame)
208+
209+
210+
# Register both SIGINT and SIGTERM handlers
211+
signal.signal(signal.SIGINT, cleanup_handler)
212+
signal.signal(signal.SIGTERM, cleanup_handler)
213+
atexit.register(CouchbaseColumnarDataSourceCreator.cleanup_all)

0 commit comments

Comments
 (0)