1+ import atexit
12import json
23import os
4+ import signal
5+ import threading
36import uuid
47from datetime import timedelta
58from typing import Dict , List , Optional
2932
3033
3134class CouchbaseColumnarDataSourceCreator (DataSourceCreator ):
32- collections : List [str ] = []
35+ _shutting_down = False
36+ _cluster = None
37+ _cluster_lock = threading .Lock ()
38+
39+ @classmethod
40+ def get_cluster (cls ):
41+ with cls ._cluster_lock :
42+ if cls ._cluster is None :
43+ cred = Credential .from_username_and_password (
44+ os .environ ["COUCHBASE_COLUMNAR_USER" ],
45+ os .environ ["COUCHBASE_COLUMNAR_PASSWORD" ]
46+ )
47+ timeout_opts = TimeoutOptions (dispatch_timeout = timedelta (seconds = 120 ))
48+ cls ._cluster = Cluster .create_instance (
49+ os .environ ["COUCHBASE_COLUMNAR_CONNECTION_STRING" ],
50+ cred ,
51+ ClusterOptions (timeout_options = timeout_opts ),
52+ )
53+ return cls ._cluster
3354
34- def __init__ (self , project_name : str , * args , * *kwargs ):
55+ def __init__ (self , project_name : str , ** kwargs ):
3556 super ().__init__ (project_name )
57+ self .project_name = project_name
58+ self .collections : List [str ] = []
59+
3660 self .offline_store_config = CouchbaseColumnarOfflineStoreConfig (
3761 type = "couchbase" ,
3862 connection_string = os .environ ["COUCHBASE_COLUMNAR_CONNECTION_STRING" ],
@@ -64,18 +88,8 @@ def format_row(row):
6488
6589 collection_name = self .get_prefixed_collection_name (destination_name )
6690
67- cred = Credential .from_username_and_password (
68- self .offline_store_config .user , self .offline_store_config .password
69- )
70- timeout_opts = TimeoutOptions (dispatch_timeout = timedelta (seconds = 120 ))
71- cluster = Cluster .create_instance (
72- self .offline_store_config .connection_string ,
73- cred ,
74- ClusterOptions (timeout_options = timeout_opts ),
75- )
76-
7791 create_cluster_query = f"CREATE ANALYTICS COLLECTION { COUCHBASE_COLUMNAR_DATABASE } .{ COUCHBASE_COLUMNAR_SCOPE } .{ collection_name } IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;"
78- cluster .execute_query (
92+ self . get_cluster () .execute_query (
7993 create_cluster_query ,
8094 QueryOptions (timeout = timedelta (seconds = self .offline_store_config .timeout )),
8195 )
@@ -88,7 +102,7 @@ def format_row(row):
88102 { values_clause }
89103 ])
90104 """
91- cluster .execute_query (
105+ self . get_cluster () .execute_query (
92106 insert_query ,
93107 QueryOptions (timeout = timedelta (seconds = self .offline_store_config .timeout )),
94108 )
@@ -126,22 +140,52 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
126140 def get_prefixed_collection_name (self , suffix : str ) -> str :
127141 return f"{ self .project_name } _{ suffix } "
128142
129- def teardown (self ):
130- cred = Credential .from_username_and_password (
131- self .offline_store_config .user , self .offline_store_config .password
132- )
133-
134- timeout_opts = TimeoutOptions (dispatch_timeout = timedelta (seconds = 120 ))
135- cluster = Cluster .create_instance (
136- self .offline_store_config .connection_string ,
137- cred ,
138- ClusterOptions (timeout_options = timeout_opts ),
139- )
143+ @classmethod
144+ def get_dangling_collections (cls ) -> List [str ]:
145+ query = """
146+ SELECT VALUE d.DatabaseName || '.' || d.DataverseName || '.' || d.DatasetName
147+ FROM System.Metadata.`Dataset` d
148+ WHERE d.DataverseName <> "Metadata"
149+ AND (REGEXP_CONTAINS(d.DatasetName, "integration_test_.*")
150+ OR REGEXP_CONTAINS(d.DatasetName, "feast_entity_df_.*"));
151+ """
152+ try :
153+ res = cls .get_cluster ().execute_query (query )
154+ return res .get_all_rows ()
155+ except Exception as e :
156+ print (f"Error fetching collections: { e } " )
157+ return []
158+
159+ @classmethod
160+ def cleanup_all (cls ):
161+ if cls ._shutting_down :
162+ return
163+ cls ._shutting_down = True
164+ try :
165+ collections = cls .get_dangling_collections ()
166+ if len (collections ) == 0 :
167+ print ("No collections to clean up." )
168+ return
169+
170+ print (f"Found { len (collections )} collections to clean up." )
171+ if len (collections ) > 5 :
172+ print ("This may take a few minutes..." )
173+ for collection in collections :
174+ try :
175+ query = f"DROP COLLECTION { collection } IF EXISTS;"
176+ cls .get_cluster ().execute_query (query )
177+ print (f"Dropped collection: { collection } " )
178+ except Exception as e :
179+ print (f"Error dropping collection { collection } : { e } " )
180+ finally :
181+ print ("Cleanup complete." )
182+ cls ._shutting_down = False
140183
184+ def teardown (self ):
141185 for collection in self .collections :
142186 query = f"DROP COLLECTION { COUCHBASE_COLUMNAR_DATABASE } .{ COUCHBASE_COLUMNAR_SCOPE } .`{ collection } ` IF EXISTS;"
143187 try :
144- cluster .execute_query (
188+ self . get_cluster () .execute_query (
145189 query ,
146190 QueryOptions (
147191 timeout = timedelta (seconds = self .offline_store_config .timeout )
@@ -150,3 +194,20 @@ def teardown(self):
150194 print (f"Successfully dropped collection: { collection } " )
151195 except Exception as e :
152196 print (f"Error dropping collection { collection } : { e } " )
197+
198+
def cleanup_handler(signum, frame):
    """Signal handler: run best-effort cleanup, then apply the signal's
    default behavior.

    ``signal.default_int_handler`` is the default for SIGINT only (it raises
    ``KeyboardInterrupt``), so it must not be used for SIGTERM; for any other
    signal we restore the default disposition and re-deliver the signal so
    the process exits with the correct termination status.
    """
    print("\nCleaning up dangling resources...")
    try:
        CouchbaseColumnarDataSourceCreator.cleanup_all()
    except Exception as e:
        print(f"Error during cleanup: {e}")
    finally:
        if signum == signal.SIGINT:
            # Default SIGINT behavior: raise KeyboardInterrupt in the main thread.
            signal.default_int_handler(signum, frame)
        else:
            # Re-deliver the signal with its default disposition (e.g. SIGTERM).
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)
208+
209+
# Hook cleanup into interrupt/termination signals and normal interpreter exit.
for _sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_sig, cleanup_handler)
atexit.register(CouchbaseColumnarDataSourceCreator.cleanup_all)
0 commit comments