Skip to content

Commit bccd32d

Browse files
Add grpc.Channel.close
1 parent ca7ba4d commit bccd32d

6 files changed

Lines changed: 253 additions & 1 deletion

File tree

src/python/grpcio/grpc/__init__.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,11 @@ def __call__(self,
813813

814814

815815
class Channel(six.with_metaclass(abc.ABCMeta)):
816-
"""Affords RPC invocation via generic methods on client-side."""
816+
"""Affords RPC invocation via generic methods on client-side.
817+
818+
Channel objects implement the Context Manager type, although they need not
819+
support being entered and exited multiple times.
820+
"""
817821

818822
@abc.abstractmethod
819823
def subscribe(self, callback, try_to_connect=False):
@@ -926,6 +930,17 @@ def stream_stream(self,
926930
"""
927931
raise NotImplementedError()
928932

933+
@abc.abstractmethod
934+
def close(self):
935+
"""Closes this Channel and releases all resources held by it.
936+
937+
Closing the Channel will immediately terminate all RPCs active with the
938+
Channel and it is not valid to invoke new RPCs with the Channel.
939+
940+
This method is idempotent.
941+
"""
942+
raise NotImplementedError()
943+
929944

930945
########################## Service-Side Context ##############################
931946

src/python/grpcio/grpc/_channel.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -909,5 +909,28 @@ def stream_stream(self,
909909
self._channel, _channel_managed_call_management(self._call_state),
910910
_common.encode(method), request_serializer, response_deserializer)
911911

912+
def _close(self):
913+
self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
914+
_moot(self._connectivity_state)
915+
916+
def __enter__(self):
917+
return self
918+
919+
def __exit__(self, exc_type, exc_val, exc_tb):
920+
self._close()
921+
return False
922+
923+
def close(self):
924+
self._close()
925+
912926
def __del__(self):
927+
# TODO(https://github.com/grpc/grpc/issues/12531): Several releases
928+
# after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
929+
# here (or more likely, call self._close() here). We don't do this today
930+
# because many valid use cases today allow the channel to be deleted
931+
# immediately after stubs are created. After a sufficient period of time
932+
# has passed for all users to be trusted to hang out to their channels
933+
# for as long as they are in use and to close them after using them,
934+
# then deletion of this grpc._channel.Channel instance can be made to
935+
# effect closure of the underlying cygrpc.Channel instance.
913936
_moot(self._connectivity_state)

src/python/grpcio/grpc/_interceptor.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,19 @@ def stream_stream(self,
334334
else:
335335
return thunk(method)
336336

337+
def _close(self):
338+
self._channel.close()
339+
340+
def __enter__(self):
341+
return self
342+
343+
def __exit__(self, exc_type, exc_val, exc_tb):
344+
self._close()
345+
return False
346+
347+
def close(self):
348+
self._channel.close()
349+
337350

338351
def intercept_channel(channel, *interceptors):
339352
for interceptor in reversed(list(interceptors)):

src/python/grpcio_testing/grpc_testing/_channel/_channel.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,21 @@ def stream_stream(self,
5656
response_deserializer=None):
5757
return _multi_callable.StreamStream(method, self._state)
5858

59+
def _close(self):
60+
# TODO(https://github.com/grpc/grpc/issues/12531): Decide what
61+
# action to take here, if any?
62+
pass
63+
64+
def __enter__(self):
65+
return self
66+
67+
def __exit__(self, exc_type, exc_val, exc_tb):
68+
self._close()
69+
return False
70+
71+
def close(self):
72+
self._close()
73+
5974
def take_unary_unary(self, method_descriptor):
6075
return _channel_rpc.unary_unary(self._state, method_descriptor)
6176

src/python/grpcio_tests/tests/tests.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
"unit._auth_test.AccessTokenAuthMetadataPluginTest",
2626
"unit._auth_test.GoogleCallCredentialsTest",
2727
"unit._channel_args_test.ChannelArgsTest",
28+
"unit._channel_close_test.ChannelCloseTest",
2829
"unit._channel_connectivity_test.ChannelConnectivityTest",
2930
"unit._channel_ready_future_test.ChannelReadyFutureTest",
3031
"unit._compression_test.CompressionTest",
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# Copyright 2018 gRPC authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Tests server and client side compression."""
15+
16+
import threading
17+
import time
18+
import unittest
19+
20+
import grpc
21+
22+
from tests.unit import test_common
23+
from tests.unit.framework.common import test_constants
24+
25+
_BEAT = 0.5
26+
_SOME_TIME = 5
27+
_MORE_TIME = 10
28+
29+
30+
class _MethodHandler(grpc.RpcMethodHandler):
31+
32+
request_streaming = True
33+
response_streaming = True
34+
request_deserializer = None
35+
response_serializer = None
36+
37+
def stream_stream(self, request_iterator, servicer_context):
38+
for request in request_iterator:
39+
yield request * 2
40+
41+
42+
_METHOD_HANDLER = _MethodHandler()
43+
44+
45+
class _GenericHandler(grpc.GenericRpcHandler):
46+
47+
def service(self, handler_call_details):
48+
return _METHOD_HANDLER
49+
50+
51+
_GENERIC_HANDLER = _GenericHandler()
52+
53+
54+
class _Pipe(object):
55+
56+
def __init__(self, values):
57+
self._condition = threading.Condition()
58+
self._values = list(values)
59+
self._open = True
60+
61+
def __iter__(self):
62+
return self
63+
64+
def _next(self):
65+
with self._condition:
66+
while not self._values and self._open:
67+
self._condition.wait()
68+
if self._values:
69+
return self._values.pop(0)
70+
else:
71+
raise StopIteration()
72+
73+
def next(self):
74+
return self._next()
75+
76+
def __next__(self):
77+
return self._next()
78+
79+
def add(self, value):
80+
with self._condition:
81+
self._values.append(value)
82+
self._condition.notify()
83+
84+
def close(self):
85+
with self._condition:
86+
self._open = False
87+
self._condition.notify()
88+
89+
def __enter__(self):
90+
return self
91+
92+
def __exit__(self, type, value, traceback):
93+
self.close()
94+
95+
96+
class ChannelCloseTest(unittest.TestCase):
97+
98+
def setUp(self):
99+
self._server = test_common.test_server(
100+
max_workers=test_constants.THREAD_CONCURRENCY)
101+
self._server.add_generic_rpc_handlers((_GENERIC_HANDLER,))
102+
self._port = self._server.add_insecure_port('[::]:0')
103+
self._server.start()
104+
105+
def tearDown(self):
106+
self._server.stop(None)
107+
108+
def test_close_immediately_after_call_invocation(self):
109+
channel = grpc.insecure_channel('localhost:{}'.format(self._port))
110+
multi_callable = channel.stream_stream('Meffod')
111+
request_iterator = _Pipe(())
112+
response_iterator = multi_callable(request_iterator)
113+
channel.close()
114+
request_iterator.close()
115+
116+
self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
117+
118+
def test_close_while_call_active(self):
119+
channel = grpc.insecure_channel('localhost:{}'.format(self._port))
120+
multi_callable = channel.stream_stream('Meffod')
121+
request_iterator = _Pipe((b'abc',))
122+
response_iterator = multi_callable(request_iterator)
123+
next(response_iterator)
124+
channel.close()
125+
request_iterator.close()
126+
127+
self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
128+
129+
def test_context_manager_close_while_call_active(self):
130+
with grpc.insecure_channel('localhost:{}'.format(
131+
self._port)) as channel: # pylint: disable=bad-continuation
132+
multi_callable = channel.stream_stream('Meffod')
133+
request_iterator = _Pipe((b'abc',))
134+
response_iterator = multi_callable(request_iterator)
135+
next(response_iterator)
136+
request_iterator.close()
137+
138+
self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
139+
140+
def test_context_manager_close_while_many_calls_active(self):
141+
with grpc.insecure_channel('localhost:{}'.format(
142+
self._port)) as channel: # pylint: disable=bad-continuation
143+
multi_callable = channel.stream_stream('Meffod')
144+
request_iterators = tuple(
145+
_Pipe((b'abc',))
146+
for _ in range(test_constants.THREAD_CONCURRENCY))
147+
response_iterators = []
148+
for request_iterator in request_iterators:
149+
response_iterator = multi_callable(request_iterator)
150+
next(response_iterator)
151+
response_iterators.append(response_iterator)
152+
for request_iterator in request_iterators:
153+
request_iterator.close()
154+
155+
for response_iterator in response_iterators:
156+
self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
157+
158+
def test_many_concurrent_closes(self):
159+
channel = grpc.insecure_channel('localhost:{}'.format(self._port))
160+
multi_callable = channel.stream_stream('Meffod')
161+
request_iterator = _Pipe((b'abc',))
162+
response_iterator = multi_callable(request_iterator)
163+
next(response_iterator)
164+
start = time.time()
165+
end = start + _MORE_TIME
166+
167+
def sleep_some_time_then_close():
168+
time.sleep(_SOME_TIME)
169+
channel.close()
170+
171+
for _ in range(test_constants.THREAD_CONCURRENCY):
172+
close_thread = threading.Thread(target=sleep_some_time_then_close)
173+
close_thread.start()
174+
while True:
175+
request_iterator.add(b'def')
176+
time.sleep(_BEAT)
177+
if end < time.time():
178+
break
179+
request_iterator.close()
180+
181+
self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED)
182+
183+
184+
if __name__ == '__main__':
185+
unittest.main(verbosity=2)

0 commit comments

Comments
 (0)