1919 cast ,
2020 Any ,
2121 AsyncIterable ,
22+ Callable ,
2223 Optional ,
2324 Set ,
2425 Sequence ,
99100 )
100101 from google .cloud .bigtable .data ._async .mutations_batcher import _MB_SIZE
101102 from google .cloud .bigtable .data ._async ._swappable_channel import (
102- AsyncSwappableChannel ,
103+ AsyncSwappableChannel as SwappableChannelType ,
104+ )
105+ from google .cloud .bigtable .data ._async .metrics_interceptor import (
106+ AsyncBigtableMetricsInterceptor as MetricsInterceptorType ,
103107 )
104108else :
105109 from typing import Iterable # noqa: F401
106110 from grpc import insecure_channel
111+ from grpc import intercept_channel
107112 from google .cloud .bigtable_v2 .services .bigtable .transports import BigtableGrpcTransport as TransportType # type: ignore
108113 from google .cloud .bigtable_v2 .services .bigtable import BigtableClient as GapicClient # type: ignore
109114 from google .cloud .bigtable .data ._sync_autogen .mutations_batcher import _MB_SIZE
110115 from google .cloud .bigtable .data ._sync_autogen ._swappable_channel import ( # noqa: F401
111- SwappableChannel ,
116+ SwappableChannel as SwappableChannelType ,
117+ )
118+ from google .cloud .bigtable .data ._sync_autogen .metrics_interceptor import ( # noqa: F401
119+ BigtableMetricsInterceptor as MetricsInterceptorType ,
112120 )
113-
114121
115122if TYPE_CHECKING :
116123 from google .cloud .bigtable .data ._helpers import RowKeySamples
@@ -205,7 +212,7 @@ def __init__(
205212 credentials = google .auth .credentials .AnonymousCredentials ()
206213 if project is None :
207214 project = _DEFAULT_BIGTABLE_EMULATOR_CLIENT
208-
215+ self . _metrics_interceptor = MetricsInterceptorType ()
209216 # initialize client
210217 ClientWithProject .__init__ (
211218 self ,
@@ -259,12 +266,11 @@ def __init__(
259266 stacklevel = 2 ,
260267 )
261268
262- @CrossSync .convert (replace_symbols = {"AsyncSwappableChannel" : "SwappableChannel" })
263- def _build_grpc_channel (self , * args , ** kwargs ) -> AsyncSwappableChannel :
269+ def _build_grpc_channel (self , * args , ** kwargs ) -> SwappableChannelType :
264270 """
265271 This method is called by the gapic transport to create a grpc channel.
266272
267- The init arguments passed down are captured in a partial used by AsyncSwappableChannel
273+ The init arguments passed down are captured in a partial used by SwappableChannel
268274 to create new channel instances in the future, as part of the channel refresh logic
269275
270276 Emulators always use an inseucre channel
@@ -275,12 +281,30 @@ def _build_grpc_channel(self, *args, **kwargs) -> AsyncSwappableChannel:
275281 Returns:
276282 a custom wrapped swappable channel
277283 """
284+ create_channel_fn : Callable [[], Channel ]
278285 if self ._emulator_host is not None :
279- # emulators use insecure channel
286+ # Emulators use insecure channels
280287 create_channel_fn = partial (insecure_channel , self ._emulator_host )
281- else :
288+ elif CrossSync .is_async :
289+ # For async client, use the default create_channel.
282290 create_channel_fn = partial (TransportType .create_channel , * args , ** kwargs )
283- return AsyncSwappableChannel (create_channel_fn )
291+ else :
292+ # For sync client, wrap create_channel with interceptors.
293+ def sync_create_channel_fn ():
294+ return intercept_channel (
295+ TransportType .create_channel (* args , ** kwargs ),
296+ self ._metrics_interceptor ,
297+ )
298+
299+ create_channel_fn = sync_create_channel_fn
300+
301+ # Instantiate SwappableChannelType with the determined creation function.
302+ new_channel = SwappableChannelType (create_channel_fn )
303+ if CrossSync .is_async :
304+ # Attach async interceptors to the channel instance itself.
305+ new_channel ._unary_unary_interceptors .append (self ._metrics_interceptor )
306+ new_channel ._unary_stream_interceptors .append (self ._metrics_interceptor )
307+ return new_channel
284308
285309 @property
286310 def universe_domain (self ) -> str :
@@ -402,7 +426,7 @@ def _invalidate_channel_stubs(self):
402426 self .transport ._stubs = {}
403427 self .transport ._prep_wrapped_messages (self .client_info )
404428
405- @CrossSync .convert ( replace_symbols = { "AsyncSwappableChannel" : "SwappableChannel" })
429+ @CrossSync .convert
406430 async def _manage_channel (
407431 self ,
408432 refresh_interval_min : float = 60 * 35 ,
@@ -427,10 +451,10 @@ async def _manage_channel(
427451 grace_period: time to allow previous channel to serve existing
428452 requests before closing, in seconds
429453 """
430- if not isinstance (self .transport .grpc_channel , AsyncSwappableChannel ):
454+ if not isinstance (self .transport .grpc_channel , SwappableChannelType ):
431455 warnings .warn ("Channel does not support auto-refresh." )
432456 return
433- super_channel : AsyncSwappableChannel = self .transport .grpc_channel
457+ super_channel : SwappableChannelType = self .transport .grpc_channel
434458 first_refresh = self ._channel_init_time + random .uniform (
435459 refresh_interval_min , refresh_interval_max
436460 )
0 commit comments