From d5818c18dc3e64427715885b3923625a28b1623b Mon Sep 17 00:00:00 2001 From: Justin Giorgi Date: Tue, 1 Mar 2016 16:03:34 -0800 Subject: [PATCH 1/2] Added thread safety to Zeroconf.close() Zeroconf.close() immediately closed sockets which child threads may still need to finish their operations. This was causing EBADF (bad file descriptor) errors in some cases. Zeroconf.close() now blocks until all known child threads have stopped then closes the sockets safely. If a child thread was created that Zeroconf is not aware of (ie a ServiceBrowser object manually created but not registered in zeroconf.browsers) the EBADF errors may still occur. Also some of the unit tests were not in a unittest.TestCase subclass so they were not running. Added test cases to Misc TestCase and added default execution of unittest.main(). --- test_zeroconf.py | 124 +++++++++++++++++++++++++++-------------------- zeroconf.py | 26 +++++++++- 2 files changed, 96 insertions(+), 54 deletions(-) diff --git a/test_zeroconf.py b/test_zeroconf.py index 6ecf6728a..7d06d7546 100644 --- a/test_zeroconf.py +++ b/test_zeroconf.py @@ -151,56 +151,74 @@ def test_launch_and_close(self): rv.close() -def test_integration(): - service_added = Event() - service_removed = Event() - - type_ = "_http._tcp.local." - registration_name = "xxxyyy.%s" % type_ - - def on_service_state_change(zeroconf, service_type, state_change, name): - if name == registration_name: - if state_change is ServiceStateChange.Added: - service_added.set() - elif state_change is ServiceStateChange.Removed: - service_removed.set() - - zeroconf_browser = Zeroconf() - browser = ServiceBrowser(zeroconf_browser, type_, [on_service_state_change]) - - zeroconf_registrar = Zeroconf() - desc = {'path': '/~paulsm/'} - info = ServiceInfo( - type_, registration_name, - socket.inet_aton("10.0.1.2"), 80, 0, 0, - desc, "ash-2.local.") - zeroconf_registrar.register_service(info) - - try: - service_added.wait(1) - assert service_added.is_set() - zeroconf_registrar.unregister_service(info) - service_removed.wait(1) - assert service_removed.is_set() - finally: - zeroconf_registrar.close() - browser.cancel() - zeroconf_browser.close() - - -def test_listener_handles_closed_socket_situation_gracefully(): - error = socket.error(socket.EBADF) - error.errno = socket.EBADF - - zeroconf = Mock() - zeroconf.socket.recvfrom.side_effect = error - - listener = Listener(zeroconf) - listener.handle_read(zeroconf.socket) - - -def test_dnstext_repr_works(): - # There was an issue on Python 3 that prevented DNSText's repr - # from working when the text was longer than 10 bytes - text = DNSText('irrelevant', None, 0, 0, b'12345678901') - repr(text) +class Misc(unittest.TestCase): + def test_integration(self): + service_added = Event() + service_removed = Event() + + type_ = "_http._tcp.local." + registration_name = "xxxyyy.%s" % type_ + + def on_service_state_change(zeroconf, service_type, state_change, name): + if name == registration_name: + if state_change is ServiceStateChange.Added: + service_added.set() + elif state_change is ServiceStateChange.Removed: + service_removed.set() + + zeroconf_browser = Zeroconf() + browser = ServiceBrowser(zeroconf_browser, type_, [on_service_state_change]) + + zeroconf_registrar = Zeroconf() + desc = {'path': '/~paulsm/'} + info = ServiceInfo( + type_, registration_name, + socket.inet_aton("10.0.1.2"), 80, 0, 0, + desc, "ash-2.local.") + zeroconf_registrar.register_service(info) + + try: + service_added.wait(1) + assert service_added.is_set() + zeroconf_registrar.unregister_service(info) + service_removed.wait(1) + assert service_removed.is_set() + finally: + zeroconf_registrar.close() + browser.cancel() + zeroconf_browser.close() + + + def test_listener_handles_closed_socket_situation_gracefully(self): + error = socket.error(socket.EBADF) + error.errno = socket.EBADF + + zeroconf = Mock() + zeroconf.socket.recvfrom.side_effect = error + + listener = Listener(zeroconf) + listener.handle_read(zeroconf.socket) + + + def test_dnstext_repr_works(self): + # There was an issue on Python 3 that prevented DNSText's repr + # from working when the text was longer than 10 bytes + text = DNSText('irrelevant', None, 0, 0, b'12345678901') + repr(text) + + + def test_close_waits_for_threads(self): + class Dummy(object): + def add_service(self, zeroconf_obj, service_type, name): + pass + def remove_service(self, zeroconf_obj, service_type, name): + pass + + z = Zeroconf() + z.add_service_listener('_privet._tcp.local.', listener=Dummy()) + z.close() + assert not z.browsers[0].is_alive() + + +if __name__ == '__main__': + unittest.main() diff --git a/zeroconf.py b/zeroconf.py index b5dfb956e..4392d4496 100644 --- a/zeroconf.py +++ b/zeroconf.py @@ -1671,14 +1671,38 @@ def send(self, out, addr=_MDNS_ADDR, port=_MDNS_PORT): 'Should not happen, sent %d out of %d bytes' % ( bytes_sent, len(packet))) + def _check_threads(self): + """Check if any threads are still active. + + Returns: + True if the Reaper or Engine started by this object is still alive or + if any ServiceBrowser known to this object is still alive. Otherwise + False. + """ + threads = [self.reaper, self.engine] + self.browsers + for t in threads: + if isinstance(t, threading.Thread) and t.is_alive(): + return True + return False + def close(self): """Ends the background threads, and prevent this instance from - servicing further queries.""" + servicing further queries. + + This operation blocks until all threads exit. + """ global _GLOBAL_DONE if not _GLOBAL_DONE: _GLOBAL_DONE = True self.notify_all() self.engine.notify() self.unregister_all_services() + + # Wait for threads to actually die before destroying the sockets + threads_alive = self._check_threads() + while threads_alive: + time.sleep(0.01) + threads_alive = self._check_threads() + for s in [self._listen_socket] + self._respond_sockets: s.close() From c8ff62c76699c70d3194800042ac44d0e50fb1a9 Mon Sep 17 00:00:00 2001 From: Justin Giorgi Date: Thu, 10 Mar 2016 14:34:23 -0800 Subject: [PATCH 2/2] Removed unnecessary changes to tests. --- test_zeroconf.py | 133 +++++++++++++++++++++++------------------------ 1 file changed, 66 insertions(+), 67 deletions(-) diff --git a/test_zeroconf.py b/test_zeroconf.py index 7d06d7546..ad1c00dce 100644 --- a/test_zeroconf.py +++ b/test_zeroconf.py @@ -151,73 +151,72 @@ def test_launch_and_close(self): rv.close() -class Misc(unittest.TestCase): - def test_integration(self): - service_added = Event() - service_removed = Event() - - type_ = "_http._tcp.local." - registration_name = "xxxyyy.%s" % type_ - - def on_service_state_change(zeroconf, service_type, state_change, name): - if name == registration_name: - if state_change is ServiceStateChange.Added: - service_added.set() - elif state_change is ServiceStateChange.Removed: - service_removed.set() - - zeroconf_browser = Zeroconf() - browser = ServiceBrowser(zeroconf_browser, type_, [on_service_state_change]) - - zeroconf_registrar = Zeroconf() - desc = {'path': '/~paulsm/'} - info = ServiceInfo( - type_, registration_name, - socket.inet_aton("10.0.1.2"), 80, 0, 0, - desc, "ash-2.local.") - zeroconf_registrar.register_service(info) - - try: - service_added.wait(1) - assert service_added.is_set() - zeroconf_registrar.unregister_service(info) - service_removed.wait(1) - assert service_removed.is_set() - finally: - zeroconf_registrar.close() - browser.cancel() - zeroconf_browser.close() - - - def test_listener_handles_closed_socket_situation_gracefully(self): - error = socket.error(socket.EBADF) - error.errno = socket.EBADF - - zeroconf = Mock() - zeroconf.socket.recvfrom.side_effect = error - - listener = Listener(zeroconf) - listener.handle_read(zeroconf.socket) - - - def test_dnstext_repr_works(self): - # There was an issue on Python 3 that prevented DNSText's repr - # from working when the text was longer than 10 bytes - text = DNSText('irrelevant', None, 0, 0, b'12345678901') - repr(text) - - - def test_close_waits_for_threads(self): - class Dummy(object): - def add_service(self, zeroconf_obj, service_type, name): - pass - def remove_service(self, zeroconf_obj, service_type, name): - pass - - z = Zeroconf() - z.add_service_listener('_privet._tcp.local.', listener=Dummy()) - z.close() - assert not z.browsers[0].is_alive() +def test_integration(): + service_added = Event() + service_removed = Event() + + type_ = "_http._tcp.local." + registration_name = "xxxyyy.%s" % type_ + + def on_service_state_change(zeroconf, service_type, state_change, name): + if name == registration_name: + if state_change is ServiceStateChange.Added: + service_added.set() + elif state_change is ServiceStateChange.Removed: + service_removed.set() + + zeroconf_browser = Zeroconf() + browser = ServiceBrowser(zeroconf_browser, type_, [on_service_state_change]) + + zeroconf_registrar = Zeroconf() + desc = {'path': '/~paulsm/'} + info = ServiceInfo( + type_, registration_name, + socket.inet_aton("10.0.1.2"), 80, 0, 0, + desc, "ash-2.local.") + zeroconf_registrar.register_service(info) + + try: + service_added.wait(1) + assert service_added.is_set() + zeroconf_registrar.unregister_service(info) + service_removed.wait(1) + assert service_removed.is_set() + finally: + zeroconf_registrar.close() + browser.cancel() + zeroconf_browser.close() + + +def test_listener_handles_closed_socket_situation_gracefully(): + error = socket.error(socket.EBADF) + error.errno = socket.EBADF + + zeroconf = Mock() + zeroconf.socket.recvfrom.side_effect = error + + listener = Listener(zeroconf) + listener.handle_read(zeroconf.socket) + + +def test_dnstext_repr_works(): + # There was an issue on Python 3 that prevented DNSText's repr + # from working when the text was longer than 10 bytes + text = DNSText('irrelevant', None, 0, 0, b'12345678901') + repr(text) + + +def test_close_waits_for_threads(): + class Dummy(object): + def add_service(self, zeroconf_obj, service_type, name): + pass + def remove_service(self, zeroconf_obj, service_type, name): + pass + + z = Zeroconf() + z.add_service_listener('_privet._tcp.local.', listener=Dummy()) + z.close() + assert not z.browsers[0].is_alive() if __name__ == '__main__':