import codecs
import re
import types
import sys

from .constants import EOF, spaceCharacters, asciiLetters, asciiUppercase
from .constants import encodings, ReparseException

# Non-unicode versions of constants for use in the pre-parser.
# The pre-parser (EncodingBytes / EncodingParser) operates on bytes, so these
# must be bytes as well: encode each one-character string to its ASCII byte.
# (Previously str(item) was used, which left them as unicode strings that
# could never compare equal to the length-1 bytes the pre-parser produces.)
spaceCharactersBytes = [item.encode("ascii") for item in spaceCharacters]
asciiLettersBytes = [item.encode("ascii") for item in asciiLetters]
asciiUppercaseBytes = [item.encode("ascii") for item in asciiUppercase]

# Code points that may not appear in an HTML document: C0/C1 controls (minus
# whitespace), lone surrogates and the Unicode non-characters.
invalid_unicode_re = re.compile("[\u0001-\u0008\u000B\u000E-\u001F\u007F-\u009F\uD800-\uDFFF\uFDD0-\uFDDF\uFFFE\uFFFF\U0001FFFE\U0001FFFF\U0002FFFE\U0002FFFF\U0003FFFE\U0003FFFF\U0004FFFE\U0004FFFF\U0005FFFE\U0005FFFF\U0006FFFE\U0006FFFF\U0007FFFE\U0007FFFF\U0008FFFE\U0008FFFF\U0009FFFE\U0009FFFF\U000AFFFE\U000AFFFF\U000BFFFE\U000BFFFF\U000CFFFE\U000CFFFF\U000DFFFE\U000DFFFF\U000EFFFE\U000EFFFF\U000FFFFE\U000FFFFF\U0010FFFE\U0010FFFF]")

# ASCII whitespace and punctuation, stripped when canonicalising an encoding
# name in codecName().
ascii_punctuation_re = re.compile(r"[\u0009-\u000D\u0020-\u002F\u003A-\u0040\u005B-\u0060\u007B-\u007E]")

# Cache for charsUntil()
charsUntilRegEx = {}


class BufferedStream:
    """Buffering for streams that do not have buffering of their own

    The buffer is implemented as a list of chunks on the assumption that
    joining many strings will be slow since it is O(n**2)
    """

    def __init__(self, stream):
        self.stream = stream
        self.buffer = []
        self.position = [-1, 0]  # chunk number, offset

    def tell(self):
        """Return the current absolute position within the stream."""
        pos = 0
        for chunk in self.buffer[:self.position[0]]:
            pos += len(chunk)
        pos += self.position[1]
        return pos

    def seek(self, pos):
        """Seek to absolute position ``pos``; it must already be buffered."""
        assert pos <= self._bufferedBytes()
        offset = pos
        i = 0
        while len(self.buffer[i]) < offset:
            # Skip whole chunks, reducing the remaining offset by the length
            # of each chunk skipped.  (This previously did ``offset -= pos``,
            # subtracting the absolute target instead of the chunk length,
            # which produced a wrong chunk/offset pair for any position
            # beyond the first chunk.)
            offset -= len(self.buffer[i])
            i += 1
        self.position = [i, offset]

    def read(self, bytes):
        """Read up to ``bytes`` bytes, serving from the buffer when possible."""
        if not self.buffer:
            return self._readStream(bytes)
        elif (self.position[0] == len(self.buffer) and
              self.position[1] == len(self.buffer[-1])):
            # Positioned at the very end of the buffered data: go straight
            # to the underlying stream.
            return self._readStream(bytes)
        else:
            return self._readFromBuffer(bytes)

    def _bufferedBytes(self):
        # Total number of bytes currently held in the buffer.
        return sum([len(item) for item in self.buffer])

    def _readStream(self, bytes):
        # Read from the underlying stream, remembering the data so it can be
        # re-read after a seek().
        data = self.stream.read(bytes)
        self.buffer.append(data)
        self.position[0] += 1
        self.position[1] = len(data)
        return data

    def _readFromBuffer(self, bytes):
        # Serve a read from previously-buffered chunks, falling back to the
        # underlying stream for any remainder.
        remainingBytes = bytes
        rv = []
        bufferIndex = self.position[0]
        bufferOffset = self.position[1]
        while bufferIndex < len(self.buffer) and remainingBytes != 0:
            assert remainingBytes > 0
            bufferedData = self.buffer[bufferIndex]

            if remainingBytes <= len(bufferedData) - bufferOffset:
                bytesToRead = remainingBytes
                self.position = [bufferIndex, bufferOffset + bytesToRead]
            else:
                bytesToRead = len(bufferedData) - bufferOffset
                self.position = [bufferIndex, len(bufferedData)]
                bufferIndex += 1
            # (Previously the None result of list.append was assigned to an
            # unused local ``data``.)
            rv.append(bufferedData[bufferOffset:bufferOffset + bytesToRead])
            remainingBytes -= bytesToRead
            bufferOffset = 0

        if remainingBytes:
            rv.append(self._readStream(remainingBytes))

        # The underlying stream is binary, so join as bytes (was "".join,
        # which raises TypeError when the chunks are bytes).
        return b"".join(rv)
class HTMLInputStream:
    """Provides a unicode stream of characters to the HTMLTokenizer.

    This class takes care of character encoding and removing or replacing
    incorrect byte-sequences and also provides column and line tracking.
    """

    _defaultChunkSize = 10240

    def __init__(self, source, encoding=None, parseMeta=True, chardet=True):
        """Initialises the HTMLInputStream.

        HTMLInputStream(source, [encoding]) -> Normalized stream from source
        for use by html5lib.

        source can be either a file-object, local filename or a string.

        The optional encoding parameter must be a string that indicates
        the encoding.  If specified, that encoding will be used,
        regardless of any BOM or later declaration (such as in a meta
        element)

        parseMeta - Look for a <meta> element containing encoding information
        """
        # List of where new lines occur
        self.newLines = [0]

        self.charEncoding = (codecName(encoding), "certain")

        # Raw Stream - for string objects this will encode to utf-8 and set
        # self.charEncoding as appropriate
        self.rawStream = self.openStream(source)

        # Encoding Information
        # Number of bytes to use when looking for a meta element with
        # encoding information
        self.numBytesMeta = 512
        # Number of bytes to use when detecting encoding using chardet
        self.numBytesChardet = 100
        # Encoding to use if no other information can be found
        self.defaultEncoding = "windows-1252"

        # Detect encoding iff no explicit "transport level" encoding is
        # supplied
        if self.charEncoding[0] is None:
            self.charEncoding = self.detectEncoding(parseMeta, chardet)

        self.reset()

    def reset(self):
        """(Re)initialise decoding and position state from rawStream."""
        self.dataStream = codecs.getreader(self.charEncoding[0])(
            self.rawStream, 'replace')

        self.chunk = ""
        self.chunkSize = 0
        self.chunkOffset = 0
        self.errors = []

        # Remember the current position in the document
        self.positionLine = 1
        self.positionCol = 0
        # Remember the length of the last line, so unget("\n") can restore
        # positionCol. (Only one character can be ungot at once, so we only
        # need to remember the single last line.)
        self.lastLineLength = None

        # Flag to indicate we may have a CR LF broken across a data chunk
        self._lastChunkEndsWithCR = False

    def openStream(self, source):
        """Produces a file object from source.

        source can be either a file object, local filename or a string.

        NOTE(review): despite the docstring, no branch below opens a path --
        non-file-object sources are treated as in-memory strings/bytes;
        confirm whether filename support is expected by callers.
        """
        # Already a file object
        if hasattr(source, 'read'):
            # This is wrong. We need a generic way to tell the difference
            # between file-like objects that produce strings and those that
            # produce bytes. We also need a good way to deal with the ones
            # that produce strings, in particular getting the replacement
            # characters right.
            if not hasattr(source, 'encoding'):
                stream = source
            else:
                raise NotImplementedError("Files not opened in binary mode not yet supported")
        else:
            # Otherwise treat source as a string and convert to a file object
            if isinstance(source, str):
                source = source.encode('utf-8')
                self.charEncoding = ("utf-8", "certain")
            import io
            stream = io.BytesIO(bytes(source))

        # Encoding detection needs to rewind the stream, so wrap anything
        # that cannot seek (or stdin) in a replaying buffer.
        if (not(hasattr(stream, "tell") and hasattr(stream, "seek")) or
                stream is sys.stdin):
            stream = BufferedStream(stream)

        return stream

    def detectEncoding(self, parseMeta=True, chardet=True):
        """Return an (encoding, confidence) pair for the raw stream."""
        # First look for a BOM
        # This will also read past the BOM if present
        encoding = self.detectBOM()
        confidence = "certain"
        # If there is no BOM need to look for meta elements with encoding
        # information
        if encoding is None and parseMeta:
            encoding = self.detectEncodingMeta()
            confidence = "tentative"
        # Guess with chardet, if available
        if encoding is None and chardet:
            confidence = "tentative"
            try:
                from chardet.universaldetector import UniversalDetector
                buffers = []
                detector = UniversalDetector()
                while not detector.done:
                    buffer = self.rawStream.read(self.numBytesChardet)
                    if not buffer:
                        break
                    buffers.append(buffer)
                    detector.feed(buffer)
                detector.close()
                encoding = detector.result['encoding']
                self.rawStream.seek(0)
            except ImportError:
                pass
        # If all else fails use the default encoding
        if encoding is None:
            confidence = "tentative"
            encoding = self.defaultEncoding

        # Substitute for equivalent encodings:
        encodingSub = {"iso-8859-1": "windows-1252"}

        if encoding.lower() in encodingSub:
            encoding = encodingSub[encoding.lower()]

        return encoding, confidence

    def changeEncoding(self, newEncoding):
        """Switch to newEncoding mid-parse, raising ReparseException if the
        document has to be re-decoded from the start."""
        # Capture the encoding we are switching away from before it is
        # overwritten, so the ReparseException message can report both
        # sides correctly (previously both %s showed the new encoding).
        oldEncoding = self.charEncoding[0]
        newEncoding = codecName(newEncoding)
        if newEncoding in ("utf-16", "utf-16-be", "utf-16-le"):
            newEncoding = "utf-8"
        if newEncoding is None:
            return
        elif newEncoding == oldEncoding:
            # BUG FIX: confidence was misspelled "certian", so callers
            # comparing against "certain" never saw it.
            self.charEncoding = (oldEncoding, "certain")
        else:
            self.rawStream.seek(0)
            # Update the encoding BEFORE reset(), so the new dataStream is
            # built with the new codec (previously reset() ran first and
            # rebuilt the reader with the old encoding).
            self.charEncoding = (newEncoding, "certain")
            self.reset()
            raise ReparseException(
                "Encoding changed from %s to %s" % (oldEncoding, newEncoding))

    def detectBOM(self):
        """Attempts to detect at BOM at the start of the stream. If
        an encoding can be determined from the BOM return the name of the
        encoding otherwise return None"""
        bomDict = {
            codecs.BOM_UTF8: 'utf-8',
            codecs.BOM_UTF16_LE: 'utf-16-le', codecs.BOM_UTF16_BE: 'utf-16-be',
            codecs.BOM_UTF32_LE: 'utf-32-le', codecs.BOM_UTF32_BE: 'utf-32-be'
        }

        # Go to beginning of file and read in 4 bytes
        string = self.rawStream.read(4)

        # Try detecting the BOM using bytes from the string
        encoding = bomDict.get(string[:3])  # UTF-8
        seek = 3
        if not encoding:
            # Need to detect UTF-32 before UTF-16
            encoding = bomDict.get(string)  # UTF-32
            seek = 4
            if not encoding:
                encoding = bomDict.get(string[:2])  # UTF-16
                seek = 2

        # Set the read position past the BOM if one was found, otherwise
        # set it to the start of the stream
        self.rawStream.seek(encoding and seek or 0)

        return encoding

    def detectEncodingMeta(self):
        """Report the encoding declared by the meta element"""
        buffer = self.rawStream.read(self.numBytesMeta)
        parser = EncodingParser(buffer)
        self.rawStream.seek(0)
        encoding = parser.getEncoding()

        # UTF-16 declared in-document is impossible (the prescan decoded it
        # as ASCII-compatible bytes); treat it as UTF-8 per the spec.
        if encoding in ("utf-16", "utf-16-be", "utf-16-le"):
            encoding = "utf-8"

        return encoding

    def updatePosition(self, chars):
        """Advance the (line, col) position over the read string ``chars``."""
        # Find the last newline character
        idx = chars.rfind("\n")
        if idx == -1:
            # No newlines in chars
            self.positionCol += len(chars)
        else:
            # Find the last-but-one newline character
            idx2 = chars.rfind("\n", 0, idx)
            if idx2 == -1:
                # Only one newline in chars
                self.positionLine += 1
                self.lastLineLength = self.positionCol + idx
                self.positionCol = len(chars) - (idx + 1)
            else:
                # At least two newlines in chars
                newlines = chars.count("\n")
                self.positionLine += newlines
                self.lastLineLength = idx - (idx2 + 1)
                self.positionCol = len(chars) - (idx + 1)

    def position(self):
        """Returns (line, col) of the current position in the stream."""
        return (self.positionLine, self.positionCol)

    def char(self):
        """Read one character from the stream or queue if available. Return
        EOF when EOF is reached.
        """
        # Read a new chunk from the input stream if necessary
        if self.chunkOffset >= self.chunkSize:
            if not self.readChunk():
                return EOF

        char = self.chunk[self.chunkOffset]
        self.chunkOffset += 1

        # Update the position attributes
        if char == "\n":
            self.lastLineLength = self.positionCol
            self.positionCol = 0
            self.positionLine += 1
        elif char is not EOF:
            self.positionCol += 1

        return char

    def readChunk(self, chunkSize=_defaultChunkSize):
        """Refill self.chunk from dataStream; return True iff data was read."""
        self.chunk = ""
        self.chunkSize = 0
        self.chunkOffset = 0

        data = self.dataStream.read(chunkSize)

        if not data:
            return False

        # Record an error per NUL / invalid code point, then replace NULs
        # with U+FFFD (invalid code points are only reported, not replaced).
        for i in range(data.count("\u0000")):
            self.errors.append("null-character")
        for i in range(len(invalid_unicode_re.findall(data))):
            self.errors.append("invalid-codepoint")
        data = data.replace("\u0000", "\ufffd")

        # Check for CR LF broken across chunks
        if (self._lastChunkEndsWithCR and data[0] == "\n"):
            data = data[1:]
            # Stop if the chunk is now empty
            if not data:
                return False
        self._lastChunkEndsWithCR = data[-1] == "\r"
        data = data.replace("\r\n", "\n")
        data = data.replace("\r", "\n")

        self.chunk = data
        self.chunkSize = len(data)

        return True

    def charsUntil(self, characters, opposite=False):
        """Returns a string of characters from the stream up to but not
        including any character in 'characters' or EOF. 'characters' must be
        a container that supports the 'in' method and iteration over its
        characters.
        """
        # Use a cache of regexps to find the required characters
        try:
            chars = charsUntilRegEx[(characters, opposite)]
        except KeyError:
            for c in characters:
                assert(ord(c) < 128)
            regex = "".join(["\\x%02x" % ord(c) for c in characters])
            if not opposite:
                # Negated character class: match runs NOT in `characters`.
                regex = "^%s" % regex
            chars = charsUntilRegEx[(characters, opposite)] = re.compile("[%s]+" % regex)

        rv = []

        while True:
            # Find the longest matching prefix
            m = chars.match(self.chunk, self.chunkOffset)
            if m is None:
                # If nothing matched, and it wasn't because we ran out of
                # chunk, then stop
                if self.chunkOffset != self.chunkSize:
                    break
            else:
                end = m.end()
                # If not the whole chunk matched, return everything
                # up to the part that didn't match
                if end != self.chunkSize:
                    rv.append(self.chunk[self.chunkOffset:end])
                    self.chunkOffset = end
                    break
            # If the whole remainder of the chunk matched,
            # use it all and read the next chunk
            rv.append(self.chunk[self.chunkOffset:])
            if not self.readChunk():
                # Reached EOF
                break

        r = "".join(rv)
        self.updatePosition(r)
        return r

    def unget(self, char):
        # Only one character is allowed to be ungotten at once - it must
        # be consumed again before any further call to unget
        if char is not None:
            if self.chunkOffset == 0:
                # unget is called quite rarely, so it's a good idea to do
                # more work here if it saves a bit of work in the frequently
                # called char and charsUntil.  So, just prepend the ungotten
                # character onto the current chunk:
                self.chunk = char + self.chunk
                self.chunkSize += 1
            else:
                self.chunkOffset -= 1
                assert self.chunk[self.chunkOffset] == char

            # Update the position attributes
            if char == "\n":
                assert self.positionLine >= 1
                assert self.lastLineLength is not None
                self.positionLine -= 1
                self.positionCol = self.lastLineLength
                self.lastLineLength = None
            else:
                self.positionCol -= 1
class EncodingBytes(bytes):
    """Bytes-like object with an associated position and various extra methods

    If the position is ever greater than the string length then an exception
    is raised"""

    def __new__(cls, value):
        return bytes.__new__(cls, value)

    def __init__(self, value):
        # Index of the "current" byte; -1 means iteration has not started.
        self._position = -1

    def __iter__(self):
        return self

    def __next__(self):
        self._position += 1
        # Return the current byte as a length-1 bytes object.  On Python 3
        # plain indexing (self[pos]) yields an int, which would never compare
        # equal to the b"..." literals used throughout the pre-parser;
        # slicing preserves the bytes type.  The `position` property raises
        # StopIteration once we run past the end.
        rv = self[self.position:self.position + 1]
        return rv

    def setPosition(self, position):
        if self._position >= len(self):
            raise StopIteration
        self._position = position

    def getPosition(self):
        if self._position >= len(self):
            raise StopIteration
        if self._position >= 0:
            return self._position
        else:
            return None

    position = property(getPosition, setPosition)

    def getCurrentByte(self):
        # Slice rather than index so the result is bytes, not int (see
        # __next__ for why).
        return self[self.position:self.position + 1]

    currentByte = property(getCurrentByte)

    def skip(self, chars=None):
        """Skip past a list of characters (defaults to the module's
        spaceCharactersBytes)."""
        # The default is resolved at call time rather than bound at class
        # creation, so the module constant is always the one looked up.
        if chars is None:
            chars = spaceCharactersBytes
        while self.currentByte in chars:
            self.position += 1

    def matchBytes(self, bytes, lower=False):
        """Look for a sequence of bytes at the start of a string. If the bytes
        are found return True and advance the position to the byte after the
        match. Otherwise return False and leave the position alone"""
        data = self[self.position:self.position + len(bytes)]
        if lower:
            data = data.lower()
        rv = data.startswith(bytes)
        if rv:
            self.position += len(bytes)
        return rv

    def jumpTo(self, bytes):
        """Look for the next sequence of bytes matching a given sequence. If
        a match is found advance the position to the last byte of the match"""
        newPosition = self[self.position:].find(bytes)
        if newPosition > -1:
            self._position += (newPosition + len(bytes) - 1)
            return True
        else:
            raise StopIteration

    def findNext(self, byteList):
        """Move the pointer so it points to the next byte in a set of possible
        bytes"""
        while self.currentByte not in byteList:
            self.position += 1
class EncodingParser(object):
    """Mini parser for detecting character encoding from meta elements"""

    def __init__(self, data):
        """data - the data to work on for encoding detection"""
        self.data = EncodingBytes(data)
        self.encoding = None

    def getEncoding(self):
        """Scan the byte prefix for markup that declares an encoding; return
        the declared codec name or None."""
        # Byte-prefix -> handler dispatch; order matters (comments before
        # "<!", "</" before "<").
        methodDispatch = (
            (b"<!--", self.handleComment),
            (b"<meta", self.handleMeta),
            (b"</", self.handlePossibleEndTag),
            (b"<!", self.handleOther),
            (b"<?", self.handleOther),
            (b"<", self.handlePossibleStartTag))
        for byte in self.data:
            keepParsing = True
            for key, method in methodDispatch:
                if self.data.matchBytes(key):
                    try:
                        keepParsing = method()
                        break
                    except StopIteration:
                        # Ran off the end of the buffer mid-construct.
                        keepParsing = False
                        break
            if not keepParsing:
                break

        return self.encoding

    def handleComment(self):
        """Skip over comments"""
        return self.data.jumpTo(b"-->")

    def handleMeta(self):
        if self.data.currentByte not in spaceCharactersBytes:
            # if we have <meta not followed by a space so just keep going
            return True
        # We have a valid meta element we want to search for attributes
        while True:
            # Try to find the next attribute after the current position
            attr = self.getAttribute()
            if attr is None:
                return True
            else:
                if attr[0] == b"charset":
                    tentativeEncoding = attr[1]
                    codec = codecName(tentativeEncoding)
                    if codec is not None:
                        self.encoding = codec
                        return False
                elif attr[0] == b"content":
                    contentParser = ContentAttrParser(EncodingBytes(attr[1]))
                    tentativeEncoding = contentParser.parse()
                    if tentativeEncoding is not None:
                        codec = codecName(tentativeEncoding)
                        if codec is not None:
                            self.encoding = codec
                            return False

    def handlePossibleStartTag(self):
        return self.handlePossibleTag(False)

    def handlePossibleEndTag(self):
        next(self.data)
        return self.handlePossibleTag(True)

    def handlePossibleTag(self, endTag):
        if self.data.currentByte not in asciiLettersBytes:
            # If the next byte is not an ascii letter either ignore this
            # fragment (possible start tag case) or treat it according to
            # handleOther
            if endTag:
                self.data.position -= 1
                self.handleOther()
            return True

        self.data.findNext(list(spaceCharactersBytes) + [b"<", b">"])
        if self.data.currentByte == b"<":
            # return to the first step in the overall "two step" algorithm
            # reprocessing the < byte
            self.data.position -= 1
        else:
            # Read all attributes
            attr = self.getAttribute()
            while attr is not None:
                attr = self.getAttribute()
        return True

    def handleOther(self):
        return self.data.jumpTo(b">")

    def getAttribute(self):
        """Return a name,value pair for the next attribute in the stream,
        if one is found, or None"""
        self.data.skip(list(spaceCharactersBytes) + [b"/"])
        if self.data.currentByte == b"<":
            self.data.position -= 1
            return None
        elif self.data.currentByte == b">":
            return None
        attrName = []
        attrValue = []
        spaceFound = False
        # Step 5 attribute name
        while True:
            if self.data.currentByte == b"=" and attrName:
                break
            elif self.data.currentByte in spaceCharactersBytes:
                spaceFound = True
                break
            elif self.data.currentByte in (b"/", b"<", b">"):
                # All returns are bytes pairs; previously some branches
                # returned str "" values mixed with bytes names.
                return b"".join(attrName), b""
            elif self.data.currentByte in asciiUppercaseBytes:
                # append, not extend: extending with bytes would add ints.
                attrName.append(self.data.currentByte.lower())
            else:
                attrName.append(self.data.currentByte)
            # Step 6
            self.data.position += 1
        # Step 7
        if spaceFound:
            self.data.skip()
            # Step 8
            if self.data.currentByte != b"=":
                self.data.position -= 1
                return b"".join(attrName), b""
        # XXX need to advance position in both spaces and value case
        # Step 9
        self.data.position += 1
        # Step 10
        self.data.skip()
        # Step 11
        if self.data.currentByte in (b"'", b'"'):
            # 11.1
            quoteChar = self.data.currentByte
            while True:
                self.data.position += 1
                # 11.3
                if self.data.currentByte == quoteChar:
                    self.data.position += 1
                    return b"".join(attrName), b"".join(attrValue)
                # 11.4
                elif self.data.currentByte in asciiUppercaseBytes:
                    attrValue.append(self.data.currentByte.lower())
                # 11.5
                else:
                    attrValue.append(self.data.currentByte)
        elif self.data.currentByte in (b">", b"<"):
            return b"".join(attrName), b""
        elif self.data.currentByte in asciiUppercaseBytes:
            attrValue.append(self.data.currentByte.lower())
        else:
            attrValue.append(self.data.currentByte)
        while True:
            self.data.position += 1
            if self.data.currentByte in (
                    list(spaceCharactersBytes) + [b">", b"<"]):
                return b"".join(attrName), b"".join(attrValue)
            elif self.data.currentByte in asciiUppercaseBytes:
                attrValue.append(self.data.currentByte.lower())
            else:
                attrValue.append(self.data.currentByte)


class ContentAttrParser(object):
    """Extract the charset=... value from a meta content attribute."""

    def __init__(self, data):
        self.data = data

    def parse(self):
        try:
            # Skip to the first ";"
            self.data.jumpTo(b";")
            self.data.position += 1
            self.data.skip()
            # Check if the attr name is charset
            # otherwise return
            self.data.jumpTo(b"charset")
            self.data.position += 1
            self.data.skip()
            if not self.data.currentByte == b"=":
                # If there is no = sign keep looking for attrs
                return None
            self.data.position += 1
            self.data.skip()
            # Look for an encoding between matching quote marks
            if self.data.currentByte in (b'"', b"'"):
                quoteMark = self.data.currentByte
                self.data.position += 1
                oldPosition = self.data.position
                self.data.jumpTo(quoteMark)
                return self.data[oldPosition:self.data.position]
            else:
                # Unquoted value
                oldPosition = self.data.position
                try:
                    self.data.findNext(spaceCharactersBytes)
                    return self.data[oldPosition:self.data.position]
                except StopIteration:
                    # Return the whole remaining value
                    return self.data[oldPosition:]
        except StopIteration:
            return None
def codecName(encoding):
    """Return the python codec name corresponding to an encoding or None if
    the string doesn't correspond to a valid encoding."""
    if isinstance(encoding, bytes):
        # The declared encoding comes from (untrusted) document bytes.
        # Decode defensively: a non-ASCII byte previously raised an
        # uncaught UnicodeDecodeError here; treat it as "no valid encoding".
        try:
            encoding = encoding.decode("ascii")
        except UnicodeDecodeError:
            return None
    if (encoding is not None) and isinstance(encoding, str):
        # Strip ASCII whitespace/punctuation and lowercase before looking
        # the name up in the known-encodings table.
        canonicalName = ascii_punctuation_re.sub("", encoding).lower()
        return encodings.get(canonicalName, None)
    else:
        return None