2828from ._dns import DNSAddress , DNSHinfo , DNSNsec , DNSPointer , DNSQuestion , DNSRecord , DNSService , DNSText
2929from ._exceptions import IncomingDecodeError , NamePartTooLongException
3030from ._logger import QuietLogger , log
31- from ._utils .struct import int2byte
3231from ._utils .time import current_time_millis
3332from .const import (
3433 _CLASS_UNIQUE ,
6463class DNSMessage :
6564 """A base class for DNS messages."""
6665
66+ __slots__ = ('flags' ,)
67+
6768 def __init__ (self , flags : int ) -> None :
6869 """Construct a DNS message."""
6970 self .flags = flags
@@ -128,11 +129,9 @@ def __repr__(self) -> str:
128129 ]
129130 )
130131
131- def unpack (self , format_ : bytes ) -> tuple :
132- length = struct .calcsize (format_ )
133- info = struct .unpack (format_ , self .data [self .offset : self .offset + length ])
132+ def unpack (self , format_ : bytes , length : int ) -> tuple :
134133 self .offset += length
135- return info
134+ return struct . unpack ( format_ , self . data [ self . offset - length : self . offset ])
136135
137136 def read_header (self ) -> None :
138137 """Reads header portion of packet"""
@@ -143,16 +142,14 @@ def read_header(self) -> None:
143142 self .num_answers ,
144143 self .num_authorities ,
145144 self .num_additionals ,
146- ) = self .unpack (b'!6H' )
145+ ) = self .unpack (b'!6H' , 12 )
147146
148147 def read_questions (self ) -> None :
149148 """Reads questions section of packet"""
150149 for _ in range (self .num_questions ):
151150 name = self .read_name ()
152- type_ , class_ = self .unpack (b'!HH' )
153-
154- question = DNSQuestion (name , type_ , class_ )
155- self .questions .append (question )
151+ type_ , class_ = self .unpack (b'!HH' , 4 )
152+ self .questions .append (DNSQuestion (name , type_ , class_ ))
156153
157154 def read_character_string (self ) -> bytes :
158155 """Reads a character string from the packet"""
@@ -168,15 +165,15 @@ def read_string(self, length: int) -> bytes:
168165
169166 def read_unsigned_short (self ) -> int :
170167 """Reads an unsigned short from the packet"""
171- return cast (int , self .unpack (b'!H' )[0 ])
168+ return cast (int , self .unpack (b'!H' , 2 )[0 ])
172169
173170 def read_others (self ) -> None :
174171 """Reads the answers, authorities and additionals section of the
175172 packet"""
176173 n = self .num_answers + self .num_authorities + self .num_additionals
177174 for _ in range (n ):
178175 domain = self .read_name ()
179- type_ , class_ , ttl , length = self .unpack (b'!HHiH' )
176+ type_ , class_ , ttl , length = self .unpack (b'!HHiH' , 10 )
180177 end = self .offset + length
181178 rec = None
182179 try :
@@ -266,8 +263,7 @@ def read_name(self) -> str:
266263 labels : List [str ] = []
267264 self .seen_pointers .clear ()
268265 self .offset = self ._decode_labels_at_offset (self .offset , labels )
269- labels .append ("" )
270- name = "." .join (labels )
266+ name = "." .join (labels ) + "."
271267 if len (name ) > MAX_NAME_LENGTH :
272268 raise IncomingDecodeError (f"DNS name { name } exceeds maximum length of { MAX_NAME_LENGTH } " )
273269 return name
@@ -440,7 +436,7 @@ def _pack(self, format_: Union[bytes, str], value: Any) -> None:
440436
441437 def _write_byte (self , value : int ) -> None :
442438 """Writes a single byte to the packet"""
443- self ._pack (b'!c' , int2byte ( value ))
439+ self ._pack (b'!c' , bytes (( value ,) ))
444440
445441 def _insert_short_at_start (self , value : int ) -> None :
446442 """Inserts an unsigned short at the start of the packet"""
0 commit comments