@@ -215,7 +215,8 @@ def __init__(self, zc: 'Zeroconf') -> None:
215215 self .data : Optional [bytes ] = None
216216 self .last_time : float = 0
217217 self .transport : Optional [asyncio .DatagramTransport ] = None
218-
218+ self .sock_name : Optional [str ] = None
219+ self .sock_fileno : Optional [int ] = None
219220 self ._deferred : Dict [str , List [DNSIncoming ]] = {}
220221 self ._timers : Dict [str , asyncio .TimerHandle ] = {}
221222
@@ -294,15 +295,20 @@ def datagram_received(
294295 self .zc .handle_response (msg )
295296 return
296297
297- self .handle_query_or_defer (msg , addr , port , v6_flow_scope )
298+ self .handle_query_or_defer (msg , addr , port , self . transport , v6_flow_scope )
298299
299300 def handle_query_or_defer (
300- self , msg : DNSIncoming , addr : str , port : int , v6_flow_scope : Union [Tuple [()], Tuple [int , int ]] = ()
301+ self ,
302+ msg : DNSIncoming ,
303+ addr : str ,
304+ port : int ,
305+ transport : asyncio .DatagramTransport ,
306+ v6_flow_scope : Union [Tuple [()], Tuple [int , int ]] = (),
301307 ) -> None :
302308 """Deal with incoming query packets. Provides a response if
303309 possible."""
304310 if not msg .truncated :
305- self ._respond_query (msg , addr , port , v6_flow_scope )
311+ self ._respond_query (msg , addr , port , transport , v6_flow_scope )
306312 return
307313
308314 deferred = self ._deferred .setdefault (addr , [])
@@ -315,7 +321,7 @@ def handle_query_or_defer(
315321 assert self .zc .loop is not None
316322 self ._cancel_any_timers_for_addr (addr )
317323 self ._timers [addr ] = self .zc .loop .call_later (
318- delay , self ._respond_query , None , addr , port , v6_flow_scope
324+ delay , self ._respond_query , None , addr , port , transport , v6_flow_scope
319325 )
320326
321327 def _cancel_any_timers_for_addr (self , addr : str ) -> None :
@@ -328,6 +334,7 @@ def _respond_query(
328334 msg : Optional [DNSIncoming ],
329335 addr : str ,
330336 port : int ,
337+ transport : asyncio .DatagramTransport ,
331338 v6_flow_scope : Union [Tuple [()], Tuple [int , int ]] = (),
332339 ) -> None :
333340 """Respond to a query and reassemble any truncated deferred packets."""
@@ -336,15 +343,12 @@ def _respond_query(
336343 if msg :
337344 packets .append (msg )
338345
339- self .zc .handle_assembled_query (packets , addr , port , v6_flow_scope )
346+ self .zc .handle_assembled_query (packets , addr , port , transport , v6_flow_scope )
340347
341348 @property
342349 def _socket_description (self ) -> str :
343350 """A human readable description of the socket."""
344- assert self .transport is not None
345- fileno = self .transport .get_extra_info ('socket' ).fileno ()
346- sockname = self .transport .get_extra_info ('sockname' )
347- return f"{ fileno } ({ sockname } )"
351+ return f"{ self .sock_fileno } ({ self .sock_name } )"
348352
349353 def error_received (self , exc : Exception ) -> None :
350354 """Likely socket closed or IPv6."""
@@ -357,6 +361,8 @@ def error_received(self, exc: Exception) -> None:
357361
358362 def connection_made (self , transport : asyncio .BaseTransport ) -> None :
359363 self .transport = cast (asyncio .DatagramTransport , transport )
364+ self .sock_name = self .transport .get_extra_info ('sockname' )
365+ self .sock_fileno = self .transport .get_extra_info ('socket' ).fileno ()
360366
361367 def connection_lost (self , exc : Optional [Exception ]) -> None :
362368 """Handle connection lost."""
@@ -400,6 +406,7 @@ def __init__(
400406 if apple_p2p and sys .platform != 'darwin' :
401407 raise RuntimeError ('Option `apple_p2p` is not supported on non-Apple platforms.' )
402408
409+ self .unicast = unicast
403410 listen_socket , respond_sockets = create_sockets (interfaces , unicast , ip_version , apple_p2p = apple_p2p )
404411 log .debug ('Listen socket %s, respond sockets %s' , listen_socket , respond_sockets )
405412
@@ -732,6 +739,7 @@ def handle_assembled_query(
732739 packets : List [DNSIncoming ],
733740 addr : str ,
734741 port : int ,
742+ transport : asyncio .DatagramTransport ,
735743 v6_flow_scope : Union [Tuple [()], Tuple [int , int ]] = (),
736744 ) -> None :
737745 """Respond to a (re)assembled query.
@@ -749,7 +757,10 @@ def handle_assembled_query(
749757 questions = packets [0 ].questions
750758 id_ = packets [0 ].id
751759 out = construct_outgoing_unicast_answers (question_answers .ucast , ucast_source , questions , id_ )
752- self .async_send (out , addr , port , v6_flow_scope )
760+ # When sending unicast, only send back the reply
761+ # via the same socket that it was recieved from
762+ # as we know its reachable from that socket
763+ self .async_send (out , addr , port , v6_flow_scope , transport )
753764 if question_answers .mcast_now :
754765 self .async_send (construct_outgoing_multicast_answers (question_answers .mcast_now ))
755766 if question_answers .mcast_aggregate :
@@ -766,44 +777,64 @@ def send(
766777 addr : Optional [str ] = None ,
767778 port : int = _MDNS_PORT ,
768779 v6_flow_scope : Union [Tuple [()], Tuple [int , int ]] = (),
780+ transport : Optional [asyncio .DatagramTransport ] = None ,
769781 ) -> None :
770782 """Sends an outgoing packet threadsafe."""
771783 assert self .loop is not None
772- self .loop .call_soon_threadsafe (self .async_send , out , addr , port , v6_flow_scope )
784+ self .loop .call_soon_threadsafe (self .async_send , out , addr , port , v6_flow_scope , transport )
773785
774786 def async_send (
775787 self ,
776788 out : DNSOutgoing ,
777789 addr : Optional [str ] = None ,
778790 port : int = _MDNS_PORT ,
779791 v6_flow_scope : Union [Tuple [()], Tuple [int , int ]] = (),
792+ transport : Optional [asyncio .DatagramTransport ] = None ,
780793 ) -> None :
781794 """Sends an outgoing packet."""
782795 if self ._GLOBAL_DONE :
783796 return
784797
798+ # If no transport is specified, we send to all the ones
799+ # with the same address family
800+ transports = [transport ] if transport else self .engine .senders
801+
785802 for packet_num , packet in enumerate (out .packets ()):
786803 if len (packet ) > _MAX_MSG_ABSOLUTE :
787804 self .log_warning_once ("Dropping %r over-sized packet (%d bytes) %r" , out , len (packet ), packet )
788805 return
789- log .debug (
790- 'Sending to (%s, %d) (%d bytes #%d) %r as %r...' ,
791- addr ,
792- port ,
793- len (packet ),
794- packet_num + 1 ,
795- out ,
796- packet ,
797- )
798- for transport in self .engine .senders :
799- s = transport .get_extra_info ('socket' )
800- if addr is None :
801- real_addr = _MDNS_ADDR6 if s .family == socket .AF_INET6 else _MDNS_ADDR
802- elif not can_send_to (s , addr ):
803- continue
804- else :
805- real_addr = addr
806- transport .sendto (packet , (real_addr , port or _MDNS_PORT , * v6_flow_scope ))
806+ for send_transport in transports :
807+ self ._async_send_transport (send_transport , packet , packet_num , out , addr , port , v6_flow_scope )
808+
809+ def _async_send_transport (
810+ self ,
811+ transport : asyncio .DatagramTransport ,
812+ packet : bytes ,
813+ packet_num : int ,
814+ out : DNSOutgoing ,
815+ addr : Optional [str ],
816+ port : int ,
817+ v6_flow_scope : Union [Tuple [()], Tuple [int , int ]] = (),
818+ ) -> None :
819+ s = transport .get_extra_info ('socket' )
820+ if addr is None :
821+ real_addr = _MDNS_ADDR6 if s .family == socket .AF_INET6 else _MDNS_ADDR
822+ else :
823+ real_addr = addr
824+ if not can_send_to (s , real_addr ):
825+ return
826+ log .debug (
827+ 'Sending to (%s, %d) via [socket %s (%s)] (%d bytes #%d) %r as %r...' ,
828+ real_addr ,
829+ port or _MDNS_PORT ,
830+ s .fileno (),
831+ transport .get_extra_info ('sockname' ),
832+ len (packet ),
833+ packet_num + 1 ,
834+ out ,
835+ packet ,
836+ )
837+ transport .sendto (packet , (real_addr , port or _MDNS_PORT , * v6_flow_scope ))
807838
808839 def _close (self ) -> None :
809840 """Set global done and remove all service listeners."""
0 commit comments