From d8562fd3546d6cd27b1ba9e95105ea534649a43e Mon Sep 17 00:00:00 2001 From: Stephen Rauch Date: Thu, 17 Mar 2016 16:24:50 -0700 Subject: [PATCH 1/8] Fix ability for a cache lookup to match properly When querying for a service type, the response is processed. During the processing, an info lookup is performed. If the info is not found in the cache, then a query is sent. Trouble is that the info requested is present in the same packet that triggered the lookup, and a query is not necessary. But two problems caused the cache lookup to fail. 1) The info was not yet in the cache. The call back was fired before all answers in the packet were cached. 2) The test for a cache hit did not work, because the cache hit test uses a DNSEntry as the comparison object. But some of the objects in the cache are descendents of DNSEntry and have their own __eq__() defined which accesses fields only present on the descendent. Thus the test can NEVER work since the descendent's __eq__() will be used. Also continuing the theme of some other recent pull requests, add three _GLOBAL_DONE tests to avoid doing work after the attempted stop, and thus avoid generating (harmless, but annoying) exceptions during shutdown --- zeroconf.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/zeroconf.py b/zeroconf.py index 8c00d2b4d..172254391 100644 --- a/zeroconf.py +++ b/zeroconf.py @@ -768,7 +768,9 @@ def get(self, entry): matching entry.""" try: list_ = self.cache[entry.key] - return list_[list_.index(entry)] + for cached_entry in list_: + if entry.__eq__(cached_entry): + return cached_entry except (KeyError, ValueError): return None @@ -829,6 +831,8 @@ def run(self): else: try: rr, wr, er = select.select(rs, [], [], self.timeout) + if _GLOBAL_DONE: + break for socket_ in rr: try: self.readers[socket_].handle_read(socket_) @@ -865,7 +869,7 @@ class Listener(object): to cache information as it arrives. It requires registration with an Engine object in order to have - the read() method called when a socket is availble for reading.""" + the read() method called when a socket is available for reading.""" def __init__(self, zc): self.zc = zc @@ -1061,7 +1065,7 @@ def run(self): self.next_time = now + self.delay self.delay = min(20 * 1000, self.delay * 2) - if len(self._handlers_to_call) > 0: + if len(self._handlers_to_call) > 0 and not _GLOBAL_DONE: handler = self._handlers_to_call.pop(0) handler(self.zc) @@ -1595,6 +1599,7 @@ def handle_response(self, msg): else: self.cache.add(record) + for record in msg.answers: self.update_record(now, record) def handle_query(self, msg, addr, port): @@ -1667,6 +1672,8 @@ def send(self, out, addr=_MDNS_ADDR, port=_MDNS_PORT): packet = out.packet() log.debug('Sending %r as %r...', out, packet) for s in self._respond_sockets: + if _GLOBAL_DONE: + return bytes_sent = s.sendto(packet, 0, (addr, port)) if bytes_sent != len(packet): raise Error( From c49145c35de09b2631d8a2b4751d787a6b4dc904 Mon Sep 17 00:00:00 2001 From: Stephen Rauch Date: Mon, 21 Mar 2016 15:22:12 -0700 Subject: [PATCH 2/8] Remove unnecessary packet send in ServiceInfo.request() When performing an info query via request(), a listener is started, and a packet is formed. As the packet is formed, known answers are taken from the cache and placed into the packet. Then the packet is sent. The packet is self received (via multicast loopback, I assume). At that point the listener is fired and the answers in the packet are propagated back to the object that started the request. This is a really long way around the barn. The PR queries the cache directly in request() and then calls update_record(). If all of the information is in the cache, then no packet is formed or sent or received. This approach was taken because, for whatever reason, the reception of the packets on windows via the loopback was proving to be unreliable. The method has the side benefit of being a whole lot faster. This PR also incorporates the joins() from PR #30. In addition it moves the two joins() in close() to their own thread because they can take quite a while to execute. --- zeroconf.py | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/zeroconf.py b/zeroconf.py index 172254391..bcc1fcffe 100644 --- a/zeroconf.py +++ b/zeroconf.py @@ -1045,6 +1045,7 @@ def enqueue_callback(state_change, name): def cancel(self): self.done = True self.zc.notify_all() + self.join() def run(self): while True: @@ -1204,10 +1205,24 @@ def request(self, zc, timeout): next = now + delay last = now + timeout result = False + + record_types_for_check_cache = [ + (_TYPE_SRV, _CLASS_IN), + (_TYPE_TXT, _CLASS_IN), + ] + if self.server is not None: + record_types_for_check_cache.append((_TYPE_A, _CLASS_IN)) + for record_type in record_types_for_check_cache: + cached = zc.cache.get_by_details(self.name, *record_type) + if cached: + self.update_record(zc, now, cached) + + if None not in (self.server, self.address, self.text): + return True + try: zc.add_listener(self, DNSQuestion(self.name, _TYPE_ANY, _CLASS_IN)) - while (self.server is None or self.address is None or - self.text is None): + while None in (self.server, self.address, self.text): if last <= now: return False if next <= now: @@ -1689,5 +1704,20 @@ def close(self): self.notify_all() self.engine.notify() self.unregister_all_services() - for s in [self._listen_socket] + self._respond_sockets: - s.close() + + class CloseThread(threading.Thread): + """Engine can take a while to shutdown, so shunt him off to + another thread. + """ + def __init__(self, zc): + super(CloseThread, self).__init__() + self.zc = zc + + def run(self): + self.zc.reaper.join() + self.zc.engine.join() + sockets = [self.zc._listen_socket] + self.zc._respond_sockets + for s in sockets: + s.close() + + CloseThread(self).start() From 8a110f58b02825100f5bdb56c119495ae42ae54c Mon Sep 17 00:00:00 2001 From: Stephen Rauch Date: Tue, 22 Mar 2016 15:46:05 -0700 Subject: [PATCH 3/8] Fix locking race condition in Engine.run() This fixes a race condition in which the receive engine was waiting against its condition variable under a different lock than the one it used to determine if it needed to wait. This was causing the code to sometimes take 5 seconds to do anything useful. When fixing the race condition, decided to also fix the other correctness issues in the loop which was likely causing the errors that led to the inclusion of the 'except Exception' catch all. This in turn allowed the use of EBADF error due to closing the socket during exit to be used to get out of the select in a timely manner. Finally, this allowed reorganizing the shutdown code to shutdown from the front to the back. That is to say, shutdown the recv socket first, which then allows a clean join with the engine thread. After the engine thread exits most everything else is inert as all callbacks have been unwound. --- zeroconf.py | 96 +++++++++++++++++++---------------------------------- 1 file changed, 34 insertions(+), 62 deletions(-) diff --git a/zeroconf.py b/zeroconf.py index bcc1fcffe..669508231 100644 --- a/zeroconf.py +++ b/zeroconf.py @@ -821,44 +821,36 @@ def __init__(self, zc): def run(self): while not _GLOBAL_DONE: - rs = self.get_readers() - if len(rs) == 0: - # No sockets to manage, but we wait for the timeout - # or addition of a socket - # - with self.condition: + with self.condition: + rs = self.readers.keys() + if len(rs) == 0: + # No sockets to manage, but we wait for the timeout + # or addition of a socket self.condition.wait(self.timeout) - else: + + if len(rs) != 0: try: rr, wr, er = select.select(rs, [], [], self.timeout) - if _GLOBAL_DONE: - break - for socket_ in rr: - try: - self.readers[socket_].handle_read(socket_) - except Exception as e: # TODO stop catching all Exceptions - log.exception('Unknown error, possibly benign: %r', e) - except Exception as e: # TODO stop catching all Exceptions - log.exception('Unknown error, possibly benign: %r', e) - - def get_readers(self): - result = [] + if not _GLOBAL_DONE: + for socket_ in rr: + reader = self.readers.get(socket_) + if reader: + reader.handle_read(socket_) + + except socket.error as e: + # If the socket was closed by another thread, during + # shutdown, ignore it and exit + if e.errno != socket.EBADF or not _GLOBAL_DONE: + raise + + def add_reader(self, reader, socket_): with self.condition: - result = self.readers.keys() - return result - - def add_reader(self, reader, socket): - with self.condition: - self.readers[socket] = reader + self.readers[socket_] = reader self.condition.notify() - def del_reader(self, socket): - with self.condition: - del self.readers[socket] - self.condition.notify() - - def notify(self): + def del_reader(self, socket_): with self.condition: + del self.readers[socket_] self.condition.notify() @@ -875,18 +867,8 @@ def __init__(self, zc): self.zc = zc def handle_read(self, socket_): - try: - data, (addr, port) = socket_.recvfrom(_MAX_MSG_ABSOLUTE) - except socket.error as e: - # If the socket was closed by another thread -- which happens - # regularly on shutdown -- an EBADF exception is thrown here. - # Ignore it. - if e.errno == socket.EBADF: - return - else: - raise e - else: - log.debug('Received %r from %r:%r', data, addr, port) + data, (addr, port) = socket_.recvfrom(_MAX_MSG_ABSOLUTE) + log.debug('Received %r from %r:%r', data, addr, port) self.data = data msg = DNSIncoming(data) @@ -1044,7 +1026,7 @@ def enqueue_callback(state_change, name): def cancel(self): self.done = True - self.zc.notify_all() + self.zc.remove_listener(self) self.join() def run(self): @@ -1701,23 +1683,13 @@ def close(self): global _GLOBAL_DONE if not _GLOBAL_DONE: _GLOBAL_DONE = True + # shutdown recv socket and thread + self.engine.del_reader(self._listen_socket) + self._listen_socket.close() + self.engine.join() + + # shutdown the rest self.notify_all() - self.engine.notify() self.unregister_all_services() - - class CloseThread(threading.Thread): - """Engine can take a while to shutdown, so shunt him off to - another thread. - """ - def __init__(self, zc): - super(CloseThread, self).__init__() - self.zc = zc - - def run(self): - self.zc.reaper.join() - self.zc.engine.join() - sockets = [self.zc._listen_socket] + self.zc._respond_sockets - for s in sockets: - s.close() - - CloseThread(self).start() + for s in self._respond_sockets: + s.close() From 7bbee590e553a1ff0e4dde3b1fdcf614b7e1ecd5 Mon Sep 17 00:00:00 2001 From: Stephen Rauch Date: Tue, 22 Mar 2016 15:53:04 -0700 Subject: [PATCH 4/8] Remove a now invalid test case With the restructure of shutdown, Listener() now needs to throw EBADF on a closed socket to allow a timely and graceful shutdown. --- test_zeroconf.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/test_zeroconf.py b/test_zeroconf.py index 6ecf6728a..97ac202c0 100644 --- a/test_zeroconf.py +++ b/test_zeroconf.py @@ -10,14 +10,12 @@ import unittest from threading import Event -from mock import Mock from six import indexbytes from six.moves import xrange import zeroconf as r from zeroconf import ( DNSText, - Listener, ServiceBrowser, ServiceInfo, ServiceStateChange, @@ -188,17 +186,6 @@ def on_service_state_change(zeroconf, service_type, state_change, name): zeroconf_browser.close() -def test_listener_handles_closed_socket_situation_gracefully(): - error = socket.error(socket.EBADF) - error.errno = socket.EBADF - - zeroconf = Mock() - zeroconf.socket.recvfrom.side_effect = error - - listener = Listener(zeroconf) - listener.handle_read(zeroconf.socket) - - def test_dnstext_repr_works(): # There was an issue on Python 3 that prevented DNSText's repr # from working when the text was longer than 10 bytes From ad3c248e4b67d5d2e9a4448a56b4e4648284ecd4 Mon Sep 17 00:00:00 2001 From: Stephen Rauch Date: Fri, 25 Mar 2016 17:35:03 -0700 Subject: [PATCH 5/8] Shutdown the service listeners in an organized fashion Also adds names to the various threads to make debugging easier. --- zeroconf.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/zeroconf.py b/zeroconf.py index 669508231..08f47e11c 100644 --- a/zeroconf.py +++ b/zeroconf.py @@ -811,7 +811,7 @@ class Engine(threading.Thread): """ def __init__(self, zc): - threading.Thread.__init__(self) + threading.Thread.__init__(self, name='zeroconf-Engine') self.daemon = True self.zc = zc self.readers = {} # maps socket to reader @@ -893,7 +893,7 @@ class Reaper(threading.Thread): have expired.""" def __init__(self, zc): - threading.Thread.__init__(self) + threading.Thread.__init__(self, name='zeroconf-Reaper') self.daemon = True self.zc = zc self.start() @@ -948,7 +948,8 @@ class ServiceBrowser(threading.Thread): def __init__(self, zc, type_, handlers=None, listener=None): """Creates a browser for a specific type""" assert handlers or listener, 'You need to specify at least one handler' - threading.Thread.__init__(self) + threading.Thread.__init__(self, + name='zeroconf-ServiceBrowser' + type_) self.daemon = True self.zc = zc self.type = type_ @@ -1380,7 +1381,7 @@ def __init__( self._respond_sockets.append(respond_socket) self.listeners = [] - self.browsers = [] + self.browsers = {} self.services = {} self.servicetypes = {} @@ -1418,14 +1419,18 @@ def add_service_listener(self, type, listener): will then have its update_record method called when information arrives for that type.""" self.remove_service_listener(listener) - self.browsers.append(ServiceBrowser(self, type, listener)) + self.browsers[listener] = ServiceBrowser(self, type, listener) def remove_service_listener(self, listener): """Removes a listener from the set that is currently listening.""" - for browser in self.browsers: - if browser.listener == listener: - browser.cancel() - del browser + if listener in self.browsers: + self.browsers[listener].cancel() + del self.browsers[listener] + + def remove_all_service_listeners(self): + """Removes a listener from the set that is currently listening.""" + for listener in self.browsers.keys(): + self.remove_service_listener(listener) def register_service(self, info, ttl=_DNS_TTL): """Registers service information to the network with a default TTL @@ -1691,5 +1696,6 @@ def close(self): # shutdown the rest self.notify_all() self.unregister_all_services() + self.remove_all_service_listeners() for s in self._respond_sockets: s.close() From 75232ccf28a820ee723db072951078eba31145a5 Mon Sep 17 00:00:00 2001 From: Stephen Rauch Date: Sat, 2 Apr 2016 13:46:30 -0700 Subject: [PATCH 6/8] Improve test coverage Add more needed shutdown cleanup found via additional test coverage. Force timeout calculation from milli to seconds to use floating point. --- .gitignore | 10 ++++++ setup.cfg | 1 + test_zeroconf.py | 84 ++++++++++++++++++++++++++++++++++++++++++++++-- zeroconf.py | 40 +++++++++++------------ 4 files changed, 111 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index dc84959d1..264bddcf2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,12 @@ build/ +*.pyc +*.pyo +Thumbs.db +.DS_Store +.project +.pydevproject +.settings +.idea +.vslick +.cache diff --git a/setup.cfg b/setup.cfg index 51017e121..24b129b75 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,3 +5,4 @@ universal = 1 show-source = 1 import-order-style=google application-import-names=zeroconf +max-line-length=110 diff --git a/test_zeroconf.py b/test_zeroconf.py index 97ac202c0..e9d287803 100644 --- a/test_zeroconf.py +++ b/test_zeroconf.py @@ -7,6 +7,7 @@ import logging import socket import struct +import time import unittest from threading import Event @@ -149,6 +150,84 @@ def test_launch_and_close(self): rv.close() +class Exceptions(unittest.TestCase): + + def test_bad_service_info_name(self): + browser = Zeroconf() + self.assertRaises(r.BadTypeInNameException, + browser.get_service_info, "type", "type_not") + browser.close() + + +class Listener(unittest.TestCase): + + def test_integration_with_listener_class(self): + + service_added = Event() + service_removed = Event() + + type_ = "_http._tcp.local." + name = "xxxyyy" + registration_name = "%s.%s" % (name, type_) + + class MyListener(object): + def add_service(self, zeroconf, type, name): + zeroconf.get_service_info(type, name) + service_added.set() + + def remove_service(self, zeroconf, type, name): + service_removed.set() + + zeroconf_browser = Zeroconf() + zeroconf_browser.add_service_listener(type_, MyListener()) + + properties = dict( + prop_none=None, + prop_string=b'a_prop', + prop_float=1.0, + prop_blank=b'a blanked string', + prop_true=1, + prop_false=0, + ) + + zeroconf_registrar = Zeroconf() + desc = {'path': '/~paulsm/'} + desc.update(properties) + info = ServiceInfo( + type_, registration_name, + socket.inet_aton("10.0.1.2"), 80, 0, 0, + desc, "ash-2.local.") + zeroconf_registrar.register_service(info) + + try: + service_added.wait(1) + assert service_added.is_set() + + # short pause to allow multicast timers to expire + time.sleep(2) + + # clear the answer cache to force query + for record in zeroconf_browser.cache.entries(): + zeroconf_browser.cache.remove(record) + + # get service info without answer cache + info = zeroconf_browser.get_service_info(type_, registration_name) + + assert info.properties[b'prop_none'] is False + assert info.properties[b'prop_string'] == properties['prop_string'] + assert info.properties[b'prop_float'] is False + assert info.properties[b'prop_blank'] == properties['prop_blank'] + assert info.properties[b'prop_true'] is True + assert info.properties[b'prop_false'] is False + + zeroconf_registrar.unregister_service(info) + service_removed.wait(1) + assert service_removed.is_set() + finally: + zeroconf_registrar.close() + zeroconf_browser.close() + + def test_integration(): service_added = Event() service_removed = Event() @@ -177,9 +256,8 @@ def on_service_state_change(zeroconf, service_type, state_change, name): try: service_added.wait(1) assert service_added.is_set() - zeroconf_registrar.unregister_service(info) - service_removed.wait(1) - assert service_removed.is_set() + # Don't remove service, allow close() to cleanup + finally: zeroconf_registrar.close() browser.cancel() diff --git a/zeroconf.py b/zeroconf.py index 08f47e11c..5afce10dd 100644 --- a/zeroconf.py +++ b/zeroconf.py @@ -64,10 +64,6 @@ def emit(self, record): if log.level == logging.NOTSET: log.setLevel(logging.WARN) -# hook for threads - -_GLOBAL_DONE = False - # Some timing constants _UNREGISTER_TIME = 125 @@ -292,7 +288,7 @@ def get_expiration_time(self, percent): def get_remaining_ttl(self, now): """Returns the remaining TTL in seconds.""" - return max(0, (self.get_expiration_time(100) - now) / 1000) + return max(0, (self.get_expiration_time(100) - now) / 1000.0) def is_expired(self, now): """Returns true if this record has expired.""" @@ -820,7 +816,7 @@ def __init__(self, zc): self.start() def run(self): - while not _GLOBAL_DONE: + while not self.zc._GLOBAL_DONE: with self.condition: rs = self.readers.keys() if len(rs) == 0: @@ -831,7 +827,7 @@ def run(self): if len(rs) != 0: try: rr, wr, er = select.select(rs, [], [], self.timeout) - if not _GLOBAL_DONE: + if not self.zc._GLOBAL_DONE: for socket_ in rr: reader = self.readers.get(socket_) if reader: @@ -840,7 +836,7 @@ def run(self): except socket.error as e: # If the socket was closed by another thread, during # shutdown, ignore it and exit - if e.errno != socket.EBADF or not _GLOBAL_DONE: + if e.errno != socket.EBADF or not self.zc._GLOBAL_DONE: raise def add_reader(self, reader, socket_): @@ -901,7 +897,7 @@ def __init__(self, zc): def run(self): while True: self.zc.wait(10 * 1000) - if _GLOBAL_DONE: + if self.zc._GLOBAL_DONE: return now = current_time_millis() for record in self.zc.cache.entries(): @@ -1035,7 +1031,7 @@ def run(self): now = current_time_millis() if len(self._handlers_to_call) == 0 and self.next_time > now: self.zc.wait(self.next_time - now) - if _GLOBAL_DONE or self.done: + if self.zc._GLOBAL_DONE or self.done: return now = current_time_millis() @@ -1049,7 +1045,7 @@ def run(self): self.next_time = now + self.delay self.delay = min(20 * 1000, self.delay * 2) - if len(self._handlers_to_call) > 0 and not _GLOBAL_DONE: + if len(self._handlers_to_call) > 0 and not self.zc._GLOBAL_DONE: handler = self._handlers_to_call.pop(0) handler(self.zc) @@ -1345,8 +1341,8 @@ def __init__( :type interfaces: :class:`InterfaceChoice` or sequence of ip addresses """ - global _GLOBAL_DONE - _GLOBAL_DONE = False + # hook for threads + self._GLOBAL_DONE = False self._listen_socket = new_socket() interfaces = normalize_interface_choice(interfaces, socket.AF_INET) @@ -1398,7 +1394,7 @@ def wait(self, timeout): """Calling thread waits for a given number of milliseconds or until notified.""" with self.condition: - self.condition.wait(timeout / 1000) + self.condition.wait(timeout / 1000.0) def notify_all(self): """Notifies all waiting threads""" @@ -1429,7 +1425,7 @@ def remove_service_listener(self, listener): def remove_all_service_listeners(self): """Removes a listener from the set that is currently listening.""" - for listener in self.browsers.keys(): + for listener in [k for k in self.browsers]: self.remove_service_listener(listener) def register_service(self, info, ttl=_DNS_TTL): @@ -1674,7 +1670,7 @@ def send(self, out, addr=_MDNS_ADDR, port=_MDNS_PORT): packet = out.packet() log.debug('Sending %r as %r...', out, packet) for s in self._respond_sockets: - if _GLOBAL_DONE: + if self._GLOBAL_DONE: return bytes_sent = s.sendto(packet, 0, (addr, port)) if bytes_sent != len(packet): @@ -1685,9 +1681,12 @@ def send(self, out, addr=_MDNS_ADDR, port=_MDNS_PORT): def close(self): """Ends the background threads, and prevent this instance from servicing further queries.""" - global _GLOBAL_DONE - if not _GLOBAL_DONE: - _GLOBAL_DONE = True + if not self._GLOBAL_DONE: + self._GLOBAL_DONE = True + # remove service listeners + self.remove_all_service_listeners() + self.unregister_all_services() + # shutdown recv socket and thread self.engine.del_reader(self._listen_socket) self._listen_socket.close() @@ -1695,7 +1694,6 @@ def close(self): # shutdown the rest self.notify_all() - self.unregister_all_services() - self.remove_all_service_listeners() + self.reaper.join() for s in self._respond_sockets: s.close() From d909942e2c9479819e9113ffb3a354b1d99d6814 Mon Sep 17 00:00:00 2001 From: Stephen Rauch Date: Sat, 2 Apr 2016 14:01:06 -0700 Subject: [PATCH 7/8] init ServiceInfo._properties --- zeroconf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/zeroconf.py b/zeroconf.py index 5afce10dd..842173e3d 100644 --- a/zeroconf.py +++ b/zeroconf.py @@ -1080,6 +1080,7 @@ def __init__(self, type, name, address=None, port=None, weight=0, self.server = server else: self.server = name + self._properties = {} self._set_properties(properties) @property From cfbb1572e44c4d8af1b50cb62abc0d426fc8e3ea Mon Sep 17 00:00:00 2001 From: Stephen Rauch Date: Wed, 6 Apr 2016 12:48:01 -0700 Subject: [PATCH 8/8] Add query support and test case for _services._dns-sd._udp.local. --- README.rst | 7 +++++++ test_zeroconf.py | 28 ++++++++++++++++++++++++++++ zeroconf.py | 40 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+) diff --git a/README.rst b/README.rst index 3e812c80b..e04f1470d 100644 --- a/README.rst +++ b/README.rst @@ -110,6 +110,13 @@ Here's an example: If you want to customize that you need to specify ``interfaces`` argument when constructing ``Zeroconf`` object (see the code for details). +If you don't know the name of the service you need to browse for, try: + +.. code-block:: python + + from zeroconf import ZeroconfServiceTypes + print('\n'.join(ZeroconfServiceTypes.find())) + See examples directory for more. Changelog diff --git a/test_zeroconf.py b/test_zeroconf.py index e9d287803..010876774 100644 --- a/test_zeroconf.py +++ b/test_zeroconf.py @@ -21,6 +21,7 @@ ServiceInfo, ServiceStateChange, Zeroconf, + ZeroconfServiceTypes, ) log = logging.getLogger('zeroconf') @@ -159,6 +160,33 @@ def test_bad_service_info_name(self): browser.close() +class ServiceTypesQuery(unittest.TestCase): + + def test_integration_with_listener(self): + + type_ = "_test_service_type._tcp.local." + name = "xxxyyy" + registration_name = "%s.%s" % (name, type_) + + zeroconf_registrar = Zeroconf(interfaces=['127.0.0.1']) + desc = {'path': '/~paulsm/'} + info = ServiceInfo( + type_, registration_name, + socket.inet_aton("10.0.1.2"), 80, 0, 0, + desc, "ash-2.local.") + zeroconf_registrar.register_service(info) + + try: + service_types = ZeroconfServiceTypes.find(timeout=0.5) + assert type_ in service_types + service_types = ZeroconfServiceTypes.find( + zc=zeroconf_registrar, timeout=0.5) + assert type_ in service_types + + finally: + zeroconf_registrar.close() + + class Listener(unittest.TestCase): def test_integration_with_listener_class(self): diff --git a/zeroconf.py b/zeroconf.py index 842173e3d..772652da6 100644 --- a/zeroconf.py +++ b/zeroconf.py @@ -1256,6 +1256,46 @@ def __repr__(self): ) +class ZeroconfServiceTypes(object): + """ + Return all of the advertised services on any local networks + """ + def __init__(self): + self.found_services = set() + + def add_service(self, zc, type_, name): + self.found_services.add(name) + + def remove_service(self, zc, type_, name): + pass + + @classmethod + def find(cls, zc=None, timeout=5): + """ + Return all of the advertised services on any local networks. + + :param zc: Zeroconf() instance. Pass in if already have an + instance running or if non-default interfaces are needed + :param timeout: seconds to wait for any responses + :return: tuple of service type strings + """ + local_zc = zc or Zeroconf() + listener = cls() + browser = ServiceBrowser( + local_zc, '_services._dns-sd._udp.local.', listener=listener) + + # wait for responses + time.sleep(timeout) + + # close down anything we opened + if zc is None: + local_zc.close() + else: + browser.cancel() + + return tuple(sorted(listener.found_services)) + + @enum.unique class InterfaceChoice(enum.Enum): Default = 1