Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ python:
- "pypy3.5"
- "pypy3"
install:
- pip install -r requirements-dev.txt
- pip install --upgrade -r requirements-dev.txt
# mypy can't be installed on pypy
- if [[ "${TRAVIS_PYTHON_VERSION}" != "pypy"* ]] ; then pip install mypy ; fi
- if [[ "${TRAVIS_PYTHON_VERSION}" != *"3.5"* && "${TRAVIS_PYTHON_VERSION}" != "pypy"* ]] ; then
pip install black ; fi
script:
# no IPv6 support in Travis :(
- make TEST_ARGS='-a "!IPv6"' ci
- SKIP_IPV6=1 make ci
after_success:
- coveralls
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
MAX_LINE_LENGTH=110
PYTHON_IMPLEMENTATION:=$(shell python -c "import sys;import platform;sys.stdout.write(platform.python_implementation())")
PYTHON_VERSION:=$(shell python -c "import sys;sys.stdout.write('%d.%d' % sys.version_info[:2])")
TEST_ARGS=

LINT_TARGETS:=flake8

Expand Down Expand Up @@ -40,10 +39,10 @@ mypy:
mypy examples/*.py zeroconf/*.py

test:
nosetests -v $(TEST_ARGS)
pytest -v zeroconf/test.py

test_coverage:
nosetests -v --with-coverage --cover-package=zeroconf $(TEST_ARGS)
pytest -v --cov=zeroconf --cov-branch --cov-report html --cov-report term-missing zeroconf/test.py

autopep8:
autopep8 --max-line-length=$(MAX_LINE_LENGTH) -i setup.py examples zeroconf
3 changes: 2 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ coverage
flake8>=3.6.0
flake8-import-order
ifaddr
nose
pep8-naming!=0.6.0
pytest
pytest-cov
54 changes: 26 additions & 28 deletions zeroconf/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import copy
import logging
import os
import socket
import struct
import time
Expand All @@ -14,8 +15,6 @@
from typing import Dict, Optional # noqa # used in type hints
from typing import cast

from nose.plugins.attrib import attr

import zeroconf as r
from zeroconf import (
DNSHinfo,
Expand Down Expand Up @@ -169,17 +168,17 @@ def test_parse_own_packet_response(self):
0,
)
parsed = r.DNSIncoming(generated.packet())
self.assertEqual(len(generated.answers), 1)
self.assertEqual(len(generated.answers), len(parsed.answers))
assert len(generated.answers) == 1
assert len(generated.answers) == len(parsed.answers)

def test_match_question(self):
generated = r.DNSOutgoing(r._FLAGS_QR_QUERY)
question = r.DNSQuestion("testname.local.", r._TYPE_SRV, r._CLASS_IN)
generated.add_question(question)
parsed = r.DNSIncoming(generated.packet())
self.assertEqual(len(generated.questions), 1)
self.assertEqual(len(generated.questions), len(parsed.questions))
self.assertEqual(question, parsed.questions[0])
assert len(generated.questions) == 1
assert len(generated.questions) == len(parsed.questions)
assert question == parsed.questions[0]

def test_suppress_answer(self):
query_generated = r.DNSOutgoing(r._FLAGS_QR_QUERY)
Expand Down Expand Up @@ -253,8 +252,8 @@ def test_dns_hinfo(self):
generated.add_additional_answer(DNSHinfo('irrelevant', r._TYPE_HINFO, 0, 0, 'cpu', 'os'))
parsed = r.DNSIncoming(generated.packet())
answer = cast(r.DNSHinfo, parsed.answers[0])
self.assertEqual(answer.cpu, u'cpu')
self.assertEqual(answer.os, u'os')
assert answer.cpu == u'cpu'
assert answer.os == u'os'

generated = r.DNSOutgoing(0)
generated.add_additional_answer(DNSHinfo('irrelevant', r._TYPE_HINFO, 0, 0, 'cpu', 'x' * 257))
Expand All @@ -267,28 +266,28 @@ def test_transaction_id(self):
generated = r.DNSOutgoing(r._FLAGS_QR_QUERY)
bytes = generated.packet()
id = bytes[0] << 8 | bytes[1]
self.assertEqual(id, 0)
assert id == 0

def test_query_header_bits(self):
generated = r.DNSOutgoing(r._FLAGS_QR_QUERY)
bytes = generated.packet()
flags = bytes[2] << 8 | bytes[3]
self.assertEqual(flags, 0x0)
assert flags == 0x0

def test_response_header_bits(self):
generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE)
bytes = generated.packet()
flags = bytes[2] << 8 | bytes[3]
self.assertEqual(flags, 0x8000)
assert flags == 0x8000

def test_numbers(self):
generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE)
bytes = generated.packet()
(num_questions, num_answers, num_authorities, num_additionals) = struct.unpack('!4H', bytes[4:12])
self.assertEqual(num_questions, 0)
self.assertEqual(num_answers, 0)
self.assertEqual(num_authorities, 0)
self.assertEqual(num_additionals, 0)
assert num_questions == 0
assert num_answers == 0
assert num_authorities == 0
assert num_additionals == 0

def test_numbers_questions(self):
generated = r.DNSOutgoing(r._FLAGS_QR_RESPONSE)
Expand All @@ -297,10 +296,10 @@ def test_numbers_questions(self):
generated.add_question(question)
bytes = generated.packet()
(num_questions, num_answers, num_authorities, num_additionals) = struct.unpack('!4H', bytes[4:12])
self.assertEqual(num_questions, 10)
self.assertEqual(num_answers, 0)
self.assertEqual(num_authorities, 0)
self.assertEqual(num_additionals, 0)
assert num_questions == 10
assert num_answers == 0
assert num_authorities == 0
assert num_additionals == 0


class Names(unittest.TestCase):
Expand Down Expand Up @@ -502,15 +501,15 @@ def test_launch_and_close(self):
rv.close()

@unittest.skipIf(not socket.has_ipv6, 'Requires IPv6')
@attr('IPv6')
@unittest.skipIf(os.environ.get('SKIP_IPV6'), 'IPv6 tests disabled')
def test_launch_and_close_v4_v6(self):
rv = r.Zeroconf(interfaces=r.InterfaceChoice.All, ip_version=r.IPVersion.All)
rv.close()
rv = r.Zeroconf(interfaces=r.InterfaceChoice.Default, ip_version=r.IPVersion.All)
rv.close()

@unittest.skipIf(not socket.has_ipv6, 'Requires IPv6')
@attr('IPv6')
@unittest.skipIf(os.environ.get('SKIP_IPV6'), 'IPv6 tests disabled')
def test_launch_and_close_v6_only(self):
rv = r.Zeroconf(interfaces=r.InterfaceChoice.All, ip_version=r.IPVersion.V6Only)
rv.close()
Expand Down Expand Up @@ -826,7 +825,7 @@ def test_order(self):
cache.add(record2)
entry = r.DNSEntry('a', r._TYPE_SOA, r._CLASS_IN)
cached_record = cache.get(entry)
self.assertEqual(cached_record, record2)
assert cached_record == record2

def test_cache_empty_does_not_leak_memory_by_leaving_empty_list(self):
record1 = r.DNSAddress('a', r._TYPE_SOA, r._CLASS_IN, 1, b'a')
Expand Down Expand Up @@ -888,7 +887,7 @@ def test_integration_with_listener_v6_records(self):
zeroconf_registrar.close()

@unittest.skipIf(not socket.has_ipv6, 'Requires IPv6')
@attr('IPv6')
@unittest.skipIf(os.environ.get('SKIP_IPV6'), 'IPv6 tests disabled')
def test_integration_with_listener_ipv6(self):

type_ = "_test-srvc-type._tcp.local."
Expand All @@ -904,9 +903,9 @@ def test_integration_with_listener_ipv6(self):

try:
service_types = ZeroconfServiceTypes.find(ip_version=r.IPVersion.V6Only, timeout=0.5)
assert type_ in service_types, service_types
assert type_ in service_types
service_types = ZeroconfServiceTypes.find(zc=zeroconf_registrar, timeout=0.5)
assert type_ in service_types, service_types
assert type_ in service_types

finally:
zeroconf_registrar.close()
Expand Down Expand Up @@ -1016,8 +1015,7 @@ def update_service(self, zeroconf, type, name):
assert info.properties[b'prop_true'] == b'1'
assert info.properties[b'prop_false'] == b'0'
assert info.addresses == addresses[:1] # no V6 by default
all_addresses = info.addresses_by_version(r.IPVersion.All)
assert all_addresses == addresses, all_addresses
assert info.addresses_by_version(r.IPVersion.All) == addresses

info = zeroconf_browser.get_service_info(subtype, registration_name)
assert info is not None
Expand Down