diff --git a/tests/test_dns.py b/tests/test_dns.py index 8117339d..de0e4932 100644 --- a/tests/test_dns.py +++ b/tests/test_dns.py @@ -416,6 +416,74 @@ def test_numbers_questions(self): assert num_additionals == 0 +class TestDnsIncoming(unittest.TestCase): + def test_incoming_exception_handling(self): + generated = r.DNSOutgoing(0) + packet = generated.packet() + packet = packet[:8] + b'deadbeef' + packet[8:] + parsed = r.DNSIncoming(packet) + parsed = r.DNSIncoming(packet) + assert parsed.valid is False + + def test_incoming_unknown_type(self): + generated = r.DNSOutgoing(0) + answer = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') + generated.add_additional_answer(answer) + packet = generated.packet() + parsed = r.DNSIncoming(packet) + assert len(parsed.answers) == 0 + assert parsed.is_query() != parsed.is_response() + + def test_incoming_ipv6(self): + addr = "2606:2800:220:1:248:1893:25c8:1946" # example.com + packed = socket.inet_pton(socket.AF_INET6, addr) + generated = r.DNSOutgoing(0) + answer = r.DNSAddress('domain', r._TYPE_AAAA, r._CLASS_IN | r._CLASS_UNIQUE, 1, packed) + generated.add_additional_answer(answer) + packet = generated.packet() + parsed = r.DNSIncoming(packet) + record = parsed.answers[0] + assert isinstance(record, r.DNSAddress) + assert record.address == packed + + +class TestDNSCache(unittest.TestCase): + def test_order(self): + record1 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') + record2 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'b') + cache = r.DNSCache() + cache.add(record1) + cache.add(record2) + entry = r.DNSEntry('a', r._TYPE_SOA, r._CLASS_IN) + cached_record = cache.get(entry) + assert cached_record == record2 + + def test_cache_empty_does_not_leak_memory_by_leaving_empty_list(self): + record1 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') + record2 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'b') + cache = r.DNSCache() + cache.add(record1) + cache.add(record2) + assert 'a' in cache.cache + cache.remove(record1) + cache.remove(record2) + assert 'a' not in cache.cache + + def test_cache_empty_multiple_calls_does_not_throw(self): + record1 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') + record2 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'b') + cache = r.DNSCache() + cache.add(record1) + cache.add(record2) + assert 'a' in cache.cache + cache.remove(record1) + cache.remove(record2) + # Ensure multiple removes does not throw + cache.remove(record1) + cache.remove(record2) + assert 'a' not in cache.cache + + def test_dns_compression_rollback_for_corruption(): """Verify rolling back does not lead to dns compression corruption.""" out = r.DNSOutgoing(r._FLAGS_QR_RESPONSE | r._FLAGS_AA) diff --git a/tests/test_init.py b/tests/test_init.py index f64443f0..1ca4f0c1 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -247,37 +247,6 @@ def generate_host(zc, host_name, type_): zc.send(out) -class TestDnsIncoming(unittest.TestCase): - def test_incoming_exception_handling(self): - generated = r.DNSOutgoing(0) - packet = generated.packet() - packet = packet[:8] + b'deadbeef' + packet[8:] - parsed = r.DNSIncoming(packet) - parsed = r.DNSIncoming(packet) - assert parsed.valid is False - - def test_incoming_unknown_type(self): - generated = r.DNSOutgoing(0) - answer = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') - generated.add_additional_answer(answer) - packet = generated.packet() - parsed = r.DNSIncoming(packet) - assert len(parsed.answers) == 0 - assert parsed.is_query() != parsed.is_response() - - def test_incoming_ipv6(self): - addr = "2606:2800:220:1:248:1893:25c8:1946" # example.com - packed = socket.inet_pton(socket.AF_INET6, addr) - generated = r.DNSOutgoing(0) - answer = r.DNSAddress('domain', r._TYPE_AAAA, r._CLASS_IN | r._CLASS_UNIQUE, 1, packed) - generated.add_additional_answer(answer) - packet = generated.packet() - parsed = r.DNSIncoming(packet) - record = parsed.answers[0] - assert isinstance(record, r.DNSAddress) - assert record.address == packed - - class TestRegistrar(unittest.TestCase): def test_ttl(self): @@ -484,43 +453,6 @@ def test_lookups(self): assert registry.get_types() == [type_] -class TestDNSCache(unittest.TestCase): - def test_order(self): - record1 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') - record2 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'b') - cache = r.DNSCache() - cache.add(record1) - cache.add(record2) - entry = r.DNSEntry('a', r._TYPE_SOA, r._CLASS_IN) - cached_record = cache.get(entry) - assert cached_record == record2 - - def test_cache_empty_does_not_leak_memory_by_leaving_empty_list(self): - record1 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') - record2 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'b') - cache = r.DNSCache() - cache.add(record1) - cache.add(record2) - assert 'a' in cache.cache - cache.remove(record1) - cache.remove(record2) - assert 'a' not in cache.cache - - def test_cache_empty_multiple_calls_does_not_throw(self): - record1 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a') - record2 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'b') - cache = r.DNSCache() - cache.add(record1) - cache.add(record2) - assert 'a' in cache.cache - cache.remove(record1) - cache.remove(record2) - # Ensure multiple removes does not throw - cache.remove(record1) - cache.remove(record2) - assert 'a' not in cache.cache - - class ServiceTypesQuery(unittest.TestCase): def test_integration_with_listener(self):