From 7170a8282e0f9659ac93f11ff95d9aa46f8b57c8 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 12 Jun 2021 23:24:21 -1000 Subject: [PATCH] Removed protected imports from zeroconf namespace - These protected items are not intended to be part of the public API --- tests/test_aio.py | 15 +-- tests/test_core.py | 61 ++++++++---- tests/test_dns.py | 211 +++++++++++++++++++++-------------------- tests/test_init.py | 130 ++++++++++++++----------- tests/test_services.py | 113 ++++++++++++---------- zeroconf/__init__.py | 54 ----------- 6 files changed, 291 insertions(+), 293 deletions(-) diff --git a/tests/test_aio.py b/tests/test_aio.py index 48a6ccc4..b1be151d 100644 --- a/tests/test_aio.py +++ b/tests/test_aio.py @@ -11,17 +11,12 @@ import pytest -from zeroconf import ( - BadTypeInNameException, - NonUniqueNameException, - ServiceInfo, - ServiceListener, - ServiceNameAlreadyRegistered, - Zeroconf, - _LISTENER_TIME, - current_time_millis, -) from zeroconf.aio import AsyncServiceInfo, AsyncServiceListener, AsyncZeroconf +from zeroconf.core import Zeroconf +from zeroconf.const import _LISTENER_TIME +from zeroconf.exceptions import BadTypeInNameException, NonUniqueNameException, ServiceNameAlreadyRegistered +from zeroconf.services import ServiceInfo, ServiceListener +from zeroconf.utils.time import current_time_millis @pytest.fixture(autouse=True) diff --git a/tests/test_core.py b/tests/test_core.py index 0d63a58c..0d2a2a06 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -15,6 +15,7 @@ import zeroconf as r from zeroconf import core +from zeroconf import const from . import has_working_ipv6, _inject_response @@ -39,8 +40,8 @@ def test_reaper(self): zeroconf = core.Zeroconf(interfaces=['127.0.0.1']) cache = zeroconf.cache original_entries = list(itertools.chain(*[cache.entries_with_name(name) for name in cache.names()])) - record_with_10s_ttl = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 10, b'a') - record_with_1s_ttl = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'b') + record_with_10s_ttl = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 10, b'a') + record_with_1s_ttl = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 1, b'b') zeroconf.cache.add(record_with_10s_ttl) zeroconf.cache.add(record_with_1s_ttl) entries_with_cache = list(itertools.chain(*[cache.entries_with_name(name) for name in cache.names()])) @@ -102,11 +103,18 @@ def test_launch_and_close_v6_only(self): def test_handle_response(self): def mock_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNSIncoming: ttl = 120 - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) if service_state_change == r.ServiceStateChange.Updated: generated.add_answer_at_time( - r.DNSText(service_name, r._TYPE_TXT, r._CLASS_IN | r._CLASS_UNIQUE, ttl, service_text), 0 + r.DNSText( + service_name, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, + ttl, + service_text, + ), + 0, ) return r.DNSIncoming(generated.packet()) @@ -114,22 +122,32 @@ def mock_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNSIncomi ttl = 0 generated.add_answer_at_time( - r.DNSPointer(service_type, r._TYPE_PTR, r._CLASS_IN, ttl, service_name), 0 + r.DNSPointer(service_type, const._TYPE_PTR, const._CLASS_IN, ttl, service_name), 0 ) generated.add_answer_at_time( r.DNSService( - service_name, r._TYPE_SRV, r._CLASS_IN | r._CLASS_UNIQUE, ttl, 0, 0, 80, service_server + service_name, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, + ttl, + 0, + 0, + 80, + service_server, ), 0, ) generated.add_answer_at_time( - r.DNSText(service_name, r._TYPE_TXT, r._CLASS_IN | r._CLASS_UNIQUE, ttl, service_text), 0 + r.DNSText( + service_name, const._TYPE_TXT, const._CLASS_IN | const._CLASS_UNIQUE, ttl, service_text + ), + 0, ) generated.add_answer_at_time( r.DNSAddress( service_server, - r._TYPE_A, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_A, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, socket.inet_aton(service_address), ), @@ -141,12 +159,12 @@ def mock_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNSIncomi def mock_split_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNSIncoming: """Mock an incoming message for the case where the packet is split.""" ttl = 120 - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) generated.add_answer_at_time( r.DNSAddress( service_server, - r._TYPE_A, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_A, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, socket.inet_aton(service_address), ), @@ -154,7 +172,14 @@ def mock_split_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNS ) generated.add_answer_at_time( r.DNSService( - service_name, r._TYPE_SRV, r._CLASS_IN | r._CLASS_UNIQUE, ttl, 0, 0, 80, service_server + service_name, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, + ttl, + 0, + 0, + 80, + service_server, ), 0, ) @@ -171,10 +196,10 @@ def mock_split_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNS try: # service added _inject_response(zeroconf, mock_incoming_msg(r.ServiceStateChange.Added)) - dns_text = zeroconf.cache.get_by_details(service_name, r._TYPE_TXT, r._CLASS_IN) + dns_text = zeroconf.cache.get_by_details(service_name, const._TYPE_TXT, const._CLASS_IN) assert dns_text is not None assert cast(r.DNSText, dns_text).text == service_text # service_text is b'path=/~paulsm/' - all_dns_text = zeroconf.cache.get_all_by_details(service_name, r._TYPE_TXT, r._CLASS_IN) + all_dns_text = zeroconf.cache.get_all_by_details(service_name, const._TYPE_TXT, const._CLASS_IN) assert [dns_text] == all_dns_text # https://tools.ietf.org/html/rfc6762#section-10.2 @@ -188,7 +213,7 @@ def mock_split_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNS # service updated. currently only text record can be updated service_text = b'path=/~humingchun/' _inject_response(zeroconf, mock_incoming_msg(r.ServiceStateChange.Updated)) - dns_text = zeroconf.cache.get_by_details(service_name, r._TYPE_TXT, r._CLASS_IN) + dns_text = zeroconf.cache.get_by_details(service_name, const._TYPE_TXT, const._CLASS_IN) assert dns_text is not None assert cast(r.DNSText, dns_text).text == service_text # service_text is b'path=/~humingchun/' @@ -198,13 +223,13 @@ def mock_split_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNS # This should not evict TXT records from the cache _inject_response(zeroconf, mock_split_incoming_msg(r.ServiceStateChange.Updated)) time.sleep(1.1) - dns_text = zeroconf.cache.get_by_details(service_name, r._TYPE_TXT, r._CLASS_IN) + dns_text = zeroconf.cache.get_by_details(service_name, const._TYPE_TXT, const._CLASS_IN) assert dns_text is not None assert cast(r.DNSText, dns_text).text == service_text # service_text is b'path=/~humingchun/' # service removed _inject_response(zeroconf, mock_incoming_msg(r.ServiceStateChange.Removed)) - dns_text = zeroconf.cache.get_by_details(service_name, r._TYPE_TXT, r._CLASS_IN) + dns_text = zeroconf.cache.get_by_details(service_name, const._TYPE_TXT, const._CLASS_IN) assert dns_text is None finally: diff --git a/tests/test_dns.py b/tests/test_dns.py index de0e4932..db41693d 100644 --- a/tests/test_dns.py +++ b/tests/test_dns.py @@ -14,6 +14,7 @@ from typing import Dict, cast # noqa # used in type hints import zeroconf as r +from zeroconf import const from zeroconf import ( DNSHinfo, DNSText, @@ -46,46 +47,48 @@ def test_dns_text_repr(self): repr(text) def test_dns_hinfo_repr_eq(self): - hinfo = DNSHinfo('irrelevant', r._TYPE_HINFO, 0, 0, 'cpu', 'os') + hinfo = DNSHinfo('irrelevant', const._TYPE_HINFO, 0, 0, 'cpu', 'os') assert hinfo == hinfo repr(hinfo) def test_dns_pointer_repr(self): - pointer = r.DNSPointer('irrelevant', r._TYPE_PTR, r._CLASS_IN, r._DNS_OTHER_TTL, '123') + pointer = r.DNSPointer('irrelevant', const._TYPE_PTR, const._CLASS_IN, const._DNS_OTHER_TTL, '123') repr(pointer) def test_dns_address_repr(self): - address = r.DNSAddress('irrelevant', r._TYPE_SOA, r._CLASS_IN, 1, b'a') + address = r.DNSAddress('irrelevant', const._TYPE_SOA, const._CLASS_IN, 1, b'a') assert repr(address).endswith("b'a'") address_ipv4 = r.DNSAddress( - 'irrelevant', r._TYPE_SOA, r._CLASS_IN, 1, socket.inet_pton(socket.AF_INET, '127.0.0.1') + 'irrelevant', const._TYPE_SOA, const._CLASS_IN, 1, socket.inet_pton(socket.AF_INET, '127.0.0.1') ) assert repr(address_ipv4).endswith('127.0.0.1') address_ipv6 = r.DNSAddress( - 'irrelevant', r._TYPE_SOA, r._CLASS_IN, 1, socket.inet_pton(socket.AF_INET6, '::1') + 'irrelevant', const._TYPE_SOA, const._CLASS_IN, 1, socket.inet_pton(socket.AF_INET6, '::1') ) assert repr(address_ipv6).endswith('::1') def test_dns_question_repr(self): - question = r.DNSQuestion('irrelevant', r._TYPE_SRV, r._CLASS_IN | r._CLASS_UNIQUE) + question = r.DNSQuestion('irrelevant', const._TYPE_SRV, const._CLASS_IN | const._CLASS_UNIQUE) repr(question) assert not question != question def test_dns_service_repr(self): - service = r.DNSService('irrelevant', r._TYPE_SRV, r._CLASS_IN, r._DNS_HOST_TTL, 0, 0, 80, 'a') + service = r.DNSService( + 'irrelevant', const._TYPE_SRV, const._CLASS_IN, const._DNS_HOST_TTL, 0, 0, 80, 'a' + ) repr(service) def test_dns_record_abc(self): - record = r.DNSRecord('irrelevant', r._TYPE_SRV, r._CLASS_IN, r._DNS_HOST_TTL) + record = r.DNSRecord('irrelevant', const._TYPE_SRV, const._CLASS_IN, const._DNS_HOST_TTL) self.assertRaises(r.AbstractMethodException, record.__eq__, record) self.assertRaises(r.AbstractMethodException, record.write, None) def test_dns_record_reset_ttl(self): - record = r.DNSRecord('irrelevant', r._TYPE_SRV, r._CLASS_IN, r._DNS_HOST_TTL) + record = r.DNSRecord('irrelevant', const._TYPE_SRV, const._CLASS_IN, const._DNS_HOST_TTL) time.sleep(1) - record2 = r.DNSRecord('irrelevant', r._TYPE_SRV, r._CLASS_IN, r._DNS_HOST_TTL) + record2 = r.DNSRecord('irrelevant', const._TYPE_SRV, const._CLASS_IN, const._DNS_HOST_TTL) now = r.current_time_millis() assert record.created != record2.created @@ -131,7 +134,7 @@ def test_service_info_text_properties_not_given(self): repr(info) def test_dns_outgoing_repr(self): - dns_outgoing = r.DNSOutgoing(r._FLAGS_QR_QUERY) + dns_outgoing = r.DNSOutgoing(const._FLAGS_QR_QUERY) repr(dns_outgoing) @@ -145,22 +148,22 @@ def test_parse_own_packet_simple_unicast(self): r.DNSIncoming(generated.packet()) def test_parse_own_packet_flags(self): - generated = r.DNSOutgoing(r._FLAGS_QR_QUERY) + generated = r.DNSOutgoing(const._FLAGS_QR_QUERY) r.DNSIncoming(generated.packet()) def test_parse_own_packet_question(self): - generated = r.DNSOutgoing(r._FLAGS_QR_QUERY) - generated.add_question(r.DNSQuestion("testname.local.", r._TYPE_SRV, r._CLASS_IN)) + generated = r.DNSOutgoing(const._FLAGS_QR_QUERY) + generated.add_question(r.DNSQuestion("testname.local.", const._TYPE_SRV, const._CLASS_IN)) r.DNSIncoming(generated.packet()) def test_parse_own_packet_response(self): - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) generated.add_answer_at_time( r.DNSService( "æøå.local.", - r._TYPE_SRV, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_HOST_TTL, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_HOST_TTL, 0, 0, 80, @@ -173,8 +176,8 @@ def test_parse_own_packet_response(self): assert len(generated.answers) == len(parsed.answers) def test_match_question(self): - generated = r.DNSOutgoing(r._FLAGS_QR_QUERY) - question = r.DNSQuestion("testname.local.", r._TYPE_SRV, r._CLASS_IN) + generated = r.DNSOutgoing(const._FLAGS_QR_QUERY) + question = r.DNSQuestion("testname.local.", const._TYPE_SRV, const._CLASS_IN) generated.add_question(question) parsed = r.DNSIncoming(generated.packet()) assert len(generated.questions) == 1 @@ -182,14 +185,14 @@ def test_match_question(self): assert question == parsed.questions[0] def test_suppress_answer(self): - query_generated = r.DNSOutgoing(r._FLAGS_QR_QUERY) - question = r.DNSQuestion("testname.local.", r._TYPE_SRV, r._CLASS_IN) + query_generated = r.DNSOutgoing(const._FLAGS_QR_QUERY) + question = r.DNSQuestion("testname.local.", const._TYPE_SRV, const._CLASS_IN) query_generated.add_question(question) answer1 = r.DNSService( "testname1.local.", - r._TYPE_SRV, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_HOST_TTL, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_HOST_TTL, 0, 0, 80, @@ -197,9 +200,9 @@ def test_suppress_answer(self): ) staleanswer2 = r.DNSService( "testname2.local.", - r._TYPE_SRV, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_HOST_TTL / 2, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_HOST_TTL / 2, 0, 0, 80, @@ -207,9 +210,9 @@ def test_suppress_answer(self): ) answer2 = r.DNSService( "testname2.local.", - r._TYPE_SRV, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_HOST_TTL, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_HOST_TTL, 0, 0, 80, @@ -220,7 +223,7 @@ def test_suppress_answer(self): query = r.DNSIncoming(query_generated.packet()) # Should be suppressed - response = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + response = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) response.add_answer(query, answer1) assert len(response.answers) == 0 @@ -237,13 +240,13 @@ def test_suppress_answer(self): # Should not be suppressed, type is different tmp = copy.copy(answer1) - tmp.type = r._TYPE_A + tmp.type = const._TYPE_A response.add_answer(query, tmp) assert len(response.answers) == 3 # Should not be suppressed, class is different tmp = copy.copy(answer1) - tmp.class_ = r._CLASS_NONE + tmp.class_ = const._CLASS_NONE response.add_answer(query, tmp) assert len(response.answers) == 4 @@ -251,30 +254,30 @@ def test_suppress_answer(self): def test_dns_hinfo(self): generated = r.DNSOutgoing(0) - generated.add_additional_answer(DNSHinfo('irrelevant', r._TYPE_HINFO, 0, 0, 'cpu', 'os')) + generated.add_additional_answer(DNSHinfo('irrelevant', const._TYPE_HINFO, 0, 0, 'cpu', 'os')) parsed = r.DNSIncoming(generated.packet()) answer = cast(r.DNSHinfo, parsed.answers[0]) assert answer.cpu == u'cpu' assert answer.os == u'os' generated = r.DNSOutgoing(0) - generated.add_additional_answer(DNSHinfo('irrelevant', r._TYPE_HINFO, 0, 0, 'cpu', 'x' * 257)) + generated.add_additional_answer(DNSHinfo('irrelevant', const._TYPE_HINFO, 0, 0, 'cpu', 'x' * 257)) self.assertRaises(r.NamePartTooLongException, generated.packet) def test_many_questions(self): """Test many questions get seperated into multiple packets.""" - generated = r.DNSOutgoing(r._FLAGS_QR_QUERY) + generated = r.DNSOutgoing(const._FLAGS_QR_QUERY) questions = [] for i in range(100): - question = r.DNSQuestion(f"testname{i}.local.", r._TYPE_SRV, r._CLASS_IN) + question = r.DNSQuestion(f"testname{i}.local.", const._TYPE_SRV, const._CLASS_IN) generated.add_question(question) questions.append(question) assert len(generated.questions) == 100 packets = generated.packets() assert len(packets) == 2 - assert len(packets[0]) < r._MAX_MSG_TYPICAL - assert len(packets[1]) < r._MAX_MSG_TYPICAL + assert len(packets[0]) < const._MAX_MSG_TYPICAL + assert len(packets[1]) < const._MAX_MSG_TYPICAL parsed1 = r.DNSIncoming(packets[0]) assert len(parsed1.questions) == 85 @@ -286,15 +289,15 @@ def test_only_one_answer_can_by_large(self): https://datatracker.ietf.org/doc/html/rfc6762#section-17 """ - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) - query = r.DNSIncoming(r.DNSOutgoing(r._FLAGS_QR_QUERY).packet()) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) + query = r.DNSIncoming(r.DNSOutgoing(const._FLAGS_QR_QUERY).packet()) for i in range(3): generated.add_answer( query, r.DNSText( "zoom._hap._tcp.local.", - r._TYPE_TXT, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, 1200, b'\x04ff=0\x04ci=2\x04sf=0\x0bsh=6fLM5A==' * 100, ), @@ -303,9 +306,9 @@ def test_only_one_answer_can_by_large(self): query, r.DNSService( "testname1.local.", - r._TYPE_SRV, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_HOST_TTL, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_HOST_TTL, 0, 0, 80, @@ -316,16 +319,16 @@ def test_only_one_answer_can_by_large(self): packets = generated.packets() assert len(packets) == 4 - assert len(packets[0]) <= r._MAX_MSG_ABSOLUTE - assert len(packets[0]) > r._MAX_MSG_TYPICAL + assert len(packets[0]) <= const._MAX_MSG_ABSOLUTE + assert len(packets[0]) > const._MAX_MSG_TYPICAL - assert len(packets[1]) <= r._MAX_MSG_ABSOLUTE - assert len(packets[1]) > r._MAX_MSG_TYPICAL + assert len(packets[1]) <= const._MAX_MSG_ABSOLUTE + assert len(packets[1]) > const._MAX_MSG_TYPICAL - assert len(packets[2]) <= r._MAX_MSG_ABSOLUTE - assert len(packets[2]) > r._MAX_MSG_TYPICAL + assert len(packets[2]) <= const._MAX_MSG_ABSOLUTE + assert len(packets[2]) > const._MAX_MSG_TYPICAL - assert len(packets[3]) <= r._MAX_MSG_TYPICAL + assert len(packets[3]) <= const._MAX_MSG_TYPICAL for packet in packets: parsed = r.DNSIncoming(packet) @@ -341,15 +344,15 @@ def test_questions_do_not_end_up_every_packet(self): questions and as many more Known-Answer records as will fit. """ - generated = r.DNSOutgoing(r._FLAGS_QR_QUERY) + generated = r.DNSOutgoing(const._FLAGS_QR_QUERY) for i in range(35): - question = r.DNSQuestion(f"testname{i}.local.", r._TYPE_SRV, r._CLASS_IN) + question = r.DNSQuestion(f"testname{i}.local.", const._TYPE_SRV, const._CLASS_IN) generated.add_question(question) answer = r.DNSService( f"testname{i}.local.", - r._TYPE_SRV, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_HOST_TTL, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_HOST_TTL, 0, 0, 80, @@ -362,8 +365,8 @@ def test_questions_do_not_end_up_every_packet(self): packets = generated.packets() assert len(packets) == 2 - assert len(packets[0]) <= r._MAX_MSG_TYPICAL - assert len(packets[1]) <= r._MAX_MSG_TYPICAL + assert len(packets[0]) <= const._MAX_MSG_TYPICAL + assert len(packets[1]) <= const._MAX_MSG_TYPICAL parsed1 = r.DNSIncoming(packets[0]) assert len(parsed1.questions) == 35 @@ -377,25 +380,25 @@ def test_questions_do_not_end_up_every_packet(self): class PacketForm(unittest.TestCase): def test_transaction_id(self): """ID must be zero in a DNS-SD packet""" - generated = r.DNSOutgoing(r._FLAGS_QR_QUERY) + generated = r.DNSOutgoing(const._FLAGS_QR_QUERY) bytes = generated.packet() id = bytes[0] << 8 | bytes[1] assert id == 0 def test_query_header_bits(self): - generated = r.DNSOutgoing(r._FLAGS_QR_QUERY) + generated = r.DNSOutgoing(const._FLAGS_QR_QUERY) bytes = generated.packet() flags = bytes[2] << 8 | bytes[3] assert flags == 0x0 def test_response_header_bits(self): - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) bytes = generated.packet() flags = bytes[2] << 8 | bytes[3] assert flags == 0x8000 def test_numbers(self): - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) bytes = generated.packet() (num_questions, num_answers, num_authorities, num_additionals) = struct.unpack('!4H', bytes[4:12]) assert num_questions == 0 @@ -404,8 +407,8 @@ def test_numbers(self): assert num_additionals == 0 def test_numbers_questions(self): - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) - question = r.DNSQuestion("testname.local.", r._TYPE_SRV, r._CLASS_IN) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) + question = r.DNSQuestion("testname.local.", const._TYPE_SRV, const._CLASS_IN) for i in range(10): generated.add_question(question) bytes = generated.packet() @@ -427,7 +430,7 @@ def test_incoming_exception_handling(self): def test_incoming_unknown_type(self): generated = r.DNSOutgoing(0) - answer = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') + answer = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 1, b'a') generated.add_additional_answer(answer) packet = generated.packet() parsed = r.DNSIncoming(packet) @@ -438,7 +441,7 @@ def test_incoming_ipv6(self): addr = "2606:2800:220:1:248:1893:25c8:1946" # example.com packed = socket.inet_pton(socket.AF_INET6, addr) generated = r.DNSOutgoing(0) - answer = r.DNSAddress('domain', r._TYPE_AAAA, r._CLASS_IN | r._CLASS_UNIQUE, 1, packed) + answer = r.DNSAddress('domain', const._TYPE_AAAA, const._CLASS_IN | const._CLASS_UNIQUE, 1, packed) generated.add_additional_answer(answer) packet = generated.packet() parsed = r.DNSIncoming(packet) @@ -449,18 +452,18 @@ def test_incoming_ipv6(self): class TestDNSCache(unittest.TestCase): def test_order(self): - record1 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') - record2 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'b') + record1 = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 1, b'a') + record2 = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 1, b'b') cache = r.DNSCache() cache.add(record1) cache.add(record2) - entry = r.DNSEntry('a', r._TYPE_SOA, r._CLASS_IN) + entry = r.DNSEntry('a', const._TYPE_SOA, const._CLASS_IN) cached_record = cache.get(entry) assert cached_record == record2 def test_cache_empty_does_not_leak_memory_by_leaving_empty_list(self): - record1 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') - record2 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'b') + record1 = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 1, b'a') + record2 = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 1, b'b') cache = r.DNSCache() cache.add(record1) cache.add(record2) @@ -470,8 +473,8 @@ def test_cache_empty_does_not_leak_memory_by_leaving_empty_list(self): assert 'a' not in cache.cache def test_cache_empty_multiple_calls_does_not_throw(self): - record1 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') - record2 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'b') + record1 = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 1, b'a') + record2 = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 1, b'b') cache = r.DNSCache() cache.add(record1) cache.add(record2) @@ -486,7 +489,7 @@ def test_cache_empty_multiple_calls_does_not_throw(self): def test_dns_compression_rollback_for_corruption(): """Verify rolling back does not lead to dns compression corruption.""" - out = r.DNSOutgoing(r._FLAGS_QR_RESPONSE | r._FLAGS_AA) + out = r.DNSOutgoing(const._FLAGS_QR_RESPONSE | const._FLAGS_AA) address = socket.inet_pton(socket.AF_INET, "192.168.208.5") additionals = [ @@ -589,9 +592,9 @@ def test_dns_compression_rollback_for_corruption(): out.add_answer_at_time( DNSText( "HASS Bridge W9DN 5B5CC5._hap._tcp.local.", - r._TYPE_TXT, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_OTHER_TTL, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_OTHER_TTL, b'\x13md=HASS Bridge W9DN\x06pv=1.0\x14id=11:8E:DB:5B:5C:C5\x05c#=12\x04s#=1' b'\x04ff=0\x04ci=2\x04sf=0\x0bsh=6fLM5A==', ), @@ -602,9 +605,9 @@ def test_dns_compression_rollback_for_corruption(): out.add_additional_answer( r.DNSService( record["name"], # type: ignore - r._TYPE_SRV, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_HOST_TTL, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_HOST_TTL, 0, 0, record["port"], # type: ignore @@ -614,18 +617,18 @@ def test_dns_compression_rollback_for_corruption(): out.add_additional_answer( r.DNSText( record["name"], # type: ignore - r._TYPE_TXT, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_OTHER_TTL, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_OTHER_TTL, record["text"], # type: ignore ) ) out.add_additional_answer( r.DNSAddress( record["name"], # type: ignore - r._TYPE_A, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_HOST_TTL, + const._TYPE_A, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_HOST_TTL, record["address"], # type: ignore ) ) @@ -639,17 +642,17 @@ def test_dns_compression_rollback_for_corruption(): def test_tc_bit_in_query_packet(): """Verify the TC bit is set when known answers exceed the packet size.""" - out = r.DNSOutgoing(r._FLAGS_QR_QUERY | r._FLAGS_AA) + out = r.DNSOutgoing(const._FLAGS_QR_QUERY | const._FLAGS_AA) type_ = "_hap._tcp.local." - out.add_question(r.DNSQuestion(type_, r._TYPE_PTR, r._CLASS_IN)) + out.add_question(r.DNSQuestion(type_, const._TYPE_PTR, const._CLASS_IN)) for i in range(30): out.add_answer_at_time( DNSText( ("HASS Bridge W9DN %s._hap._tcp.local." % i), - r._TYPE_TXT, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_OTHER_TTL, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_OTHER_TTL, b'\x13md=HASS Bridge W9DN\x06pv=1.0\x14id=11:8E:DB:5B:5C:C5\x05c#=12\x04s#=1' b'\x04ff=0\x04ci=2\x04sf=0\x0bsh=6fLM5A==', ), @@ -660,28 +663,28 @@ def test_tc_bit_in_query_packet(): assert len(packets) == 3 first_packet = r.DNSIncoming(packets[0]) - assert first_packet.flags & r._FLAGS_TC == r._FLAGS_TC + assert first_packet.flags & const._FLAGS_TC == const._FLAGS_TC assert first_packet.valid is True second_packet = r.DNSIncoming(packets[1]) - assert second_packet.flags & r._FLAGS_TC == r._FLAGS_TC + assert second_packet.flags & const._FLAGS_TC == const._FLAGS_TC assert second_packet.valid is True third_packet = r.DNSIncoming(packets[2]) - assert third_packet.flags & r._FLAGS_TC == 0 + assert third_packet.flags & const._FLAGS_TC == 0 assert third_packet.valid is True def test_tc_bit_not_set_in_answer_packet(): """Verify the TC bit is not set when there are no questions and answers exceed the packet size.""" - out = r.DNSOutgoing(r._FLAGS_QR_RESPONSE | r._FLAGS_AA) + out = r.DNSOutgoing(const._FLAGS_QR_RESPONSE | const._FLAGS_AA) for i in range(30): out.add_answer_at_time( DNSText( ("HASS Bridge W9DN %s._hap._tcp.local." % i), - r._TYPE_TXT, - r._CLASS_IN | r._CLASS_UNIQUE, - r._DNS_OTHER_TTL, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_OTHER_TTL, b'\x13md=HASS Bridge W9DN\x06pv=1.0\x14id=11:8E:DB:5B:5C:C5\x05c#=12\x04s#=1' b'\x04ff=0\x04ci=2\x04sf=0\x0bsh=6fLM5A==', ), @@ -692,13 +695,13 @@ def test_tc_bit_not_set_in_answer_packet(): assert len(packets) == 3 first_packet = r.DNSIncoming(packets[0]) - assert first_packet.flags & r._FLAGS_TC == 0 + assert first_packet.flags & const._FLAGS_TC == 0 assert first_packet.valid is True second_packet = r.DNSIncoming(packets[1]) - assert second_packet.flags & r._FLAGS_TC == 0 + assert second_packet.flags & const._FLAGS_TC == 0 assert second_packet.valid is True third_packet = r.DNSIncoming(packets[2]) - assert third_packet.flags & r._FLAGS_TC == 0 + assert third_packet.flags & const._FLAGS_TC == 0 assert third_packet.valid is True diff --git a/tests/test_init.py b/tests/test_init.py index 1ca4f0c1..f89f786e 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -17,12 +17,7 @@ import pytest import zeroconf as r -from zeroconf import ( - ServiceBrowser, - ServiceInfo, - Zeroconf, - ZeroconfServiceTypes, -) +from zeroconf import ServiceBrowser, ServiceInfo, Zeroconf, ZeroconfServiceTypes, const from . import has_working_ipv6, _clear_cache, _inject_response @@ -43,38 +38,38 @@ def teardown_module(): class Names(unittest.TestCase): def test_long_name(self): - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) question = r.DNSQuestion( - "this.is.a.very.long.name.with.lots.of.parts.in.it.local.", r._TYPE_SRV, r._CLASS_IN + "this.is.a.very.long.name.with.lots.of.parts.in.it.local.", const._TYPE_SRV, const._CLASS_IN ) generated.add_question(question) r.DNSIncoming(generated.packet()) def test_exceedingly_long_name(self): - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) name = "%slocal." % ("part." * 1000) - question = r.DNSQuestion(name, r._TYPE_SRV, r._CLASS_IN) + question = r.DNSQuestion(name, const._TYPE_SRV, const._CLASS_IN) generated.add_question(question) r.DNSIncoming(generated.packet()) def test_extra_exceedingly_long_name(self): - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) name = "%slocal." % ("part." * 4000) - question = r.DNSQuestion(name, r._TYPE_SRV, r._CLASS_IN) + question = r.DNSQuestion(name, const._TYPE_SRV, const._CLASS_IN) generated.add_question(question) r.DNSIncoming(generated.packet()) def test_exceedingly_long_name_part(self): name = "%s.local." % ("a" * 1000) - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) - question = r.DNSQuestion(name, r._TYPE_SRV, r._CLASS_IN) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) + question = r.DNSQuestion(name, const._TYPE_SRV, const._CLASS_IN) generated.add_question(question) self.assertRaises(r.NamePartTooLongException, generated.packet) def test_same_name(self): name = "paired.local." - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) - question = r.DNSQuestion(name, r._TYPE_SRV, r._CLASS_IN) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) + question = r.DNSQuestion(name, const._TYPE_SRV, const._CLASS_IN) generated.add_question(question) generated.add_question(question) r.DNSIncoming(generated.packet()) @@ -99,7 +94,7 @@ def test_lots_of_names(self): longest_packet_len = 0 longest_packet = None # type: Optional[r.DNSOutgoing] - def send(out, addr=r._MDNS_ADDR, port=r._MDNS_PORT): + def send(out, addr=const._MDNS_ADDR, port=const._MDNS_PORT): """Sends an outgoing packet.""" for packet in out.packets(): nonlocal longest_packet_len, longest_packet @@ -123,7 +118,7 @@ def on_service_state_change(zeroconf, service_type, state_change, name): # we will never get to this large of a packet given the application-layer # splitting of packets, but we still want to track the longest_packet_len # for the debug message below - while sleep_count < 100 and longest_packet_len < r._MAX_MSG_ABSOLUTE - 100: + while sleep_count < 100 and longest_packet_len < const._MAX_MSG_ABSOLUTE - 100: sleep_count += 1 time.sleep(0.1) @@ -135,8 +130,8 @@ def on_service_state_change(zeroconf, service_type, state_change, name): zeroconf.log.debug('sleep_count %d, sized %d', sleep_count, longest_packet_len) # now the browser has sent at least one request, verify the size - assert longest_packet_len <= r._MAX_MSG_TYPICAL - assert longest_packet_len >= r._MAX_MSG_TYPICAL - 100 + assert longest_packet_len <= const._MAX_MSG_TYPICAL + assert longest_packet_len >= const._MAX_MSG_TYPICAL - 100 # mock zeroconf's logger warning() and debug() from unittest.mock import patch @@ -167,8 +162,8 @@ def on_service_state_change(zeroconf, service_type, state_change, name): # mock the zeroconf logger and check for the correct logging backoff call_counts = mocked_log_warn.call_count, mocked_log_debug.call_count # force receive on oversized packet - s.sendto(packet, 0, (r._MDNS_ADDR, r._MDNS_PORT)) - s.sendto(packet, 0, (r._MDNS_ADDR, r._MDNS_PORT)) + s.sendto(packet, 0, (const._MDNS_ADDR, const._MDNS_PORT)) + s.sendto(packet, 0, (const._MDNS_ADDR, const._MDNS_PORT)) time.sleep(2.0) zeroconf.log.debug( 'warn %d debug %d was %s', mocked_log_warn.call_count, mocked_log_debug.call_count, call_counts @@ -238,10 +233,21 @@ def generate_many_hosts(self, zc, type_, name, number_hosts): @staticmethod def generate_host(zc, host_name, type_): name = '.'.join((host_name, type_)) - out = r.DNSOutgoing(r._FLAGS_QR_RESPONSE | r._FLAGS_AA) - out.add_answer_at_time(r.DNSPointer(type_, r._TYPE_PTR, r._CLASS_IN, r._DNS_OTHER_TTL, name), 0) + out = r.DNSOutgoing(const._FLAGS_QR_RESPONSE | const._FLAGS_AA) + out.add_answer_at_time( + r.DNSPointer(type_, const._TYPE_PTR, const._CLASS_IN, const._DNS_OTHER_TTL, name), 0 + ) out.add_answer_at_time( - r.DNSService(type_, r._TYPE_SRV, r._CLASS_IN | r._CLASS_UNIQUE, r._DNS_HOST_TTL, 0, 0, 80, name), + r.DNSService( + type_, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, + const._DNS_HOST_TTL, + 0, + 0, + 80, + name, + ), 0, ) zc.send(out) @@ -275,10 +281,10 @@ def test_ttl(self): def get_ttl(record_type): if expected_ttl is not None: return expected_ttl - elif record_type in [r._TYPE_A, r._TYPE_SRV]: - return r._DNS_HOST_TTL + elif record_type in [const._TYPE_A, const._TYPE_SRV]: + return const._DNS_HOST_TTL else: - return r._DNS_OTHER_TTL + return const._DNS_OTHER_TTL def _process_outgoing_packet(out): """Sends an outgoing packet.""" @@ -305,12 +311,12 @@ def _process_outgoing_packet(out): nbr_answers = nbr_additionals = nbr_authorities = 0 # query - query = r.DNSOutgoing(r._FLAGS_QR_QUERY | r._FLAGS_AA) + query = r.DNSOutgoing(const._FLAGS_QR_QUERY | const._FLAGS_AA) assert query.is_query() is True - query.add_question(r.DNSQuestion(info.type, r._TYPE_PTR, r._CLASS_IN)) - query.add_question(r.DNSQuestion(info.name, r._TYPE_SRV, r._CLASS_IN)) - query.add_question(r.DNSQuestion(info.name, r._TYPE_TXT, r._CLASS_IN)) - query.add_question(r.DNSQuestion(info.server, r._TYPE_A, r._CLASS_IN)) + query.add_question(r.DNSQuestion(info.type, const._TYPE_PTR, const._CLASS_IN)) + query.add_question(r.DNSQuestion(info.name, const._TYPE_SRV, const._CLASS_IN)) + query.add_question(r.DNSQuestion(info.name, const._TYPE_TXT, const._CLASS_IN)) + query.add_question(r.DNSQuestion(info.server, const._TYPE_A, const._CLASS_IN)) _process_outgoing_packet(zc.query_handler.response(r.DNSIncoming(query.packet()), False)) assert nbr_answers == 4 and nbr_additionals == 4 and nbr_authorities == 0 nbr_answers = nbr_additionals = nbr_authorities = 0 @@ -328,8 +334,8 @@ def _process_outgoing_packet(out): _process_outgoing_packet(zc.generate_service_query(info)) zc.registry.add(info) # register service with custom TTL - expected_ttl = r._DNS_HOST_TTL * 2 - assert expected_ttl != r._DNS_HOST_TTL + expected_ttl = const._DNS_HOST_TTL * 2 + assert expected_ttl != const._DNS_HOST_TTL for _ in range(3): _process_outgoing_packet(zc.generate_service_broadcast(info, expected_ttl)) assert nbr_answers == 12 and nbr_additionals == 0 and nbr_authorities == 3 @@ -337,11 +343,11 @@ def _process_outgoing_packet(out): # query expected_ttl = None - query = r.DNSOutgoing(r._FLAGS_QR_QUERY | r._FLAGS_AA) - query.add_question(r.DNSQuestion(info.type, r._TYPE_PTR, r._CLASS_IN)) - query.add_question(r.DNSQuestion(info.name, r._TYPE_SRV, r._CLASS_IN)) - query.add_question(r.DNSQuestion(info.name, r._TYPE_TXT, r._CLASS_IN)) - query.add_question(r.DNSQuestion(info.server, r._TYPE_A, r._CLASS_IN)) + query = r.DNSOutgoing(const._FLAGS_QR_QUERY | const._FLAGS_AA) + query.add_question(r.DNSQuestion(info.type, const._TYPE_PTR, const._CLASS_IN)) + query.add_question(r.DNSQuestion(info.name, const._TYPE_SRV, const._CLASS_IN)) + query.add_question(r.DNSQuestion(info.name, const._TYPE_TXT, const._CLASS_IN)) + query.add_question(r.DNSQuestion(info.server, const._TYPE_A, const._CLASS_IN)) _process_outgoing_packet(zc.query_handler.response(r.DNSIncoming(query.packet()), False)) assert nbr_answers == 4 and nbr_additionals == 4 and nbr_authorities == 0 nbr_answers = nbr_additionals = nbr_authorities = 0 @@ -405,8 +411,8 @@ def test_register_and_lookup_type_by_uppercase_name(self): info.load_from_cache(zc) assert info.addresses == [] - out = r.DNSOutgoing(r._FLAGS_QR_QUERY) - out.add_question(r.DNSQuestion(type_.upper(), r._TYPE_PTR, r._CLASS_IN)) + out = r.DNSOutgoing(const._FLAGS_QR_QUERY) + out.add_question(r.DNSQuestion(type_.upper(), const._TYPE_PTR, const._CLASS_IN)) zc.send(out) time.sleep(0.5) info = ServiceInfo(type_, registration_name) @@ -786,7 +792,7 @@ def update_service(self, zc, type_, name) -> None: def mock_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNSIncoming: - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) assert generated.is_response() is True if service_state_change == r.ServiceStateChange.Removed: @@ -795,12 +801,22 @@ def mock_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNSIncomi ttl = 120 generated.add_answer_at_time( - r.DNSText(service_name, r._TYPE_TXT, r._CLASS_IN | r._CLASS_UNIQUE, ttl, service_text), 0 + r.DNSText( + service_name, const._TYPE_TXT, const._CLASS_IN | const._CLASS_UNIQUE, ttl, service_text + ), + 0, ) generated.add_answer_at_time( r.DNSService( - service_name, r._TYPE_SRV, r._CLASS_IN | r._CLASS_UNIQUE, ttl, 0, 0, 80, service_server + service_name, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, + ttl, + 0, + 0, + 80, + service_server, ), 0, ) @@ -812,8 +828,8 @@ def mock_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNSIncomi generated.add_answer_at_time( r.DNSAddress( service_server, - r._TYPE_AAAA, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_AAAA, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, socket.inet_pton(socket.AF_INET6, service_v6_address), ), @@ -822,8 +838,8 @@ def mock_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNSIncomi generated.add_answer_at_time( r.DNSAddress( service_server, - r._TYPE_AAAA, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_AAAA, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, socket.inet_pton(socket.AF_INET6, service_v6_second_address), ), @@ -832,8 +848,8 @@ def mock_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNSIncomi generated.add_answer_at_time( r.DNSAddress( service_server, - r._TYPE_A, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_A, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, socket.inet_aton(service_address), ), @@ -841,7 +857,7 @@ def mock_incoming_msg(service_state_change: r.ServiceStateChange) -> r.DNSIncomi ) generated.add_answer_at_time( - r.DNSPointer(service_type, r._TYPE_PTR, r._CLASS_IN, ttl, service_name), 0 + r.DNSPointer(service_type, const._TYPE_PTR, const._CLASS_IN, ttl, service_name), 0 ) return r.DNSIncoming(generated.packet()) @@ -1003,19 +1019,19 @@ def test_ptr_optimization(): nbr_answers = nbr_additionals = nbr_authorities = 0 # query - query = r.DNSOutgoing(r._FLAGS_QR_QUERY | r._FLAGS_AA) - query.add_question(r.DNSQuestion(info.type, r._TYPE_PTR, r._CLASS_IN)) + query = r.DNSOutgoing(const._FLAGS_QR_QUERY | const._FLAGS_AA) + query.add_question(r.DNSQuestion(info.type, const._TYPE_PTR, const._CLASS_IN)) out = zc.query_handler.response(r.DNSIncoming(query.packet()), False) assert out is not None nbr_answers += len(out.answers) nbr_authorities += len(out.authorities) for answer in out.additionals: nbr_additionals += 1 - if answer.type == r._TYPE_SRV: + if answer.type == const._TYPE_SRV: has_srv = True - elif answer.type == r._TYPE_TXT: + elif answer.type == const._TYPE_TXT: has_txt = True - elif answer.type == r._TYPE_A: + elif answer.type == const._TYPE_A: has_a = True assert nbr_answers == 1 and nbr_additionals == 3 and nbr_authorities == 0 assert has_srv and has_txt and has_a diff --git a/tests/test_services.py b/tests/test_services.py index 7cf476b4..07554eb6 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -14,6 +14,7 @@ import pytest import zeroconf as r +from zeroconf import const import zeroconf.services as s from zeroconf.core import Zeroconf from zeroconf.services import ( @@ -75,8 +76,8 @@ def test_service_info_rejects_non_matching_updates(self): now, r.DNSText( service_name, - r._TYPE_TXT, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, b'\x04ff=0\x04ci=2\x04sf=0\x0bsh=6fLM5A==', ), @@ -87,8 +88,8 @@ def test_service_info_rejects_non_matching_updates(self): now, r.DNSService( service_name, - r._TYPE_SRV, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, 0, 0, @@ -104,8 +105,8 @@ def test_service_info_rejects_non_matching_updates(self): now, r.DNSAddress( 'ASH-2.local.', - r._TYPE_A, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_A, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, new_address, ), @@ -117,8 +118,8 @@ def test_service_info_rejects_non_matching_updates(self): now, r.DNSText( "incorrect.name.", - r._TYPE_TXT, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, b'\x04ff=0\x04ci=3\x04sf=0\x0bsh=6fLM5A==', ), @@ -129,8 +130,8 @@ def test_service_info_rejects_non_matching_updates(self): now, r.DNSService( "incorrect.name.", - r._TYPE_SRV, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, 0, 0, @@ -146,8 +147,8 @@ def test_service_info_rejects_non_matching_updates(self): now, r.DNSAddress( "incorrect.name.", - r._TYPE_A, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_A, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, new_address, ), @@ -174,8 +175,8 @@ def test_service_info_rejects_expired_records(self): now, r.DNSText( service_name, - r._TYPE_TXT, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, b'\x04ff=0\x04ci=2\x04sf=0\x0bsh=6fLM5A==', ), @@ -184,8 +185,8 @@ def test_service_info_rejects_expired_records(self): # Expired record expired_record = r.DNSText( service_name, - r._TYPE_TXT, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, b'\x04ff=0\x04ci=3\x04sf=0\x0bsh=6fLM5A==', ) @@ -211,7 +212,7 @@ def test_get_info_partial(self): last_sent = None # type: Optional[r.DNSOutgoing] - def send(out, addr=r._MDNS_ADDR, port=r._MDNS_PORT): + def send(out, addr=const._MDNS_ADDR, port=const._MDNS_PORT): """Sends an outgoing packet.""" nonlocal last_sent @@ -223,7 +224,7 @@ def send(out, addr=r._MDNS_ADDR, port=r._MDNS_PORT): def mock_incoming_msg(records) -> r.DNSIncoming: - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) for record in records: generated.add_answer_at_time(record, 0) @@ -247,10 +248,10 @@ def get_service_info_helper(zc, type, name): send_event.wait(wait_time) assert last_sent is not None assert len(last_sent.questions) == 4 - assert r.DNSQuestion(service_name, r._TYPE_SRV, r._CLASS_IN) in last_sent.questions - assert r.DNSQuestion(service_name, r._TYPE_TXT, r._CLASS_IN) in last_sent.questions - assert r.DNSQuestion(service_name, r._TYPE_A, r._CLASS_IN) in last_sent.questions - assert r.DNSQuestion(service_name, r._TYPE_AAAA, r._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_name, const._TYPE_SRV, const._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_name, const._TYPE_TXT, const._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_name, const._TYPE_A, const._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_name, const._TYPE_AAAA, const._CLASS_IN) in last_sent.questions assert service_info is None # Expext query for SRV, A, AAAA @@ -259,15 +260,23 @@ def get_service_info_helper(zc, type, name): _inject_response( zc, mock_incoming_msg( - [r.DNSText(service_name, r._TYPE_TXT, r._CLASS_IN | r._CLASS_UNIQUE, ttl, service_text)] + [ + r.DNSText( + service_name, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, + ttl, + service_text, + ) + ] ), ) send_event.wait(wait_time) assert last_sent is not None assert len(last_sent.questions) == 3 - assert r.DNSQuestion(service_name, r._TYPE_SRV, r._CLASS_IN) in last_sent.questions - assert r.DNSQuestion(service_name, r._TYPE_A, r._CLASS_IN) in last_sent.questions - assert r.DNSQuestion(service_name, r._TYPE_AAAA, r._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_name, const._TYPE_SRV, const._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_name, const._TYPE_A, const._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_name, const._TYPE_AAAA, const._CLASS_IN) in last_sent.questions assert service_info is None # Expext query for A, AAAA @@ -279,8 +288,8 @@ def get_service_info_helper(zc, type, name): [ r.DNSService( service_name, - r._TYPE_SRV, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, 0, 0, @@ -293,8 +302,8 @@ def get_service_info_helper(zc, type, name): send_event.wait(wait_time) assert last_sent is not None assert len(last_sent.questions) == 2 - assert r.DNSQuestion(service_server, r._TYPE_A, r._CLASS_IN) in last_sent.questions - assert r.DNSQuestion(service_server, r._TYPE_AAAA, r._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_server, const._TYPE_A, const._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_server, const._TYPE_AAAA, const._CLASS_IN) in last_sent.questions last_sent = None assert service_info is None @@ -307,8 +316,8 @@ def get_service_info_helper(zc, type, name): [ r.DNSAddress( service_server, - r._TYPE_A, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_A, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, socket.inet_pton(socket.AF_INET, service_address), ) @@ -340,7 +349,7 @@ def test_get_info_single(self): last_sent = None # type: Optional[r.DNSOutgoing] - def send(out, addr=r._MDNS_ADDR, port=r._MDNS_PORT): + def send(out, addr=const._MDNS_ADDR, port=const._MDNS_PORT): """Sends an outgoing packet.""" nonlocal last_sent @@ -352,7 +361,7 @@ def send(out, addr=r._MDNS_ADDR, port=r._MDNS_PORT): def mock_incoming_msg(records) -> r.DNSIncoming: - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) for record in records: generated.add_answer_at_time(record, 0) @@ -376,10 +385,10 @@ def get_service_info_helper(zc, type, name): send_event.wait(wait_time) assert last_sent is not None assert len(last_sent.questions) == 4 - assert r.DNSQuestion(service_name, r._TYPE_SRV, r._CLASS_IN) in last_sent.questions - assert r.DNSQuestion(service_name, r._TYPE_TXT, r._CLASS_IN) in last_sent.questions - assert r.DNSQuestion(service_name, r._TYPE_A, r._CLASS_IN) in last_sent.questions - assert r.DNSQuestion(service_name, r._TYPE_AAAA, r._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_name, const._TYPE_SRV, const._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_name, const._TYPE_TXT, const._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_name, const._TYPE_A, const._CLASS_IN) in last_sent.questions + assert r.DNSQuestion(service_name, const._TYPE_AAAA, const._CLASS_IN) in last_sent.questions assert service_info is None # Expext no further queries @@ -390,12 +399,16 @@ def get_service_info_helper(zc, type, name): mock_incoming_msg( [ r.DNSText( - service_name, r._TYPE_TXT, r._CLASS_IN | r._CLASS_UNIQUE, ttl, service_text + service_name, + const._TYPE_TXT, + const._CLASS_IN | const._CLASS_UNIQUE, + ttl, + service_text, ), r.DNSService( service_name, - r._TYPE_SRV, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_SRV, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, 0, 0, @@ -404,8 +417,8 @@ def get_service_info_helper(zc, type, name): ), r.DNSAddress( service_server, - r._TYPE_A, - r._CLASS_IN | r._CLASS_UNIQUE, + const._TYPE_A, + const._CLASS_IN | const._CLASS_UNIQUE, ttl, socket.inet_pton(socket.AF_INET, service_address), ), @@ -449,9 +462,9 @@ def remove_service(self, zc, type_, name) -> None: def mock_incoming_msg( service_state_change: r.ServiceStateChange, service_type: str, service_name: str, ttl: int ) -> r.DNSIncoming: - generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE) + generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE) generated.add_answer_at_time( - r.DNSPointer(service_type, r._TYPE_PTR, r._CLASS_IN, ttl, service_name), 0 + r.DNSPointer(service_type, const._TYPE_PTR, const._CLASS_IN, ttl, service_name), 0 ) return r.DNSIncoming(generated.packet()) @@ -476,7 +489,7 @@ def mock_incoming_msg( def _mock_get_expiration_time(self, percent): nonlocal called_with_refresh_time_check - if percent == r._EXPIRE_REFRESH_TIME_PERCENT: + if percent == const._EXPIRE_REFRESH_TIME_PERCENT: called_with_refresh_time_check = True return 0 return self.created + (percent * self.ttl * 10) @@ -541,7 +554,7 @@ def current_time_millis(): """Current system time in milliseconds""" return start_time + time_offset * 1000 - def send(out, addr=r._MDNS_ADDR, port=r._MDNS_PORT): + def send(out, addr=const._MDNS_ADDR, port=const._MDNS_PORT): """Sends an outgoing packet.""" got_query.set() old_send(out, addr=addr, port=port) @@ -619,11 +632,11 @@ def current_time_millis(): """Current system time in milliseconds""" return time.time() * 1000 + time_offset * 1000 - expected_ttl = r._DNS_HOST_TTL + expected_ttl = const._DNS_HOST_TTL nbr_answers = 0 - def send(out, addr=r._MDNS_ADDR, port=r._MDNS_PORT): + def send(out, addr=const._MDNS_ADDR, port=const._MDNS_PORT): """Sends an outgoing packet.""" pout = r.DNSIncoming(out.packet()) nonlocal nbr_answers @@ -693,7 +706,7 @@ def test_legacy_record_update_listener(): with pytest.raises(RuntimeError): r.RecordUpdateListener().update_record( - zc, 0, r.DNSRecord('irrelevant', r._TYPE_SRV, r._CLASS_IN, r._DNS_HOST_TTL) + zc, 0, r.DNSRecord('irrelevant', const._TYPE_SRV, const._CLASS_IN, const._DNS_HOST_TTL) ) updates = [] diff --git a/zeroconf/__init__.py b/zeroconf/__init__.py index 8242c4e1..043de013 100644 --- a/zeroconf/__init__.py +++ b/zeroconf/__init__.py @@ -22,57 +22,6 @@ import sys -from .const import ( # noqa # import needed for backwards compat - _BROWSER_BACKOFF_LIMIT, - _BROWSER_TIME, - _CACHE_CLEANUP_INTERVAL, - _CHECK_TIME, - _CLASSES, - _CLASS_IN, - _CLASS_NONE, - _CLASS_MASK, - _CLASS_UNIQUE, - _DNS_HOST_TTL, - _DNS_OTHER_TTL, - _DNS_PORT, - _EXPIRE_FULL_TIME_PERCENT, - _EXPIRE_REFRESH_TIME_PERCENT, - _EXPIRE_STALE_TIME_PERCENT, - _FLAGS_AA, - _FLAGS_QR_MASK, - _FLAGS_QR_QUERY, - _FLAGS_QR_RESPONSE, - _FLAGS_TC, - _HAS_ASCII_CONTROL_CHARS, - _HAS_A_TO_Z, - _HAS_ONLY_A_TO_Z_NUM_HYPHEN, - _HAS_ONLY_A_TO_Z_NUM_HYPHEN_UNDERSCORE, - _IPPROTO_IPV6, - _LISTENER_TIME, - _LOCAL_TRAILER, - _MAX_MSG_ABSOLUTE, - _MAX_MSG_TYPICAL, - _MDNS_ADDR, - _MDNS_ADDR6, - _MDNS_ADDR6_BYTES, - _MDNS_ADDR_BYTES, - _MDNS_PORT, - _NONTCP_PROTOCOL_LOCAL_TRAILER, - _REGISTER_TIME, - _SERVICE_TYPE_ENUMERATION_NAME, - _TCP_PROTOCOL_LOCAL_TRAILER, - _TYPES, - _TYPE_A, - _TYPE_AAAA, - _TYPE_ANY, - _TYPE_CNAME, - _TYPE_HINFO, - _TYPE_PTR, - _TYPE_SOA, - _TYPE_SRV, - _TYPE_TXT, - _UNREGISTER_TIME, -) from .core import NotifyListener, Zeroconf # noqa # import needed for backwards compat from .dns import ( # noqa # import needed for backwards compat DNSAddress, @@ -102,7 +51,6 @@ Signal, SignalRegistrationInterface, RecordUpdateListener, - _ServiceBrowserBase, ServiceBrowser, ServiceInfo, ServiceListener, @@ -120,8 +68,6 @@ InterfaceChoice, InterfacesType, IPVersion, - _is_v6_address, - _encode_address, get_all_addresses, ) from .utils.struct import int2byte # noqa # import needed for backwards compat