@@ -385,18 +385,31 @@ def add_listener(
385385 answer the question(s)."""
386386 self .listeners .append (listener )
387387
388- if question is not None :
389- now = current_time_millis ()
390- records = []
391- questions = [question ] if isinstance (question , DNSQuestion ) else question
392- for single_question in questions :
393- for record in self .cache .entries_with_name (single_question .name ):
394- if single_question .answered_by (record ) and not record .is_expired (now ):
395- records .append (record )
396- if records :
397- listener .async_update_records (self .zc , now , records )
398- listener .async_update_records_complete ()
388+ if question is None :
389+ self .zc .notify_all ()
390+ return
399391
392+ questions = [question ] if isinstance (question , DNSQuestion ) else question
393+ assert self .zc .loop is not None
394+ self .zc .loop .call_soon_threadsafe (self ._async_update_matching_records , listener , questions )
395+
396+ def _async_update_matching_records (
397+ self , listener : RecordUpdateListener , questions : List [DNSQuestion ]
398+ ) -> None :
399+ """Calls back any existing entries in the cache that answer the question.
400+
401+ This function must be run from the event loop.
402+ """
403+ now = current_time_millis ()
404+ records = []
405+ for question in questions :
406+ for record in self .cache .async_entries_with_name (question .name ):
407+ if not record .is_expired (now ) and question .answered_by (record ):
408+ records .append (record )
409+ if not records :
410+ return
411+ listener .async_update_records (self .zc , now , records )
412+ listener .async_update_records_complete ()
400413 self .zc .notify_all ()
401414
402415 def remove_listener (self , listener : RecordUpdateListener ) -> None :
0 commit comments