3535import threading
3636import time
3737import warnings
38+ from collections import OrderedDict
3839from typing import Dict , List , Optional , Sequence , Union , cast
3940from typing import Any , Callable , Set , Tuple # noqa # used in type hints
4041
@@ -1121,8 +1122,9 @@ def __init__(self) -> None:
11211122
11221123 def add (self , entry : DNSRecord ) -> None :
11231124 """Adds an entry"""
1124- # Insert first in list so get returns newest entry
1125- self .cache .setdefault (entry .key , []).insert (0 , entry )
1125+ # Insert last in list, get will return newest entry
1126+ # iteration will result in last update winning
1127+ self .cache .setdefault (entry .key , []).append (entry )
11261128
11271129 def remove (self , entry : DNSRecord ) -> None :
11281130 """Removes an entry"""
@@ -1142,7 +1144,7 @@ def get(self, entry: DNSEntry) -> Optional[DNSRecord]:
11421144 matching entry."""
11431145 try :
11441146 list_ = self .cache [entry .key ]
1145- for cached_entry in list_ :
1147+ for cached_entry in reversed ( list_ ) :
11461148 if entry .__eq__ (cached_entry ):
11471149 return cached_entry
11481150 return None
@@ -1164,7 +1166,7 @@ def entries_with_name(self, name: str) -> List[DNSRecord]:
11641166
11651167 def current_entry_with_name_and_alias (self , name : str , alias : str ) -> Optional [DNSRecord ]:
11661168 now = current_time_millis ()
1167- for record in self .entries_with_name (name ):
1169+ for record in reversed ( self .entries_with_name (name ) ):
11681170 if (
11691171 record .type == _TYPE_PTR
11701172 and not record .is_expired (now )
@@ -1400,7 +1402,7 @@ def __init__(
14001402 self .services = {} # type: Dict[str, DNSRecord]
14011403 self .next_time = current_time_millis ()
14021404 self .delay = delay
1403- self ._handlers_to_call = [] # type: List[Callable[[Zeroconf], None] ]
1405+ self ._handlers_to_call = OrderedDict () # type: OrderedDict[str, ServiceStateChange ]
14041406
14051407 self ._service_state_changed = Signal ()
14061408
@@ -1445,14 +1447,30 @@ def service_state_changed(self) -> SignalRegistrationInterface:
14451447 def update_record (self , zc : 'Zeroconf' , now : float , record : DNSRecord ) -> None :
14461448 """Callback invoked by Zeroconf when new information arrives.
14471449
1448- Updates information required by browser in the Zeroconf cache."""
1450+ Updates information required by browser in the Zeroconf cache.
1451+
1452+ Ensures that there is are no unecessary duplicates in the list
1453+
1454+ """
14491455
14501456 def enqueue_callback (state_change : ServiceStateChange , name : str ) -> None :
1451- self ._handlers_to_call .append (
1452- lambda zeroconf : self ._service_state_changed .fire (
1453- zeroconf = zeroconf , service_type = self .type , name = name , state_change = state_change
1457+
1458+ # Code to ensure we only do a single update message
1459+ # Precedence is; Added, Remove, Update
1460+
1461+ if (
1462+ state_change is ServiceStateChange .Added
1463+ or (
1464+ state_change is ServiceStateChange .Removed
1465+ and (
1466+ self ._handlers_to_call .get (name ) is ServiceStateChange .Updated
1467+ or self ._handlers_to_call .get (name ) is ServiceStateChange .Added
1468+ or self ._handlers_to_call .get (name ) is None
1469+ )
14541470 )
1455- )
1471+ or (state_change is ServiceStateChange .Updated and name not in self ._handlers_to_call )
1472+ ):
1473+ self ._handlers_to_call [name ] = state_change
14561474
14571475 if record .type == _TYPE_PTR and record .name == self .type :
14581476 assert isinstance (record , DNSPointer )
@@ -1476,8 +1494,20 @@ def enqueue_callback(state_change: ServiceStateChange, name: str) -> None:
14761494 if expires < self .next_time :
14771495 self .next_time = expires
14781496
1479- elif record .type == _TYPE_TXT and record .name .endswith (self .type ):
1480- assert isinstance (record , DNSText )
1497+ elif record .type == _TYPE_A or record .type == _TYPE_AAAA :
1498+ assert isinstance (record , DNSAddress )
1499+
1500+ # Iterate through the DNSCache and callback any services that use this address
1501+ for service in zc .cache .entries ():
1502+ if (
1503+ isinstance (service , DNSService )
1504+ and service .name .endswith (self .type )
1505+ and service .server == record .name
1506+ and not record .is_expired (now )
1507+ ):
1508+ enqueue_callback (ServiceStateChange .Updated , service .name )
1509+
1510+ elif record .name .endswith (self .type ):
14811511 expired = record .is_expired (now )
14821512 if not expired :
14831513 enqueue_callback (ServiceStateChange .Updated , record .name )
@@ -1509,8 +1539,11 @@ def run(self) -> None:
15091539 self .delay = min (_BROWSER_BACKOFF_LIMIT * 1000 , self .delay * 2 )
15101540
15111541 if len (self ._handlers_to_call ) > 0 and not self .zc .done :
1512- handler = self ._handlers_to_call .pop (0 )
1513- handler (self .zc )
1542+ with self .zc ._handlers_lock :
1543+ handler = self ._handlers_to_call .popitem (False )
1544+ self ._service_state_changed .fire (
1545+ zeroconf = self .zc , service_type = self .type , name = handler [0 ], state_change = handler [1 ]
1546+ )
15141547
15151548
15161549class ServiceInfo (RecordUpdateListener ):
@@ -2173,6 +2206,8 @@ def __init__(
21732206
21742207 self .debug = None # type: Optional[DNSOutgoing]
21752208
2209+ self ._handlers_lock = threading .Lock () # ensure we process a full message in one go
2210+
21762211 @property
21772212 def done (self ) -> bool :
21782213 return self ._GLOBAL_DONE
@@ -2449,42 +2484,45 @@ def update_record(self, now: float, rec: DNSRecord) -> None:
24492484 def handle_response (self , msg : DNSIncoming ) -> None :
24502485 """Deal with incoming response packets. All answers
24512486 are held in the cache, and listeners are notified."""
2452- now = current_time_millis ()
2453- for record in msg .answers :
2454-
2455- updated = True
2456-
2457- if record .unique : # https://tools.ietf.org/html/rfc6762#section-10.2
2458- # Since the cache format is keyed on the lower case record name
2459- # we can avoid iterating everything in the cache and
2460- # only look though entries for the specific name.
2461- # entries_with_name will take care of converting to lowercase
2462- #
2463- # We make a copy of the list that entries_with_name returns
2464- # since we cannot iterate over something we might remove
2465- for entry in self .cache .entries_with_name (record .name ).copy ():
24662487
2467- if entry == record :
2468- updated = False
2488+ with self ._handlers_lock :
24692489
2470- # Check the time first because it is far cheaper
2471- # than the __eq__
2472- if (record .created - entry .created > 1000 ) and DNSEntry .__eq__ (entry , record ):
2473- self .cache .remove (entry )
2474-
2475- expired = record .is_expired (now )
2476- maybe_entry = self .cache .get (record )
2477- if not expired :
2478- if maybe_entry is not None :
2479- maybe_entry .reset_ttl (record )
2490+ now = current_time_millis ()
2491+ for record in msg .answers :
2492+
2493+ updated = True
2494+
2495+ if record .unique : # https://tools.ietf.org/html/rfc6762#section-10.2
2496+ # Since the cache format is keyed on the lower case record name
2497+ # we can avoid iterating everything in the cache and
2498+ # only look though entries for the specific name.
2499+ # entries_with_name will take care of converting to lowercase
2500+ #
2501+ # We make a copy of the list that entries_with_name returns
2502+ # since we cannot iterate over something we might remove
2503+ for entry in self .cache .entries_with_name (record .name ).copy ():
2504+
2505+ if entry == record :
2506+ updated = False
2507+
2508+ # Check the time first because it is far cheaper
2509+ # than the __eq__
2510+ if (record .created - entry .created > 1000 ) and DNSEntry .__eq__ (entry , record ):
2511+ self .cache .remove (entry )
2512+
2513+ expired = record .is_expired (now )
2514+ maybe_entry = self .cache .get (record )
2515+ if not expired :
2516+ if maybe_entry is not None :
2517+ maybe_entry .reset_ttl (record )
2518+ else :
2519+ self .cache .add (record )
2520+ if updated :
2521+ self .update_record (now , record )
24802522 else :
2481- self .cache .add (record )
2482- if updated :
2483- self .update_record (now , record )
2484- else :
2485- if maybe_entry is not None :
2486- self .update_record (now , record )
2487- self .cache .remove (maybe_entry )
2523+ if maybe_entry is not None :
2524+ self .update_record (now , record )
2525+ self .cache .remove (maybe_entry )
24882526
24892527 def handle_query (self , msg : DNSIncoming , addr : Optional [str ], port : int ) -> None :
24902528 """Deal with incoming query packets. Provides a response if
0 commit comments