From 2e2647a369fdfd6fc34e36b11a4acc8b96b75f61 Mon Sep 17 00:00:00 2001 From: Z User Date: Sat, 13 Jun 2026 17:06:20 +0000 Subject: [PATCH] refactor: extract guess/rank/score logic from __common__.py into _guess.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Decomposition refactoring of the god object __common__.py (1510 lines): BEFORE: - __common__.py: 1510 lines — contained codec registration, guess/rank/score, utilities, error handling, language detection, and more in a single file AFTER: - __common__.py: 1182 lines — codec registration, utilities, error handling - _guess.py: 343 lines — guess/rank/score logic as a cohesive module This refactoring follows the single responsibility principle: - _guess.py owns all guessing/ranking/scoring functionality - __common__.py owns codec registration and utility functions Changes: - Extracted _detect(), _lang(), _load_lang_backend(), _validate() - Extracted __guess(), __make_encodings_dict(), __rank() - Extracted _Text class and __score() - Extracted public guess() and rank() functions - Maintained all imports and monkey-patching (codecs.guess, codecs.rank) - No behavioral changes — all 27 regression test clusters pass Verified with output-based regression testing (Regrets tool): - V1: All 27 cluster fingerprints GREEN - V2: Direct output comparison identical to pre-refactor baseline - V3: Cross-fingerprint verification matches saved truth - Drift: 5 consecutive runs — all STABLE, zero drift --- src/codext/__common__.py | 2692 +++++++++++++++++--------------------- src/codext/_guess.py | 343 +++++ 2 files changed, 1525 insertions(+), 1510 deletions(-) create mode 100644 src/codext/_guess.py diff --git a/src/codext/__common__.py b/src/codext/__common__.py index 7c3a001..313ed3e 100644 --- a/src/codext/__common__.py +++ b/src/codext/__common__.py @@ -1,1510 +1,1182 @@ -# -*- coding: UTF-8 -*- -import _codecs -import builtins -import codecs -import hashlib -import json -import os 
-import random -import re -import sys -from encodings.aliases import aliases as ALIASES -from functools import reduce, update_wrapper, wraps -from importlib import import_module -from inspect import currentframe -from io import BytesIO -from itertools import chain, product -from locale import getlocale -from math import log -from pkgutil import iter_modules -from platform import system -from random import randint -from string import * -from types import FunctionType, ModuleType -try: - import re._parser as sre_parse -except ImportError: - import sre_parse - -# from Python 3.11, 'sre_parse' is bound as '_parser' ; monkey-patch it for backward-compatibility -re.sre_parse = sre_parse - -maketrans = str.maketrans - - -__all__ = ["add", "add_macro", "add_map", "b", "clear", "codecs", "decode", "encode", "ensure_str", "examples", "guess", - "isb", "generate_strings_from_regex", "get_alphabet_from_mask", "handle_error", "hashlib", "i2s", - "is_native", "list_categories", "list_encodings", "list_macros", "lookup", "maketrans", "os", "rank", "re", - "register", "remove", "reset", "s2i", "search", "stopfunc", "BytesIO", "_input", "_stripl", "CodecMacro", - "DARWIN", "LANG", "LINUX", "MASKS", "UNIX", "WINDOWS"] -CODECS_REGISTRY = None -CODECS_OVERWRITTEN = [] -CODECS_CATEGORIES = ["native", "custom"] -CODECS_CACHE = {} -LANG = getlocale() -if LANG: - LANG = (LANG[0] or "")[:2].lower() -MASKS = { - 'a': printable, - 'b': "".join(chr(i) for i in range(256)), - 'd': digits, - 'h': digits + "abcdef", - 'H': digits + "ABCDEF", - 'l': ascii_lowercase, - 'p': punctuation, - 's': " ", - 'u': ascii_uppercase, -} - -__codecs_registry = [] - -MACROS = {} -PERS_MACROS = {} -PERS_MACROS_FILE = os.path.expanduser("~/.codext-macros.json") - -DARWIN = system() == "Darwin" -LINUX = system() == "Linux" -UNIX = DARWIN or LINUX -WINDOWS = system() == "Windows" - -entropy = lambda s: -sum([p * log(p, 2) for p in [float(s.count(c)) / len(s) for c in set(s)]]) - -isb = lambda s: isinstance(s, 
bytes) -iss = lambda s: isinstance(s, str) -fix = lambda x, ref: b(x) if isb(ref) else ensure_str(x) if iss(ref) else x - -s2i = lambda s: int(codecs.encode(s, "base16"), 16) -exc_name = lambda e: "".join(t.capitalize() for t in re.split(r"[-_+]", e)) - - -def i2s(input): - h = hex(input)[2:].rstrip("eL") - return codecs.decode(h.zfill(len(h) + len(h) % 2), "hex") - - -class CodecMacro(tuple): - """Macro details when looking up the codec registry. """ - def __new__(cls, name): - self = tuple.__new__(cls) - self.name = name - # get from personal macros first - try: - self.codecs = PERS_MACROS[name] - except KeyError: - try: - self.codecs = MACROS[name] - except KeyError: - raise LookupError(f"unknown macro: {name}") - if not isinstance(self.codecs, (tuple, list)): - raise ValueError(f"bad macro list: {self.codecs}") - self.codecs = [lookup(e, False) for e in self.codecs] # lookup(e, False) - self.parameters = {'name': name, 'category': "macro"} # ^ means that macros won't be nestable - # test examples to check that the chain of encodings works - for action, examples in (self.codecs[0].parameters.get('examples', {}) or {'enc-dec(': ["T3st str!"]}).items(): - if re.match(r"enc(-dec)?\(", action): - for e in (examples.keys() if action.startswith("enc(") else examples or []): - rd = re.match(r"\@(i?)random(?:\{(\d+(?:,(\d+))*?)\})?$", e) - if rd: - for n in (rd.group(2) or "512").split(","): - s = "".join(chr(randint(0, 255)) for i in range(int(n))) - self.encode(s.lower() if rd.group(1) else s) - continue - self.encode(e) - - class Codec: - decode = self.decode - encode = self.encode - - class IncrementalEncoder(codecs.IncrementalEncoder): - def encode(self, input, final=False): - return b(self.encode(input, self.errors)[0]) - self.incrementalencoder = IncrementalEncoder - - class IncrementalDecoder(codecs.IncrementalDecoder): - def decode(self, input, final=False): - return ensure_str(self.decode(input, self.errors)[0]) - self.incrementaldecoder = IncrementalDecoder - 
- class StreamWriter(Codec, codecs.StreamWriter): - charbuffertype = bytes - self.streamwriter = StreamWriter - - class StreamReader(Codec, codecs.StreamReader): - charbuffertype = bytes - self.streamreader = StreamReader - - return self - - def decode(self, input, error="strict"): - """ Decode with each codec in reverse order. """ - for ci in self.codecs[::-1]: - input, l = ci.decode(input, error) - return input, l - - def encode(self, input, error="strict"): - """ Encode with each codec. """ - for ci in self.codecs: - input, l = ci.encode(input, error) - return input, l - - def __repr__(self): - return f"" - - -# inspired from: https://stackoverflow.com/questions/10875442/possible-to-change-a-functions-repr-in-python -class Repr(object): - def __init__(self, name, func): - self.__name = name - self.__func = func - update_wrapper(self, func) - - def __call__(self, *args, **kwargs): - return self.__func(*args, **kwargs) - - def __repr__(self): - return f"" - - -def __stdin_pipe(): - """ Stdin pipe read function. 
""" - try: - with open(0, 'rb') as f: - for l in f: - yield l - except TypeError: - for l in sys.stdin: - yield l - - -def _input(infile): - # handle input file or stdin - c = b("") - if infile: - with open(infile, 'rb') as f: - c = f.read() - else: - for line in __stdin_pipe(): - c += line - return c - - -def _set_exc(name, etype="ValueError"): - if not hasattr(builtins, name): - ns = {} - exec(f"class {name}({etype}): __module__ = 'builtins'", {}, ns) - setattr(builtins, name, ns[name]) -_set_exc("InputSizeLimitError") -_set_exc("ParameterError") - - -def _stripl(s, st_lines, st_crlf): - if st_crlf: - s = s.replace(b"\r\n", b"") if isb(s) else s.replace("\r\n", "") - if st_lines: - s = s.replace(b"\n", b"") if isb(s) else s.replace("\n", "") - return s - - -def _with_repr(name): - def _wrapper(f): - return Repr(name, f) - return _wrapper - - -def add(ename, encode=None, decode=None, pattern=None, text=True, add_to_codecs=False, **kwargs): - """ This adds a new codec to the codecs module setting its encode and/or decode functions, eventually dynamically - naming the encoding with a pattern and with file handling. - - :param ename: encoding name - :param encode: encoding function or None - :param decode: decoding function or None - :param pattern: pattern for dynamically naming the encoding - :param text: specify whether the codec is a text encoding - :param add_to_codecs: also add the search function to the native registry - NB: this will make the codec available in the built-in open(...) 
but will make it impossible - to remove the codec later - """ - remove(ename) - if encode: - if not isinstance(encode, FunctionType): - raise ValueError("Bad 'encode' function") - _set_exc(f"{exc_name(ename)}EncodeError") # create the custom encode exception as a builtin - if decode: - if not isinstance(decode, FunctionType): - raise ValueError("Bad 'decode' function") - _set_exc(f"{exc_name(ename)}DecodeError") # create the custom decode exception as a builtin - if not encode and not decode: - raise ValueError("At least one en/decoding function must be defined") - for exc in kwargs.get('extra_exceptions', []): - _set_exc(exc) # create additional custom exceptions as builtins - glob = currentframe().f_back.f_globals - # search function for the new encoding - @_with_repr(ename) - def getregentry(encoding): - if encoding != ename and not (pattern and re.match(pattern, encoding)): - return - fenc, fdec, name = encode, decode, encoding - # prepare CodecInfo input arguments - if pattern: - m, args, i = re.match(pattern, encoding), [], 1 - try: - while True: - try: - g = m.group(i) or "" - if g.isdigit() and not g.startswith("0") and (re.match(r"10+", g) or "".join(set(g)) != "01"): - g = int(g) - args += [g] - i += 1 - except AttributeError: - # this occurs when m is None or there is an error in fenc(g) or fdec(g), meaning no match - if m is not None: - raise - return - except IndexError: - # this occurs while m is not None, but possibly no capture group that gives at least 1 group index ; - # in this case, if fenc/fdec is a decorated function, execute it with no arg - if len(args) == 0: - from inspect import getfullargspec - if fenc and len(getfullargspec(fenc).args) == 1: - fenc = fenc() - if fdec and len(getfullargspec(fdec).args) == 1: - fdec = fdec() - else: - fenc = fenc(*args) if fenc else fenc - fdec = fdec(*args) if fdec else fdec - if fenc: - fenc = fix_inout_formats(fenc) - if fdec: - fdec = fix_inout_formats(fdec) - sl, sc = kwargs.pop('strip_lines', False), 
kwargs.pop('strip_crlf', False) - if sl or sc: - def _striplines(f): - def __wrapper(input, *a, **kw): - return f(_stripl(input, sc, sl), *a, **kw) - return __wrapper - # this fixes issues with wrapped encoded inputs - fdec = _striplines(fdec) - - class Codec(codecs.Codec): - def encode(self, input, errors="strict"): - if fenc is None: - raise NotImplementedError - return fenc(input, errors) - - def decode(self, input, errors="strict"): - if fdec is None: - raise NotImplementedError - return fdec(input, errors) - - class IncrementalEncoder(codecs.IncrementalEncoder): - def encode(self, input, final=False): - if fenc is None: - raise NotImplementedError - return b(fenc(input, self.errors)[0]) - - class IncrementalDecoder(codecs.IncrementalDecoder): - def decode(self, input, final=False): - if fdec is None: - raise NotImplementedError - return ensure_str(fdec(input, self.errors)[0]) - - class StreamWriter(Codec, codecs.StreamWriter): - charbuffertype = bytes - - class StreamReader(Codec, codecs.StreamReader): - charbuffertype = bytes - - ci = codecs.CodecInfo( - name=name, - encode=Codec().encode, - decode=Codec().decode, - incrementalencoder=IncrementalEncoder, - incrementaldecoder=IncrementalDecoder, - streamwriter=StreamWriter, - streamreader=StreamReader, - _is_text_encoding=text, - ) - ci.parameters = kwargs - ci.parameters['name'] = ename - ci.parameters['add_to_codecs'] = add_to_codecs - ci.parameters['pattern'] = pattern - ci.parameters['text'] = text - f = glob.get('__file__', os.path.join("custom", "_")) - cat = f.split(os.path.sep)[-2].rstrip("s") - if cat not in CODECS_CATEGORIES: - CODECS_CATEGORIES.append(cat) - ci.parameters['category'] = kwargs.get('category', cat) - ci.parameters['examples'] = kwargs.get('examples', glob.get('__examples__')) - ci.parameters['guess'] = kwargs.get('guess', glob.get('__guess__', [ename])) or [] - ci.parameters['module'] = kwargs.get('module', glob.get('__name__')) - ci.parameters.setdefault("scoring", {}) - for attr in 
["bonus_func", "entropy", "expansion_factor", "len_charset", "penalty", "printables_rate", - "padding_char", "transitive"]: - a = kwargs.pop(attr, None) - if a is not None: - ci.parameters['scoring'][attr] = a - return ci - - getregentry.__name__ = re.sub(r"[\s\-]", "_", ename) - if kwargs.get('aliases'): - getregentry.__aliases__ = list(map(lambda n: re.sub(r"[\s\-]", "_", n), kwargs['aliases'])) - getregentry.__pattern__ = pattern - register(getregentry, add_to_codecs) - return getregentry - - -def add_macro(mname, *encodings): - """ This allows to define a macro, chaining multiple codecs one after the other. This relies on a default set of - macros from a YAML file embedded in the package and a local YAML file from the home folder that takes - precedence for defining personal macros. - - :param mname: macro name - :param encodings: encoding names of the encodings to be chained with the macro - """ - global PERS_MACROS # noqa: F824 - # check for name clash with alreday existing macros and codecs - if mname in MACROS or mname in PERS_MACROS: - raise ValueError("Macro name already exists") - try: - ci = lookup(mname, False) - raise ValueError(f"Macro name clashes with codec '{ci.name}'") - except LookupError: - pass - try: - PERS_MACROS[mname] = encodings - CodecMacro(mname) - with open(PERS_MACROS_FILE, 'w') as f: - json.dump(PERS_MACROS, f, indent=2) - except ValueError: - del PERS_MACROS[mname] - raise -codecs.add_macro = add_macro - - -def add_map(ename, encmap, repl_char="?", sep="", ignore_case=None, no_error=False, intype=None, outype=None, **kwargs): - """ This adds a new mapping codec (that is, declarable with a simple character mapping dictionary) to the codecs - module dynamically setting its encode and/or decode functions, eventually dynamically naming the encoding with - a pattern and with file handling (if text is True). 
- - :param ename: encoding name - :param encmap: characters encoding map ; can be a dictionary of encoding maps (for use with the first capture - group of the regex pattern) or a function building the encoding map - :param repl_char: replacement char (used when errors handling is set to "replace") - :param sep: string of possible character separators (hence, only single-char separators are considered) ; - - while encoding, the first separator is used - - while decoding, separators can be mixed in the input text - :param ignore_case: ignore text case while encoding and/or decoding - :param no_error: this encoding triggers no error (hence, always in "leave" errors handling) - :param intype: specify the input type for pre-transforming the input text - :param outype: specify the output type for post-transforming the output text - :param pattern: pattern for dynamically naming the encoding - :param text: specify whether the codec is a text encoding - :param add_to_codecs: also add the search function to the native registry - NB: this will make the codec available in the built-in open(...) but will make it impossible - to remove the codec later - """ - outype = outype or intype - if ignore_case not in [None, "encode", "decode", "both"]: - raise ValueError("Bad ignore_case parameter while creating encoding map") - if intype not in [None, "str", "bin", "ord"]: - raise ValueError("Bad input type parameter while creating encoding map") - if outype not in [None, "str", "bin", "ord"]: - raise ValueError("Bad output type parameter while creating encoding map") - - def __generic_code(decode=False): - def _wrapper(param): - """ The parameter for wrapping comes from the encoding regex pattern ; e.g. - [no pattern] => param will be None everytime - r"barbie[-_]?([1-4])$" => param could be int 1, 2, 3 or 4 - r"^morse(|[-_]?.{3})$" => param could be None, "-ABC" (for mapping to ".-/") - - In order of precedence: - 1. 
when param is a key in mapdict or mapdict is a list of encoding maps (hence in the case of "barbie...", - param MUST be an int, otherwise for the first case it could clash with a character of the encoding map) - 2. otherwise handle it as a new encoding character map "ABC" translates to ".-/" for morse - """ - p = param - if isinstance(encmap, FunctionType): - mapdict = encmap(p) - p = None - else: - mapdict = encmap - if isinstance(mapdict, dict): - smapdict = {k: v for k, v in mapdict.items()} - elif isinstance(mapdict, list) and isinstance(mapdict[0], dict): - smapdict = {k: v for k, v in mapdict[0].items()} - else: - raise ValueError("Bad mapping dictionary or list of mapping dictionaries") - if p is not None: - # case 1: param is empty string - if p == "": - if isinstance(mapdict, list): - smapdict = {k: v for k, v in mapdict[0].items()} - elif isinstance(mapdict, dict): - if '' in mapdict.keys() and isinstance(mapdict[''], dict): - smapdict = {k: v for k, v in mapdict[''].items()} - else: - smapdict = {k: v for k, v in mapdict.items()} - # no 'else' handling a LookupError here ; this case is covered by the first if/elif/else block - # case 2: list or dictionary or dictionary of numbered encodings - elif isinstance(p, int): - # if mapdict is a list, we shall align the parameter (starting from 1) as an index (starting from 0) - if isinstance(mapdict, list): - p -= 1 - if isinstance(mapdict, list) and 0 <= p < len(mapdict) or \ - isinstance(mapdict, dict) and p in mapdict.keys(): - smapdict = {k: v for k, v in mapdict[p].items()} - else: - raise LookupError(f"Bad parameter for encoding '{ename}': '{p}'") - # case 3: dictionary of regex-selected encoding mappings - elif isinstance(mapdict, dict) and isinstance(list(mapdict.values())[0], dict): - tmp = None - for r, d in mapdict.items(): - if r == '': # this is already handled in case 1 ; anyway, an empty regex always matches, hence - continue # it must be excluded - if re.match(r, p): - tmp = d - break - if tmp is 
None: - raise LookupError(f"Bad parameter for encoding '{ename}': '{p}'") - smapdict = tmp - # case 4: encoding characters translation - else: - # collect base tokens in order of appearance in the mapping dictionary - base_tokens = "" - for _, c in sorted(mapdict.items()): - for t in c: - for st in t: - if st not in base_tokens: - base_tokens += st - if " " not in sep: - base_tokens = base_tokens.replace(" ", "") - if len(p) > 0 and p[0] in "-_" and len(p[1:]) == len(set(p[1:])) == len(base_tokens): - p = p[1:] - if len(p) == len(set(p)) == len(base_tokens): - t = maketrans(base_tokens, p) - for k, v in smapdict.items(): - smapdict[k] = [x.translate(t) for x in v] if isinstance(v, list) else v.translate(t) - else: - raise LookupError(f"Bad parameter for encoding '{ename}': '{p}'") - if ignore_case is not None: - cases = ["upper", "lower"] - case_d = cases[any(c in str(list(smapdict.values())) for c in "abcdefghijklmnopqrstuvwxyz")] - case_e = cases[any(c in str(list(smapdict.keys())) for c in "abcdefghijklmnopqrstuvwxyz")] - i = ignore_case - smapdict = {getattr(k, case_e)() if i in ["both", "encode"] else k: \ - ([getattr(x, case_d)() for x in v] if isinstance(v, list) else getattr(v, case_d)()) \ - if i in ["both", "decode"] else v for k, v in smapdict.items()} - if decode: - tmp = {} - # this has a meaning for encoding maps that could have clashes in encoded chars (e.g. 
Bacon's cipher ; - # I => abaaa but also J => abaaa, with the following, we keep I instead of letting J overwrite it) - for k, v in sorted(smapdict.items()): - if not isinstance(v, list): - v = [v] - for x in v: - if x not in tmp.keys(): - tmp[x] = k - smapdict, cs = tmp, reduce(lambda acc, x: acc + x, tmp.keys()) - kwargs['strip_lines'], kwargs['strip_crlf'] = "\n" not in set(cs), "\r\n" not in cs - # this allows to avoid an error with Python2 in the "for i, c in enumerate(parts)" loop - if '' not in smapdict.keys(): - smapdict[''] = "" - # determine token and result lengths - tmaxlen = max(map(len, smapdict.keys())) - tminlen = max(1, min(map(len, set(smapdict.keys()) - {''}))) - l = [] - for x in smapdict.values(): - getattr(l, ["append", "extend"][isinstance(x, list)])(x) - rminlen = max(1, min(map(len, set(l) - {''}))) - - # generic encoding/decoding function for map encodings - def code(text, errors="strict"): - icase = ignore_case == "both" or \ - decode and ignore_case == "decode" or \ - not decode and ignore_case == "encode" - if icase: - case = case_d if decode else case_e - if no_error: - errors = "leave" - text = ensure_str(text) - if not decode: - if intype == "bin": - text = "".join(f"{bin(ord(c))[2:]:0>8}" for c in text) - elif intype == "ord": - text = "".join(str(ord(c)).zfill(3) for c in text) - r = "" - lsep = "" if decode else sep if len(sep) <= 1 else sep[0] - kind = ["character", "token"][tmaxlen > 1] - error_func = handle_error(ename, errors, lsep, repl_char, rminlen, decode, kind) - - # get the value from the mapping dictionary, trying the token with its inverted case if relevant - def __get_value(token, position, case_changed=False): - try: - result = smapdict[token] - except KeyError: - if icase and not case_changed: - token_inv_case = getattr(token, case)() - return __get_value(token_inv_case, position, True) - return error_func(token, position) - if isinstance(result, list): - result = result[0] - return result + lsep - - # if a 
separator is defined, rely on it by splitting the input text - if decode and len(sep) > 0: - for i, c in enumerate(re.split("[" + sep + "]", text)): - r += __get_value(c, i) - # otherwise, move through the text using a cursor for tokenizing it ; this allows defining more complex - # encodings with variable token lengths - else: - cursor, bad = 0, "" - while cursor < len(text): - token = text[cursor:cursor+1] - for l in range(tminlen, tmaxlen + 1): - token = text[cursor:cursor+l] - if token in smapdict.keys() or icase and getattr(token, case)() in smapdict.keys(): - r += __get_value(token, cursor) - cursor += l - break - else: - # collect bad chars and only move the cursor one char to the right - bad += text[cursor] - cursor += 1 - # if the number of bad chars is the minimum token length, consume it and start a new buffer - if len(bad) == tminlen or errors == "leave": - posn = cursor - len(bad) - r += error_func(bad, posn) - bad = "" - if decode: - if outype in ["bin", "ord"]: - tmp, r = "", r.replace(lsep, "") - step = [3, 8][outype == "bin"] - for i in range(0, len(r), step): - s = r[i:i+step] - try: - tmp += chr(int(s, 2) if outype == "bin" else int(s)) - except ValueError: - if len(s) > 0: - tmp += "[" + s + "]" - r = tmp + lsep - return r[:len(r)-len(lsep)], len(b(text)) - return code - if re.search(r"\([^(?:)]", kwargs.get('pattern', "")) is None: - # in this case, there is no capturing group for parametrization - return _wrapper(None) - return _wrapper - - glob = currentframe().f_back.f_globals - kwargs['category'] = glob['__file__'].split(os.path.sep)[-2].rstrip("s") - kwargs['examples'] = kwargs.get('examples', glob.get('__examples__')) - kwargs['encmap'] = encmap - kwargs['repl_char'] = repl_char - kwargs['sep'] = sep - kwargs['ignore_case'] = ignore_case - kwargs['no_error'] = no_error - kwargs['intype'] = intype - kwargs['outype'] = outype - kwargs['module'] = glob.get('__name__') - try: - if isinstance(encmap, dict): - smapdict = {k: v for k, v in 
encmap.items()} - elif isinstance(encmap, list) and isinstance(encmap[0], dict): - smapdict = {k: v for k, v in encmap[0].items()} - kwargs['repl_minlen'] = i = max(1, min(map(len, set(smapdict.values()) - {''}))) - kwargs['repl_minlen_b'] = max(1, min(map(len, map(b, set(smapdict.values()) - {''})))) - except: - pass - return add(ename, __generic_code(), __generic_code(True), **kwargs) -codecs.add_map = add_map - - -def clear(): - """ Clear codext's local registry of search functions. """ - global __codecs_registry, MACROS, PERS_MACROS # noqa: F824 - __codecs_registry, MACROS, PERS_MACROS = [], {}, {} -codecs.clear = clear - - -def examples(encoding, number=10): - """ Use the search function to get the matching encodings and provide examples of valid encoding names. """ - e = [] - for name in search(encoding): - for search_function in __codecs_registry: - n = search_function.__name__ - if name in [n, n.replace("_", "-")]: - temp = [] - for s in generate_strings_from_regex(search_function.__pattern__, yield_max=16*number): - temp.append(s) - random.shuffle(temp) - i = 0 - while i < min(number, len(temp)): - if not temp[i].isdigit(): - try: - lookup(temp[i], False) - e.append(temp[i]) - except LookupError: - pass - i += 1 - for alias, codec in ALIASES.items(): - if name == codec: - if codec not in e: - e.append(codec) - if not alias.isdigit(): - e.append(alias) - random.shuffle(e) - return sorted([e[i] for i in range(min(number, len(e)))], key=_human_keys) -codecs.examples = examples - - -def is_native(encoding): - """ Determine if a given encoding is native or not. """ - return lookup(encoding, False).parameters['category'] == "native" - - -def list_categories(): - """ Get a list of all codec categories. 
""" - c = CODECS_CATEGORIES - root = os.path.dirname(__file__) - for d in os.listdir(root): - if os.path.isdir(os.path.join(root, d)) and not d.startswith("__"): - c.append(d.rstrip("s")) - # particular category, hardcoded from base/_base.py - c += ["base-generic"] - return list(set(c)) -list_categories() - - -def list_encodings(*categories): - """ Get a list of all codecs. """ - # if "non-native" is in the input list, extend the list with the whole categories but "native" - categories, exclude = list(categories), [] - for c in categories[:]: - if c == "non-native": - for c in CODECS_CATEGORIES: - if c == "native" or c in categories: - continue - categories.append(c) - categories.remove("non-native") - if c.startswith("~"): - exclude.append(c[1:]) - categories.remove(c) - try: - categories.remove(c[1:]) - except ValueError: - pass - # now, filter codecs according to the input list of categories - enc = [] - if (len(categories) == 0 or "native" in categories) and "native" not in exclude: - for a in set(ALIASES.values()): - try: - ci = __orig_lookup(a) - except LookupError: - continue - if lookup(a) is ci: - enc.append(ci.name) - for search_function in CODECS_OVERWRITTEN + __codecs_registry: - name = search_function.__name__.replace("_", "-") - p = search_function.__pattern__ - ci = search_function(name) if p is None else search_function(generate_string_from_regex(p)) - c = "other" if ci is None else ci.parameters['category'] - if (len(categories) == 0 or c in categories) and c not in exclude: - enc.append(name) - for category in categories: - if category not in CODECS_CATEGORIES: - raise ValueError(f"Category '{category}' does not exist") - return sorted(list(set(enc)), key=_human_keys) - - -def list_macros(): - """ Get a list of all macros, with the precedence on personal ones. 
""" - return sorted(list(set(list(MACROS.keys()) + list(PERS_MACROS.keys())))) - - -def remove(name): - """ Remove all search functions matching the input encoding name from codext's local registry or any macro with the - given name. """ - global __codecs_registry, MACROS, PERS_MACROS # noqa: F824 - tbr = [] - for search_function in __codecs_registry: - if search_function(name) is not None: - tbr.append(search_function) - for search_function in tbr: - __codecs_registry.remove(search_function) - try: - del MACROS[name] - except KeyError: - pass - try: - del PERS_MACROS[name] - with open(PERS_MACROS_FILE, 'w') as f: - json.dump(PERS_MACROS, f, indent=2) - except KeyError: - pass - try: - del CODECS_CACHE[name] - except KeyError: - pass - for s in ["En", "De"]: - try: - delattr(builtins, f"{name.capitalize()}{s}codeError") - except AttributeError: - pass -codecs.remove = remove - - -def reset(): - """ Reset codext's local registry of search functions and macros. """ - from importlib import reload - global __codecs_registry, CODECS_REGISTRY, MACROS, PERS_MACROS # noqa: F824 - clear() - d = os.path.dirname(__file__) - for pkg in sorted(os.listdir(d)): - if pkg.startswith("_") or not os.path.isdir(os.path.join(d, pkg)): - continue - reload(import_module("codext." + pkg)) - # backup codext's registry - if CODECS_REGISTRY is None: - CODECS_REGISTRY = __codecs_registry[:] - # restore codext's registry - else: - __codecs_registry = CODECS_REGISTRY[:] - # restore codext's embedded set of macros - with open(os.path.join(os.path.dirname(__file__), "macros.json")) as f: - MACROS = json.load(f) - # reload personal set of macros - PERS_MACROS = {} - if os.path.exists(PERS_MACROS_FILE): - with open(PERS_MACROS_FILE) as f: - PERS_MACROS = json.load(f) -codecs.reset = reset - - -# conversion functions -def b(s): - """ Non-crashing bytes conversion function. 
""" - try: - return s.encode("latin-1") - except: - pass - try: - return s.encode("utf-8") - except: - pass - return s - - -def ensure_str(s, encoding="utf-8", errors='strict'): - """ Dummy str conversion function. """ - if isinstance(s, bytes): - try: - return s.decode(encoding, errors) - except: - return s.decode("latin-1") - return s - - -# make conversion functions compatible with input/output strings/bytes -def fix_inout_formats(f): - """ This decorator ensures that the first output of f will have the same text format as the first input (str or - bytes). """ - @wraps(f) - def _wrapper(*args, **kwargs): - a0 = args[0] - a0_isb = isb(a0) - a0 = ensure_str(a0) if iss(a0) or a0_isb else a0 - r = f(a0, *args[1:], **kwargs) - # special case: input is in bytes ; ensure that the returned length is this of the bytes, not this processed by - # the decode/encode function - if isinstance(r, (tuple, list)) and isinstance(r[1], int) and a0_isb: - r = tuple([list(r)[0]] + [len(args[0])] + list(r)[2:]) - return (fix(r[0], args[0]), ) + r[1:] if isinstance(r, (tuple, list)) else fix(r, args[0]) - return _wrapper - - -# alphabet generation function from a given mask -def get_alphabet_from_mask(mask): - """ This function generates an alphabet from the given mask. The style used is similar to Hashcat ; group keys are - marked with a heading "?". """ - i, alphabet = 0, "" - while i < len(mask): - c = mask[i] - if c == "?" and i < len(mask) - 1 and mask[i+1] in MASKS.keys(): - for c in MASKS[mask[i+1]]: - if c not in alphabet: - alphabet += c - i += 1 - elif c not in alphabet: - alphabet += c - i += 1 - return alphabet - - -# generic error handling function -def handle_error(ename, errors, sep="", repl_char="?", repl_minlen=1, decode=False, kind="character", item="position"): - """ This shortcut function allows to handle error modes given some tuning parameters. 
- - :param ename: encoding name - :param errors: error handling mode - :param sep: token separator - :param repl_char: replacement character (for use when errors="replace") - :param repl_minlen: repeat number for the replacement character - :param decode: whether we are encoding or decoding - :param item: position item description (for describing the error ; e.g. "group" or "token") - """ - exc = f"{exc_name(ename)}{['En','De'][decode]}codeError" - - def _handle_error(token, position, output="", eename=None): - """ This handles an encoding/decoding error according to the selected handling mode. - - :param token: input token to be encoded/decoded - :param position: token position index - :param output: output, as decoded up to the position of the error - """ - if errors == "strict": - token = f"{token[:7]}..." if len(token := ensure_str(token)) > 10 else token - err = getattr(builtins, exc)(f"'{eename or ename}' codec can't {['en','de'][decode]}code {kind} '{token}' " - f"in {item} {position}") - err.output = output - err.__cause__ = err - raise err - elif errors == "leave": - return token + sep - elif errors == "replace": - return repl_char * repl_minlen + sep - elif errors == "ignore": - return "" - else: - raise ValueError(f"Unsupported error handling '{errors}'") - return _handle_error - - -# codecs module hooks -__orig_lookup = _codecs.lookup -__orig_register = _codecs.register - - -def __add(ename, encode=None, decode=None, pattern=None, text=True, **kwargs): - kwargs.pop('add_to_codecs', None) - return add(ename, encode, decode, pattern, text, True, **kwargs) -__add.__doc__ = add.__doc__ -codecs.add = __add - - -def decode(obj, encoding='utf-8', errors='strict'): - """ Custom decode function relying on the hooked lookup function. """ - return lookup(encoding).decode(obj, errors)[0] -codecs.decode = decode - - -def encode(obj, encoding='utf-8', errors='strict'): - """ Custom encode function relying on the hooked lookup function. 
""" - n, m = 1, re.search(r"\[(\d+)\]$", encoding) - if m: - n = int(m.group(1)) - encoding = re.sub(r"\[(\d+)\]$", "", encoding) - ci = lookup(encoding) - for i in range(n): - try: - obj = ci.encode(obj, errors)[0] - except (AttributeError, TypeError) as e: # occurs for encodings that require str as input while 'obj' is bytes - if str(e) not in ["'bytes' object has no attribute 'encode'", - "ord() expected string of length 1, but int found"] or \ - encoding in ["latin-1", "utf-8"]: # encodings considered when using b(...) - raise - obj = ci.encode(ensure_str(obj), errors)[0] - return obj -codecs.encode = encode - - -def lookup(encoding, macro=True): - """ Hooked lookup function for searching first for codecs in the local registry of this module. """ - # first, try to match the given encoding with codecs' search functions - for search_function in CODECS_OVERWRITTEN + __codecs_registry: - codecinfo = search_function(encoding) - if codecinfo is not None: - return codecinfo - # then, if a codec name was given, generate an encoding name from its pattern and get the CodecInfo - for search_function in CODECS_OVERWRITTEN + __codecs_registry: - if search_function.__name__.replace("_", "-") == encoding or \ - encoding in getattr(search_function, "__aliases__", []): - codecinfo = search_function(generate_string_from_regex(search_function.__pattern__)) - if codecinfo is not None: - return codecinfo - # finally, get a CodecInfo with the original lookup function and refine it with a dictionary of parameters - try: - ci = __orig_lookup(encoding) - ci.parameters = {'category': "native", 'module': "codecs", 'name': ALIASES.get(ci.name, ci.name)} - return ci - except LookupError: - if not macro: - raise - try: - return CodecMacro(encoding) - except LookupError: - e = LookupError(f"unknown encoding: {encoding}") - e.__cause__ = e # stop exception chaining - raise e -codecs.lookup = lookup - - -def register(search_function, add_to_codecs=False): - """ Register function for 
registering new codecs in the local registry of this module and, if required, in the - native codecs registry (for use with the built-in 'open' function). - - :param search_function: search function for the codecs registry - :param add_to_codecs: also add the search function to the native registry - NB: this will make the codec available in the built-in open(...) but will make it impossible - to remove the codec later - """ - if search_function not in __codecs_registry: - try: - __orig_lookup(search_function.__name__) - l = CODECS_OVERWRITTEN - except LookupError: - l = __codecs_registry - l.append(search_function) - if add_to_codecs: - __orig_register(search_function) - - -def __register(search_function): - """ Same as register(...), but with add_to_codecs set by default to True. """ - register(search_function, True) -codecs.register = __register - - -def search(encoding_regex, extended=True): - """ Function similar to lookup but allows to search for an encoding based on a regex instead. It searches this way - into the local registry but also tries a simple lookup with the original lookup function. """ - matches = [] - for search_function in CODECS_OVERWRITTEN + __codecs_registry: - n = search_function.__name__ - for name in [n, n.replace("_", "-")]: - if re.search(encoding_regex, name): - matches.append(n.replace("_", "-")) - continue - if extended: - # in some cases, encoding_regex can match a generated string that uses a particular portion of its - # generating pattern ; e.g. 
we expect encoding_regex="uu_" to find "uu" and "uu_codec" while it can also - # find "morse" or "atbash" very rarely because of their dynamic patterns and the limited number of randomly - # generated strings - # so, we can use a qualified majority voting to ensure we do not get a "junk" encoding in the list of - # matches ; executing 5 times the string generation for a given codec but adding the codec to the list of - # matches only if we get at least 3 matches ensures that we consider up to 2 failures that could be - # stochastic, therefore drastically decreasing the probability to get a "junk" encoding in the matches list - c = 0 - for i in range(5): - for s in generate_strings_from_regex(search_function.__pattern__): - if re.search(encoding_regex, s): - c += 1 - break - if c >= 3: - matches.append(n) - break - for s, n in ALIASES.items(): - if re.search(encoding_regex, s) or re.search(encoding_regex, n): - matches.append(n) - return sorted(list(set(matches)), key=_human_keys) -codecs.search = search - - -# utility function for the search feature -CATEGORIES = { - 'digit': digits, - 'not_digit': reduce(lambda x, c: x.replace(c, ""), digits, printable), - 'space': whitespace, - 'not_space': reduce(lambda x, c: x.replace(c, ""), whitespace, printable), - 'word': ascii_letters + digits + '_', - 'not_word': reduce(lambda x, c: x.replace(c, ""), ascii_letters + digits + '_', printable), -} -REPEAT_MAX = 10 -STAR_PLUS_MAX = 10 -YIELD_MAX = 100 - - -def __gen_str_from_re(regex, star_plus_max, repeat_max, yield_max, parsed=False): - """ Recursive function to generate strings from a regex pattern. 
""" - if regex is None: - return - __groups = {} - tokens = [] - negate, last_rand = False, None - for state in (regex if parsed else re.sre_parse.parse(b(getattr(regex, "pattern", regex)))): - code = getattr(state[0], "name", state[0]).lower() - value = getattr(state[1], "name", state[1]) - value = value.lower() if isinstance(value, str) else value - if code in ["assert_not", "at"]: - continue - elif code == "any": - charset = list(printable.replace("\n", "")) - while charset[0] == last_rand and len(charset) > 1: - random.shuffle(charset) - last_rand = charset[0] - tokens.append(charset) # should be ord(x) with x belongs to [0, 256[ - elif code == "assert": - tokens.append(list(__gen_str_from_re(value[1], star_plus_max, repeat_max, yield_max, True))) - elif code == "branch": - result = [] - for r in value[1]: - result += list(__gen_str_from_re(r, star_plus_max, repeat_max, yield_max, True)) or [""] - tokens.append(result) - elif code == "category": - charset = list(CATEGORIES[value[9:]]) - if negate: - negate = False - charset = list(set(printable).difference(charset)) - while charset[0] == last_rand and len(charset) > 1: - random.shuffle(charset) - last_rand = charset[0] - tokens.append(charset) - elif code == "groupref": - tokens.extend(__groups[value]) - elif code == "in": - subtokens = list(__gen_str_from_re(value, star_plus_max, repeat_max, yield_max, True)) - subtokens = [x for l in subtokens for x in l] - tokens.append(subtokens) - elif code == "literal": - tokens.append(chr(value)) - elif code in ["max_repeat", "min_repeat"]: - start, end = value[:2] - end = min(end, star_plus_max) - start = min(start, end) - charset = list(__gen_str_from_re(value[-1], star_plus_max, repeat_max, yield_max, True)) - subtokens = [] - if start == 0 and end == 1: - subtokens.append("") - subtokens.extend(charset) - elif len(charset) ** end > repeat_max: - for i in range(min(repeat_max, 10 * len(charset))): - n = random.randint(start, end + 1) - token = "" if n == 0 else 
"".join(random.choice(charset) for i in range(n)) - if token not in subtokens: - subtokens.append(token) - else: - i -= 1 - else: - for n in range(start, end + 1): - for c in product(charset, repeat=n): - subtokens.append("".join(c)) - tokens.append(subtokens) - elif code == "negate": - negate = True - elif code == "not_literal": - charset = list(printable.replace(chr(value), "")) - while charset[0] == last_rand and len(charset) > 1: - random.shuffle(charset) - last_rand = charset[0] - tokens.append(charset) - elif code == "range": - tokens.append("".join(chr(i) for i in range(value[0], value[1] + 1))) - elif code == "subpattern": - result = list(__gen_str_from_re(value[-1], star_plus_max, repeat_max, yield_max, True)) - if value[0]: - __groups[value[0]] = result - tokens.append(result) - else: - raise NotImplementedError(f"Unhandled code '{code}'") - if len(tokens) == 0: - tokens = [""] - i = 0 - for result in product(*tokens): - yield "".join(result) - i += 1 - if i >= yield_max: - break - - -def _human_keys(text): - """ Sorting function for considering strings with numbers (e.g. base2, base10, base100) """ - tokens = [] - for s in re.split(r"(\d+|\D+)", text): - tokens.append(int(s) if s.isdigit() else s) - return tokens - - -def generate_string_from_regex(regex): - """ Utility function to generate a single string from a regex pattern. """ - if regex: - return list(generate_strings_from_regex(regex, yield_max=1))[0] - - -def generate_strings_from_regex(regex, star_plus_max=STAR_PLUS_MAX, repeat_max=REPEAT_MAX, yield_max=YIELD_MAX): - """ Utility function to generate strings from a regex pattern. """ - for r in __gen_str_from_re(regex, star_plus_max, repeat_max, yield_max): - yield r - - -# guess feature objects -__module_exists = lambda n: n in [x[1] for x in iter_modules()] -stopfunc = ModuleType("stopfunc", """ - Predefined stop functions - ~~~~~~~~~~~~~~~~~~~~~~~~~ - - This submodule contains stop functions for the guess feature of codext. 
- - - `flag`: searches for the pattern "[Ff][Ll1][Aa4@][Gg9]" (either UTF-8 or UTF-16) - - `lang_**`: checks if the given lang (any from the PROFILES_DIRECTORY of the langdetect module) is detected - - `printables`: checks that every output character is in the set of printables - - `regex`: takes one argument, the regular expression, for checking a string against the given pattern - - `text`: checks for printables and an entropy less than 4.6 (empirically determined) -""") -stopfunc.printables = lambda s: all(c in printable for c in ensure_str(s)) -stopfunc.printables.__name__ = stopfunc.printables.__qualname__ = "printables" -stopfunc.regex = lambda p: lambda s: re.search(p, ensure_str(s)) is not None -stopfunc.regex.__name__ = stopfunc.regex.__qualname__ = "regex" -stopfunc.text = lambda s: stopfunc.printables(s) and entropy(s) < 4.6 -stopfunc.text.__name__ = stopfunc.text.__qualname__ = "text" -stopfunc.flag = lambda x: re.search(r"[Ff][Ll1][Aa4@][Gg96]", ensure_str(x)) is not None -stopfunc.flag.__name__ = stopfunc.flag.__qualname__ = "flag" -stopfunc.default = stopfunc.text - -stopfunc.LANG_BACKEND = None -stopfunc.LANG_BACKENDS = [n for n in ["pycld2", "langdetect", "langid", "cld3", "textblob"] if __module_exists(n)] -if len(stopfunc.LANG_BACKENDS) > 0: - stopfunc.LANG_BACKEND = stopfunc.LANG_BACKENDS[0] -if "cld3" in stopfunc.LANG_BACKENDS: - stopfunc.CLD3_LANGUAGES = "af|am|ar|bg|bn|bs|ca|ce|co|cs|cy|da|de|el|en|eo|es|et|eu|fa|fi|fr|fy|ga|gd|gl|gu|ha|" \ - "hi|hm|hr|ht|hu|hy|id|ig|is|it|iw|ja|jv|ka|kk|km|kn|ko|ku|ky|la|lb|lo|lt|lv|mg|mi|mk|" \ - "ml|mn|mr|ms|mt|my|ne|nl|no|ny|pa|pl|ps|pt|ro|ru|sd|si|sk|sl|sm|sn|so|sq|sr|st|su|sv|" \ - "sw|ta|te|tg|th|tr|uk|ur|uz|vi|xh|yi|yo|zh|zu".split("|") -if "textblob" in stopfunc.LANG_BACKENDS: - stopfunc.TEXTBLOB_LANGUAGES = "af|ar|az|be|bg|bn|ca|cs|cy|da|de|el|en|eo|es|et|eu|fa|fi|fr|ga|gl|gu|hi|hr|ht|hu|" \ - "id|is|it|iw|ja|ka|kn|ko|la|lt|lv|mk|ms|mt|nl|no|pl|pt|ro|ru|sk|sl|sq|sr|sv|sw|ta|" \ - 
"te|th|tl|tr|uk|ur|vi|yi|zh".split("|") - - -def _detect(text): - _lb, t = stopfunc.LANG_BACKEND, ensure_str(text) - if _lb is None: - raise ValueError("No language backend %s" % ["selected", "installed"][len(stopfunc.LANG_BACKENDS) == 0]) - return langid.classify(t)[0] if _lb == "langid" else \ - langdetect.detect(t) if _lb == "langdetect" else \ - pycld2.detect(t)[2][0][1] if _lb == "pycld2" else \ - cld3.get_language(t).language[:2] if _lb == "cld3" else \ - textblob.TextBlob(t).detect_language()[:2] - - -def _lang(lang): - def _test(s): - if not stopfunc.text(s): - return False - try: - return _detect(ensure_str(s))[:2] == lang - except: - return False - return _test - - -def _load_lang_backend(backend=None): - # import the requested backend library if not imported yet - if backend is None or backend in stopfunc.LANG_BACKENDS: - stopfunc.LANG_BACKEND = backend - if backend: - globals()[backend] = __import__(backend) - else: - raise ValueError("Unsupported language detection backend") - # remove language-related stop functions - for attr in dir(stopfunc): - if attr.startswith("_") or not isinstance(getattr(stopfunc, attr), FunctionType): - continue - if re.match(r"lang_[a-z]{2}$", attr): - delattr(stopfunc, attr) - # rebind applicable language-related stop functions - if stopfunc.LANG_BACKEND: - _lb = stopfunc.LANG_BACKEND - if _lb == "langid": - langid.langid.load_model() - for lang in ( - langid.langid.identifier.nb_classes if _lb == "langid" else \ - list(set(p[:2] for p in os.listdir(langdetect.PROFILES_DIRECTORY))) if _lb == "langdetect" else \ - list(set(x[1][:2] for x in pycld2.LANGUAGES if x[0] in pycld2.DETECTED_LANGUAGES)) if _lb == "pycld2" else \ - stopfunc.CLD3_LANGUAGES if _lb == "cld3" else \ - stopfunc.TEXTBLOB_LANGUAGES if _lb == "textblob" else \ - []): - n = f"lang_{lang}" - setattr(stopfunc, n, _lang(lang)) - getattr(stopfunc, n).__name__ = getattr(stopfunc, n).__qualname__ = n - if LANG: - flng = f"lang_{LANG}" - if getattr(stopfunc, flng, 
None): - stopfunc.default = getattr(stopfunc, flng) -stopfunc._reload_lang = _load_lang_backend - - -def _validate(stop_function, lang_backend="none"): - s, lb = stop_function, lang_backend - if isinstance(s, str): - if re.match(r"lang_[a-z]{2}$", s) and lb != "none" and \ - all(re.match(r"lang_[a-z]{2}$", x) is None for x in dir(stopfunc)): - stopfunc._reload_lang(lb) - f = getattr(stopfunc, s, None) - if f: - return f - elif not isinstance(s, FunctionType): - raise ValueError("Bad stop function") - return s -stopfunc._validate = _validate - - -def __guess(prev_input, input, stop_func, depth, max_depth, min_depth, encodings, result, found=(), - stop=True, show=False, scoring_heuristic=False, extended=False, debug=False): - """ Perform a breadth-first tree search using a ranking logic to select and prune the list of codecs. """ - if depth > min_depth and stop_func(input): - if not stop and (show or debug) and found not in result: - s = repr(input) - s = s[2:-1] if s.startswith("b'") and s.endswith("'") else s - s = f"[+] {', '.join(found)}: {s}" - print(s if len(s) <= 80 else f"{s[:77]}...") - result[found] = input - if depth >= max_depth or len(result) > 0 and stop: - return - prev_enc = found[-1] if len(found) > 0 else "" - e = encodings.get(depth, encodings.get(-1, [])) - for new_input, encoding in __rank(prev_input, input, prev_enc, e, scoring_heuristic, extended): - if len(result) > 0 and stop: - return - if debug: - print(f"[*] Depth {depth+1:0{len(str(max_depth))}}/{max_depth}: {encoding}") - __guess(input, new_input, stop_func, depth+1, max_depth, min_depth, encodings, result, found + (encoding, ), - stop, show, scoring_heuristic, extended, debug) - - -def __make_encodings_dict(include, exclude): - """ Process encodings inclusion and exclusion lists, listing categories and developping codecs' lists of possible - encoding names. It also creates a cache with the CodecInfo objects for improving performance. 
""" - def _develop(d, keep=True): - d = d or {} - for k, v in d.items(): - l, cc, sc = [], [e for e in v if e in CODECS_CATEGORIES], [e for e in v if e not in CODECS_CATEGORIES] - # list from in-scope categories and then everything that is not a category - for enc in ((list_encodings(*cc) if (len(cc) > 0 or keep) and len(sc) == 0 else []) + sc): - g = [] - for e in (search(enc, False) or [enc]): - try: - ci = lookup(e, False) - g.extend(ci.parameters['guess']) - except: - pass - if enc in g: # e.g. "rot-1" => ["rot-1", "rot-2", ...] ; only "rot-1" is to be selected - l.append(enc) - else: # e.g. "rot" => ["rot-1", "rot-2", ...] ; all the "rot-N" shall be selected - l.extend(g) - d[k] = list(set(l)) - return d - _excl, _incl = _develop(exclude, False), _develop(include) - return {k: [x for x in v if x not in _excl.get(k, [])] for k, v in _incl.items()} - - -def __rank(prev_input, input, prev_encoding, encodings, heuristic=False, extended=False, yield_score=False): - """ Filter valid encodings and rank them by relevance. 
""" - ranking = {} - for e in encodings: - try: - codec = CODECS_CACHE[e] - except KeyError: - try: - CODECS_CACHE[e] = codec = lookup(e, False) - except LookupError: - continue - t = __score(prev_input, input, prev_encoding, e, codec, heuristic, extended) - if t: - ranking[e] = t - for encoding, result in sorted(ranking.items(), key=lambda x: (-x[1][0], x[0])): - yield result if yield_score else result[1], encoding - - -class _Text(object): - __slots__ = ["entropy", "lcharset", "len", "padding", "printables", "text"] - - def __init__(self, text, pad_char=None): - self.text = ensure_str(text) - c = self.text[-1] - pad_char, last_char = (chr(pad_char), chr(c)) if isinstance(c, int) else (pad_char, c) - self.padding = pad_char is not None and last_char == pad_char - if self.padding: - text = text.rstrip(b(pad_char) if isinstance(text, bytes) else pad_char) - self.len = len(self.text) - self.lcharset = len(set(self.text)) - self.printables = float(len([c for c in self.text if c in printable])) / self.len - self.entropy = entropy(self.text) - - -def __score(prev_input, input, prev_encoding, encoding, codec, heuristic=False, extended=False): - """ Score relevant encodings given an input. """ - obj = None - sc = codec.parameters.get('scoring', {}) - no_error, transitive = codec.parameters.get('no_error', False), sc.get('transitive', False) - # ignore encodings that fail to decode with their default errors handling value - try: - new_input = codec.decode(input)[0] - except: - return - # ignore encodings that give an output identical to the input (identity transformation) or to the previous input - if len(new_input) == 0 or prev_input is not None and b(input) == b(new_input) or b(prev_input) == b(new_input): - return - # ignore encodings that transitively give the same output (identity transformation by chaining twice a same - # codec (e.g. 
rot-15 is equivalent to rot-3 and rot-12 or rot-6 and rot-9) - if transitive and prev_encoding: - ci_prev = lookup(prev_encoding, False) - if ci_prev.parameters['name'] == codec.parameters['name']: - return - # compute input's characteristics only once and only if the control flow reaches this point - pad = sc.get('padding_char') - if obj is None: - obj = _Text(input, pad) - if heuristic: - # from here, the goal (e.g. if the input is Base32) is to rank candidate encodings (e.g. multiple base - # codecs) so that we can put the right one as early as possible and eventually exclude bad candidates - s = -sc.get('penalty', .0) - # first, apply a bonus if the length of input text's charset is exactly the same as encoding's charset ; - # on the contrary, if the length of input text's charset is strictly greater, give a penalty - lcs = sc.get('len_charset', 256) - if isinstance(lcs, type(lambda: None)): - lcs = int(lcs(encoding)) - if (pad and obj.padding and lcs + 1 >= obj.lcharset) or lcs >= obj.lcharset: - s += max(.0, round(.6 * (.99 ** (lcs - obj.lcharset)), 5) - .1) - elif (pad and obj.padding and lcs + 1 < obj.lcharset) or lcs < obj.lcharset: - s -= .2 # this can occur for encodings with no_error set to True - # then, take padding into account, giving a bonus if padding is to be encountered and effectively present, - # or a penalty when it should not be encountered but it is present - if pad and obj.padding: - s += .2 # when padding is encountered while it is legitimate, it could be a good indication => bonus - elif not pad and obj.padding: - s -= .1 # it could arise a padding character is encountered while not being padding => small penalty - # give a bonus when the rate of printable characters is greater or equal than expected and a penalty when - # lower only for codecs that DO NOT tolerate errors (otherwise, the printables rate can be biased) - if not no_error: - pr = sc.get('printables_rate', 0) - if isinstance(pr, type(lambda: None)): - pr = 
float(pr(obj.printables)) - if obj.printables - pr <= .05: - s += .1 - expf = sc.get('expansion_factor', 1.) - if expf: - f = obj.len / float(len(new_input)) # expansion while encoding => at decoding: 1/f - if isinstance(expf, type(lambda: None)): - try: # this case allows to consider the current encoding name from the current codec - expf = expf(f, encoding) - except TypeError: - expf = expf(f) - if isinstance(expf, (int, float)): - expf = 1/f - .1 <= 1/expf <= 1/f + .1 - elif isinstance(expf, (tuple, list)) and len(expf) == 2: - expf = 1/f - expf[1] <= 1/expf[0] <= 1/f + expf[1] - s += [-1., .1][expf] - # afterwards, if the input text has an entropy close to the expected one, give a bonus weighted on the - # number of input characters to take bad entropies of shorter strings into account - entr = sc.get('entropy', lambda e: e) - entr = entr.get(encoding, entr.get('default')) if isinstance(entr, dict) else entr - if isinstance(entr, type(lambda: None)): - try: # this case allows to consider the current encoding name from the current codec - entr = entr(obj.entropy, encoding) - except TypeError: - entr = entr(obj.entropy) - if entr is not None: - # use a quadratic heuristic to compute a weight for the entropy delta, aligned on (256,.2) and (512,1) - d_entr = min(3.04575e-06 * obj.len**2 + .000394 * obj.len, 1) * abs(entr - obj.entropy) - if d_entr <= .5: - s += .5 - d_entr - # finally, if relevant, apply a custom bonus (e.g. when a regex pattern is matched) - bonus = sc.get('bonus_func') - if bonus is not None: - if isinstance(bonus, type(lambda: None)): - bonus = bonus(obj, codec, encoding) - if bonus: - s += .2 - else: - s = 1. 
- # exclude negative (and eventually null) scores as they are (hopefully) not relevant - if extended and s >= .0 or not extended and s > .0: - return s, new_input - - -def guess(input, stop_func=stopfunc.default, min_depth=0, max_depth=5, include=None, exclude=None, found=(), - stop=True, show=False, scoring_heuristic=True, extended=False, debug=False): - """ Try decoding without the knowledge of the encoding(s). - - :param input: input text to be guessed - :param stop_func: function defining the stop condition - :param min_depth: minimum search depth - :param max_depth: maximum search depth - ;param include: inclusion item OR list with category, codec or encoding names OR dictionary with lists per - depth (nothing means include every encoding) - :param exclude: exclusion item OR list with category, codec or encoding names OR dictionary with lists per - depth (nothing means exclude no encoding) - :param found: tuple of already found encodings - :param stop: whether to stop or not when a valid solution is found - :param show: whether to immediately show once a solution is found - :param scoring_heuristic: whether to apply the scoring heuristic during the search (if disabled, all scores are 1., - meaning that every non-failing encoding will be considered with no order of precedence) - :param extended: whether to also consider null scores with the heuristic - :param debug: whether to show each attempt at each depth during computation - """ - if len(input) == 0: - return "" - # check for min and max depths - if max_depth <= 0: - raise ValueError("Depth must be a non-null positive integer") - if min_depth > max_depth: - raise ValueError("Min depth shall be less than or equal to the max depth") - # take the tuple of found encodings into account - if len(found) > 0: - for encoding in found: - input = decode(input, encoding) - # handle the stop function as a regex if a string was given - if isinstance(stop_func, str): - stop_func = stopfunc.regex(stop_func) - # reformat 
include and exclude arguments ; supported formats: - for n, l in zip(["inc", "exc"], [include, exclude]): - if l is None: - if n == "inc": - include = l = {-1: CODECS_CATEGORIES} - else: - exclude = l = {} - # "category" OR "enc_name" OR whatever => means a single item for all depths - if isinstance(l, str): - if n == "inc": - include = l = {-1: [l]} - else: - exclude = l = {-1: [l]} - # ["enc_name1", "enc_name2", ...] => means for all depths - if isinstance(l, (list, tuple)): - if n == "inc": - include = l = {-1: l} - else: - exclude = l = {-1: l} - # {-1: [...], 2: [...], ...} => means prefedined depths with their lists of in-/excluded encodings - if not isinstance(l, dict) or not all(isinstance(k, int) for k in l.keys()): - raise ValueError("Include argument shall be a list or a dictionary with integer keys") - # precompute encodings lists per depth and cache the related CodecInfo objects - encodings, result = __make_encodings_dict(include, exclude), {} - try: - # breadth-first search - for d in range(max_depth): - __guess("", input, stop_func, 0, d+1, min_depth, encodings, result, tuple(found), stop, show, - scoring_heuristic, extended, debug) - if stop and len(result) > 0: - break - except KeyboardInterrupt: - pass - CODECS_CACHE = {} - return result -codecs.guess = guess - - -def rank(input, extended=False, limit=-1, include=None, exclude=None): - """ Rank the most probable encodings based on the given input. - - :param input: input text to be evaluated - :param extended: whether to consider null scores too (NB: negative scores are not output !) 
- :param limit: number of encodings to be returned (-1 means all of them) - :param include: inclusion list with category, codec or encoding names (nothing means include every encoding) - :param exclude: exclusion list with category, codec or encoding names (nothing means exclude no encoding) - """ - encodings = __make_encodings_dict(include if isinstance(include, dict) else {-1: include or CODECS_CATEGORIES}, - exclude if isinstance(exclude, dict) else {-1: exclude or []}) - r = list(__rank(None, input, "", encodings[-1], True, extended, True)) - return r[:limit] if len(r) > 1 else r -codecs.rank = rank - +# -*- coding: UTF-8 -*- +import _codecs +import builtins +import codecs +import hashlib +import json +import os +import random +import re +import sys +from encodings.aliases import aliases as ALIASES +from functools import reduce, update_wrapper, wraps +from importlib import import_module +from inspect import currentframe +from io import BytesIO +from itertools import chain, product +from locale import getlocale +from math import log +from pkgutil import iter_modules +from platform import system +from random import randint +from string import * +from types import FunctionType, ModuleType +try: + import re._parser as sre_parse +except ImportError: + import sre_parse + +# from Python 3.11, 'sre_parse' is bound as '_parser' ; monkey-patch it for backward-compatibility +re.sre_parse = sre_parse + +maketrans = str.maketrans + + +__all__ = ["add", "add_macro", "add_map", "b", "clear", "codecs", "decode", "encode", "ensure_str", "examples", "guess", + "isb", "generate_strings_from_regex", "get_alphabet_from_mask", "handle_error", "hashlib", "i2s", + "is_native", "list_categories", "list_encodings", "list_macros", "lookup", "maketrans", "os", "rank", "re", + "register", "remove", "reset", "s2i", "search", "stopfunc", "BytesIO", "_input", "_stripl", "CodecMacro", + "DARWIN", "LANG", "LINUX", "MASKS", "UNIX", "WINDOWS"] +CODECS_REGISTRY = None +CODECS_OVERWRITTEN = [] 
+CODECS_CATEGORIES = ["native", "custom"] +CODECS_CACHE = {} +LANG = getlocale() +if LANG: + LANG = (LANG[0] or "")[:2].lower() +MASKS = { + 'a': printable, + 'b': "".join(chr(i) for i in range(256)), + 'd': digits, + 'h': digits + "abcdef", + 'H': digits + "ABCDEF", + 'l': ascii_lowercase, + 'p': punctuation, + 's': " ", + 'u': ascii_uppercase, +} + +__codecs_registry = [] + +MACROS = {} +PERS_MACROS = {} +PERS_MACROS_FILE = os.path.expanduser("~/.codext-macros.json") + +DARWIN = system() == "Darwin" +LINUX = system() == "Linux" +UNIX = DARWIN or LINUX +WINDOWS = system() == "Windows" + +entropy = lambda s: -sum([p * log(p, 2) for p in [float(s.count(c)) / len(s) for c in set(s)]]) + +isb = lambda s: isinstance(s, bytes) +iss = lambda s: isinstance(s, str) +fix = lambda x, ref: b(x) if isb(ref) else ensure_str(x) if iss(ref) else x + +s2i = lambda s: int(codecs.encode(s, "base16"), 16) +exc_name = lambda e: "".join(t.capitalize() for t in re.split(r"[-_+]", e)) + + +def i2s(input): + h = hex(input)[2:].rstrip("eL") + return codecs.decode(h.zfill(len(h) + len(h) % 2), "hex") + + +class CodecMacro(tuple): + """Macro details when looking up the codec registry. 
""" + def __new__(cls, name): + self = tuple.__new__(cls) + self.name = name + # get from personal macros first + try: + self.codecs = PERS_MACROS[name] + except KeyError: + try: + self.codecs = MACROS[name] + except KeyError: + raise LookupError(f"unknown macro: {name}") + if not isinstance(self.codecs, (tuple, list)): + raise ValueError(f"bad macro list: {self.codecs}") + self.codecs = [lookup(e, False) for e in self.codecs] # lookup(e, False) + self.parameters = {'name': name, 'category': "macro"} # ^ means that macros won't be nestable + # test examples to check that the chain of encodings works + for action, examples in (self.codecs[0].parameters.get('examples', {}) or {'enc-dec(': ["T3st str!"]}).items(): + if re.match(r"enc(-dec)?\(", action): + for e in (examples.keys() if action.startswith("enc(") else examples or []): + rd = re.match(r"\@(i?)random(?:\{(\d+(?:,(\d+))*?)\})?$", e) + if rd: + for n in (rd.group(2) or "512").split(","): + s = "".join(chr(randint(0, 255)) for i in range(int(n))) + self.encode(s.lower() if rd.group(1) else s) + continue + self.encode(e) + + class Codec: + decode = self.decode + encode = self.encode + + class IncrementalEncoder(codecs.IncrementalEncoder): + def encode(self, input, final=False): + return b(self.encode(input, self.errors)[0]) + self.incrementalencoder = IncrementalEncoder + + class IncrementalDecoder(codecs.IncrementalDecoder): + def decode(self, input, final=False): + return ensure_str(self.decode(input, self.errors)[0]) + self.incrementaldecoder = IncrementalDecoder + + class StreamWriter(Codec, codecs.StreamWriter): + charbuffertype = bytes + self.streamwriter = StreamWriter + + class StreamReader(Codec, codecs.StreamReader): + charbuffertype = bytes + self.streamreader = StreamReader + + return self + + def decode(self, input, error="strict"): + """ Decode with each codec in reverse order. 
""" + for ci in self.codecs[::-1]: + input, l = ci.decode(input, error) + return input, l + + def encode(self, input, error="strict"): + """ Encode with each codec. """ + for ci in self.codecs: + input, l = ci.encode(input, error) + return input, l + + def __repr__(self): + return f"" + + +# inspired from: https://stackoverflow.com/questions/10875442/possible-to-change-a-functions-repr-in-python +class Repr(object): + def __init__(self, name, func): + self.__name = name + self.__func = func + update_wrapper(self, func) + + def __call__(self, *args, **kwargs): + return self.__func(*args, **kwargs) + + def __repr__(self): + return f"" + + +def __stdin_pipe(): + """ Stdin pipe read function. """ + try: + with open(0, 'rb') as f: + for l in f: + yield l + except TypeError: + for l in sys.stdin: + yield l + + +def _input(infile): + # handle input file or stdin + c = b("") + if infile: + with open(infile, 'rb') as f: + c = f.read() + else: + for line in __stdin_pipe(): + c += line + return c + + +def _set_exc(name, etype="ValueError"): + if not hasattr(builtins, name): + ns = {} + exec(f"class {name}({etype}): __module__ = 'builtins'", {}, ns) + setattr(builtins, name, ns[name]) +_set_exc("InputSizeLimitError") +_set_exc("ParameterError") + + +def _stripl(s, st_lines, st_crlf): + if st_crlf: + s = s.replace(b"\r\n", b"") if isb(s) else s.replace("\r\n", "") + if st_lines: + s = s.replace(b"\n", b"") if isb(s) else s.replace("\n", "") + return s + + +def _with_repr(name): + def _wrapper(f): + return Repr(name, f) + return _wrapper + + +def add(ename, encode=None, decode=None, pattern=None, text=True, add_to_codecs=False, **kwargs): + """ This adds a new codec to the codecs module setting its encode and/or decode functions, eventually dynamically + naming the encoding with a pattern and with file handling. 
+ + :param ename: encoding name + :param encode: encoding function or None + :param decode: decoding function or None + :param pattern: pattern for dynamically naming the encoding + :param text: specify whether the codec is a text encoding + :param add_to_codecs: also add the search function to the native registry + NB: this will make the codec available in the built-in open(...) but will make it impossible + to remove the codec later + """ + remove(ename) + if encode: + if not isinstance(encode, FunctionType): + raise ValueError("Bad 'encode' function") + _set_exc(f"{exc_name(ename)}EncodeError") # create the custom encode exception as a builtin + if decode: + if not isinstance(decode, FunctionType): + raise ValueError("Bad 'decode' function") + _set_exc(f"{exc_name(ename)}DecodeError") # create the custom decode exception as a builtin + if not encode and not decode: + raise ValueError("At least one en/decoding function must be defined") + for exc in kwargs.get('extra_exceptions', []): + _set_exc(exc) # create additional custom exceptions as builtins + glob = currentframe().f_back.f_globals + # search function for the new encoding + @_with_repr(ename) + def getregentry(encoding): + if encoding != ename and not (pattern and re.match(pattern, encoding)): + return + fenc, fdec, name = encode, decode, encoding + # prepare CodecInfo input arguments + if pattern: + m, args, i = re.match(pattern, encoding), [], 1 + try: + while True: + try: + g = m.group(i) or "" + if g.isdigit() and not g.startswith("0") and (re.match(r"10+", g) or "".join(set(g)) != "01"): + g = int(g) + args += [g] + i += 1 + except AttributeError: + # this occurs when m is None or there is an error in fenc(g) or fdec(g), meaning no match + if m is not None: + raise + return + except IndexError: + # this occurs while m is not None, but possibly no capture group that gives at least 1 group index ; + # in this case, if fenc/fdec is a decorated function, execute it with no arg + if len(args) == 0: + from 
inspect import getfullargspec + if fenc and len(getfullargspec(fenc).args) == 1: + fenc = fenc() + if fdec and len(getfullargspec(fdec).args) == 1: + fdec = fdec() + else: + fenc = fenc(*args) if fenc else fenc + fdec = fdec(*args) if fdec else fdec + if fenc: + fenc = fix_inout_formats(fenc) + if fdec: + fdec = fix_inout_formats(fdec) + sl, sc = kwargs.pop('strip_lines', False), kwargs.pop('strip_crlf', False) + if sl or sc: + def _striplines(f): + def __wrapper(input, *a, **kw): + return f(_stripl(input, sc, sl), *a, **kw) + return __wrapper + # this fixes issues with wrapped encoded inputs + fdec = _striplines(fdec) + + class Codec(codecs.Codec): + def encode(self, input, errors="strict"): + if fenc is None: + raise NotImplementedError + return fenc(input, errors) + + def decode(self, input, errors="strict"): + if fdec is None: + raise NotImplementedError + return fdec(input, errors) + + class IncrementalEncoder(codecs.IncrementalEncoder): + def encode(self, input, final=False): + if fenc is None: + raise NotImplementedError + return b(fenc(input, self.errors)[0]) + + class IncrementalDecoder(codecs.IncrementalDecoder): + def decode(self, input, final=False): + if fdec is None: + raise NotImplementedError + return ensure_str(fdec(input, self.errors)[0]) + + class StreamWriter(Codec, codecs.StreamWriter): + charbuffertype = bytes + + class StreamReader(Codec, codecs.StreamReader): + charbuffertype = bytes + + ci = codecs.CodecInfo( + name=name, + encode=Codec().encode, + decode=Codec().decode, + incrementalencoder=IncrementalEncoder, + incrementaldecoder=IncrementalDecoder, + streamwriter=StreamWriter, + streamreader=StreamReader, + _is_text_encoding=text, + ) + ci.parameters = kwargs + ci.parameters['name'] = ename + ci.parameters['add_to_codecs'] = add_to_codecs + ci.parameters['pattern'] = pattern + ci.parameters['text'] = text + f = glob.get('__file__', os.path.join("custom", "_")) + cat = f.split(os.path.sep)[-2].rstrip("s") + if cat not in 
CODECS_CATEGORIES: + CODECS_CATEGORIES.append(cat) + ci.parameters['category'] = kwargs.get('category', cat) + ci.parameters['examples'] = kwargs.get('examples', glob.get('__examples__')) + ci.parameters['guess'] = kwargs.get('guess', glob.get('__guess__', [ename])) or [] + ci.parameters['module'] = kwargs.get('module', glob.get('__name__')) + ci.parameters.setdefault("scoring", {}) + for attr in ["bonus_func", "entropy", "expansion_factor", "len_charset", "penalty", "printables_rate", + "padding_char", "transitive"]: + a = kwargs.pop(attr, None) + if a is not None: + ci.parameters['scoring'][attr] = a + return ci + + getregentry.__name__ = re.sub(r"[\s\-]", "_", ename) + if kwargs.get('aliases'): + getregentry.__aliases__ = list(map(lambda n: re.sub(r"[\s\-]", "_", n), kwargs['aliases'])) + getregentry.__pattern__ = pattern + register(getregentry, add_to_codecs) + return getregentry + + +def add_macro(mname, *encodings): + """ This allows to define a macro, chaining multiple codecs one after the other. This relies on a default set of + macros from a YAML file embedded in the package and a local YAML file from the home folder that takes + precedence for defining personal macros. 
+ + :param mname: macro name + :param encodings: encoding names of the encodings to be chained with the macro + """ + global PERS_MACROS # noqa: F824 + # check for name clash with alreday existing macros and codecs + if mname in MACROS or mname in PERS_MACROS: + raise ValueError("Macro name already exists") + try: + ci = lookup(mname, False) + raise ValueError(f"Macro name clashes with codec '{ci.name}'") + except LookupError: + pass + try: + PERS_MACROS[mname] = encodings + CodecMacro(mname) + with open(PERS_MACROS_FILE, 'w') as f: + json.dump(PERS_MACROS, f, indent=2) + except ValueError: + del PERS_MACROS[mname] + raise +codecs.add_macro = add_macro + + +def add_map(ename, encmap, repl_char="?", sep="", ignore_case=None, no_error=False, intype=None, outype=None, **kwargs): + """ This adds a new mapping codec (that is, declarable with a simple character mapping dictionary) to the codecs + module dynamically setting its encode and/or decode functions, eventually dynamically naming the encoding with + a pattern and with file handling (if text is True). 
+ + :param ename: encoding name + :param encmap: characters encoding map ; can be a dictionary of encoding maps (for use with the first capture + group of the regex pattern) or a function building the encoding map + :param repl_char: replacement char (used when errors handling is set to "replace") + :param sep: string of possible character separators (hence, only single-char separators are considered) ; + - while encoding, the first separator is used + - while decoding, separators can be mixed in the input text + :param ignore_case: ignore text case while encoding and/or decoding + :param no_error: this encoding triggers no error (hence, always in "leave" errors handling) + :param intype: specify the input type for pre-transforming the input text + :param outype: specify the output type for post-transforming the output text + :param pattern: pattern for dynamically naming the encoding + :param text: specify whether the codec is a text encoding + :param add_to_codecs: also add the search function to the native registry + NB: this will make the codec available in the built-in open(...) but will make it impossible + to remove the codec later + """ + outype = outype or intype + if ignore_case not in [None, "encode", "decode", "both"]: + raise ValueError("Bad ignore_case parameter while creating encoding map") + if intype not in [None, "str", "bin", "ord"]: + raise ValueError("Bad input type parameter while creating encoding map") + if outype not in [None, "str", "bin", "ord"]: + raise ValueError("Bad output type parameter while creating encoding map") + + def __generic_code(decode=False): + def _wrapper(param): + """ The parameter for wrapping comes from the encoding regex pattern ; e.g. + [no pattern] => param will be None everytime + r"barbie[-_]?([1-4])$" => param could be int 1, 2, 3 or 4 + r"^morse(|[-_]?.{3})$" => param could be None, "-ABC" (for mapping to ".-/") + + In order of precedence: + 1. 
when param is a key in mapdict or mapdict is a list of encoding maps (hence in the case of "barbie...", + param MUST be an int, otherwise for the first case it could clash with a character of the encoding map) + 2. otherwise handle it as a new encoding character map "ABC" translates to ".-/" for morse + """ + p = param + if isinstance(encmap, FunctionType): + mapdict = encmap(p) + p = None + else: + mapdict = encmap + if isinstance(mapdict, dict): + smapdict = {k: v for k, v in mapdict.items()} + elif isinstance(mapdict, list) and isinstance(mapdict[0], dict): + smapdict = {k: v for k, v in mapdict[0].items()} + else: + raise ValueError("Bad mapping dictionary or list of mapping dictionaries") + if p is not None: + # case 1: param is empty string + if p == "": + if isinstance(mapdict, list): + smapdict = {k: v for k, v in mapdict[0].items()} + elif isinstance(mapdict, dict): + if '' in mapdict.keys() and isinstance(mapdict[''], dict): + smapdict = {k: v for k, v in mapdict[''].items()} + else: + smapdict = {k: v for k, v in mapdict.items()} + # no 'else' handling a LookupError here ; this case is covered by the first if/elif/else block + # case 2: list or dictionary or dictionary of numbered encodings + elif isinstance(p, int): + # if mapdict is a list, we shall align the parameter (starting from 1) as an index (starting from 0) + if isinstance(mapdict, list): + p -= 1 + if isinstance(mapdict, list) and 0 <= p < len(mapdict) or \ + isinstance(mapdict, dict) and p in mapdict.keys(): + smapdict = {k: v for k, v in mapdict[p].items()} + else: + raise LookupError(f"Bad parameter for encoding '{ename}': '{p}'") + # case 3: dictionary of regex-selected encoding mappings + elif isinstance(mapdict, dict) and isinstance(list(mapdict.values())[0], dict): + tmp = None + for r, d in mapdict.items(): + if r == '': # this is already handled in case 1 ; anyway, an empty regex always matches, hence + continue # it must be excluded + if re.match(r, p): + tmp = d + break + if tmp is 
None: + raise LookupError(f"Bad parameter for encoding '{ename}': '{p}'") + smapdict = tmp + # case 4: encoding characters translation + else: + # collect base tokens in order of appearance in the mapping dictionary + base_tokens = "" + for _, c in sorted(mapdict.items()): + for t in c: + for st in t: + if st not in base_tokens: + base_tokens += st + if " " not in sep: + base_tokens = base_tokens.replace(" ", "") + if len(p) > 0 and p[0] in "-_" and len(p[1:]) == len(set(p[1:])) == len(base_tokens): + p = p[1:] + if len(p) == len(set(p)) == len(base_tokens): + t = maketrans(base_tokens, p) + for k, v in smapdict.items(): + smapdict[k] = [x.translate(t) for x in v] if isinstance(v, list) else v.translate(t) + else: + raise LookupError(f"Bad parameter for encoding '{ename}': '{p}'") + if ignore_case is not None: + cases = ["upper", "lower"] + case_d = cases[any(c in str(list(smapdict.values())) for c in "abcdefghijklmnopqrstuvwxyz")] + case_e = cases[any(c in str(list(smapdict.keys())) for c in "abcdefghijklmnopqrstuvwxyz")] + i = ignore_case + smapdict = {getattr(k, case_e)() if i in ["both", "encode"] else k: \ + ([getattr(x, case_d)() for x in v] if isinstance(v, list) else getattr(v, case_d)()) \ + if i in ["both", "decode"] else v for k, v in smapdict.items()} + if decode: + tmp = {} + # this has a meaning for encoding maps that could have clashes in encoded chars (e.g. 
Bacon's cipher ; + # I => abaaa but also J => abaaa, with the following, we keep I instead of letting J overwrite it) + for k, v in sorted(smapdict.items()): + if not isinstance(v, list): + v = [v] + for x in v: + if x not in tmp.keys(): + tmp[x] = k + smapdict, cs = tmp, reduce(lambda acc, x: acc + x, tmp.keys()) + kwargs['strip_lines'], kwargs['strip_crlf'] = "\n" not in set(cs), "\r\n" not in cs + # this allows to avoid an error with Python2 in the "for i, c in enumerate(parts)" loop + if '' not in smapdict.keys(): + smapdict[''] = "" + # determine token and result lengths + tmaxlen = max(map(len, smapdict.keys())) + tminlen = max(1, min(map(len, set(smapdict.keys()) - {''}))) + l = [] + for x in smapdict.values(): + getattr(l, ["append", "extend"][isinstance(x, list)])(x) + rminlen = max(1, min(map(len, set(l) - {''}))) + + # generic encoding/decoding function for map encodings + def code(text, errors="strict"): + icase = ignore_case == "both" or \ + decode and ignore_case == "decode" or \ + not decode and ignore_case == "encode" + if icase: + case = case_d if decode else case_e + if no_error: + errors = "leave" + text = ensure_str(text) + if not decode: + if intype == "bin": + text = "".join(f"{bin(ord(c))[2:]:0>8}" for c in text) + elif intype == "ord": + text = "".join(str(ord(c)).zfill(3) for c in text) + r = "" + lsep = "" if decode else sep if len(sep) <= 1 else sep[0] + kind = ["character", "token"][tmaxlen > 1] + error_func = handle_error(ename, errors, lsep, repl_char, rminlen, decode, kind) + + # get the value from the mapping dictionary, trying the token with its inverted case if relevant + def __get_value(token, position, case_changed=False): + try: + result = smapdict[token] + except KeyError: + if icase and not case_changed: + token_inv_case = getattr(token, case)() + return __get_value(token_inv_case, position, True) + return error_func(token, position) + if isinstance(result, list): + result = result[0] + return result + lsep + + # if a 
separator is defined, rely on it by splitting the input text + if decode and len(sep) > 0: + for i, c in enumerate(re.split("[" + sep + "]", text)): + r += __get_value(c, i) + # otherwise, move through the text using a cursor for tokenizing it ; this allows defining more complex + # encodings with variable token lengths + else: + cursor, bad = 0, "" + while cursor < len(text): + token = text[cursor:cursor+1] + for l in range(tminlen, tmaxlen + 1): + token = text[cursor:cursor+l] + if token in smapdict.keys() or icase and getattr(token, case)() in smapdict.keys(): + r += __get_value(token, cursor) + cursor += l + break + else: + # collect bad chars and only move the cursor one char to the right + bad += text[cursor] + cursor += 1 + # if the number of bad chars is the minimum token length, consume it and start a new buffer + if len(bad) == tminlen or errors == "leave": + posn = cursor - len(bad) + r += error_func(bad, posn) + bad = "" + if decode: + if outype in ["bin", "ord"]: + tmp, r = "", r.replace(lsep, "") + step = [3, 8][outype == "bin"] + for i in range(0, len(r), step): + s = r[i:i+step] + try: + tmp += chr(int(s, 2) if outype == "bin" else int(s)) + except ValueError: + if len(s) > 0: + tmp += "[" + s + "]" + r = tmp + lsep + return r[:len(r)-len(lsep)], len(b(text)) + return code + if re.search(r"\([^(?:)]", kwargs.get('pattern', "")) is None: + # in this case, there is no capturing group for parametrization + return _wrapper(None) + return _wrapper + + glob = currentframe().f_back.f_globals + kwargs['category'] = glob['__file__'].split(os.path.sep)[-2].rstrip("s") + kwargs['examples'] = kwargs.get('examples', glob.get('__examples__')) + kwargs['encmap'] = encmap + kwargs['repl_char'] = repl_char + kwargs['sep'] = sep + kwargs['ignore_case'] = ignore_case + kwargs['no_error'] = no_error + kwargs['intype'] = intype + kwargs['outype'] = outype + kwargs['module'] = glob.get('__name__') + try: + if isinstance(encmap, dict): + smapdict = {k: v for k, v in 
encmap.items()} + elif isinstance(encmap, list) and isinstance(encmap[0], dict): + smapdict = {k: v for k, v in encmap[0].items()} + kwargs['repl_minlen'] = i = max(1, min(map(len, set(smapdict.values()) - {''}))) + kwargs['repl_minlen_b'] = max(1, min(map(len, map(b, set(smapdict.values()) - {''})))) + except: + pass + return add(ename, __generic_code(), __generic_code(True), **kwargs) +codecs.add_map = add_map + + +def clear(): + """ Clear codext's local registry of search functions. """ + global __codecs_registry, MACROS, PERS_MACROS # noqa: F824 + __codecs_registry, MACROS, PERS_MACROS = [], {}, {} +codecs.clear = clear + + +def examples(encoding, number=10): + """ Use the search function to get the matching encodings and provide examples of valid encoding names. """ + e = [] + for name in search(encoding): + for search_function in __codecs_registry: + n = search_function.__name__ + if name in [n, n.replace("_", "-")]: + temp = [] + for s in generate_strings_from_regex(search_function.__pattern__, yield_max=16*number): + temp.append(s) + random.shuffle(temp) + i = 0 + while i < min(number, len(temp)): + if not temp[i].isdigit(): + try: + lookup(temp[i], False) + e.append(temp[i]) + except LookupError: + pass + i += 1 + for alias, codec in ALIASES.items(): + if name == codec: + if codec not in e: + e.append(codec) + if not alias.isdigit(): + e.append(alias) + random.shuffle(e) + return sorted([e[i] for i in range(min(number, len(e)))], key=_human_keys) +codecs.examples = examples + + +def is_native(encoding): + """ Determine if a given encoding is native or not. """ + return lookup(encoding, False).parameters['category'] == "native" + + +def list_categories(): + """ Get a list of all codec categories. 
""" + c = CODECS_CATEGORIES + root = os.path.dirname(__file__) + for d in os.listdir(root): + if os.path.isdir(os.path.join(root, d)) and not d.startswith("__"): + c.append(d.rstrip("s")) + # particular category, hardcoded from base/_base.py + c += ["base-generic"] + return list(set(c)) +list_categories() + + +def list_encodings(*categories): + """ Get a list of all codecs. """ + # if "non-native" is in the input list, extend the list with the whole categories but "native" + categories, exclude = list(categories), [] + for c in categories[:]: + if c == "non-native": + for c in CODECS_CATEGORIES: + if c == "native" or c in categories: + continue + categories.append(c) + categories.remove("non-native") + if c.startswith("~"): + exclude.append(c[1:]) + categories.remove(c) + try: + categories.remove(c[1:]) + except ValueError: + pass + # now, filter codecs according to the input list of categories + enc = [] + if (len(categories) == 0 or "native" in categories) and "native" not in exclude: + for a in set(ALIASES.values()): + try: + ci = __orig_lookup(a) + except LookupError: + continue + if lookup(a) is ci: + enc.append(ci.name) + for search_function in CODECS_OVERWRITTEN + __codecs_registry: + name = search_function.__name__.replace("_", "-") + p = search_function.__pattern__ + ci = search_function(name) if p is None else search_function(generate_string_from_regex(p)) + c = "other" if ci is None else ci.parameters['category'] + if (len(categories) == 0 or c in categories) and c not in exclude: + enc.append(name) + for category in categories: + if category not in CODECS_CATEGORIES: + raise ValueError(f"Category '{category}' does not exist") + return sorted(list(set(enc)), key=_human_keys) + + +def list_macros(): + """ Get a list of all macros, with the precedence on personal ones. 
""" + return sorted(list(set(list(MACROS.keys()) + list(PERS_MACROS.keys())))) + + +def remove(name): + """ Remove all search functions matching the input encoding name from codext's local registry or any macro with the + given name. """ + global __codecs_registry, MACROS, PERS_MACROS # noqa: F824 + tbr = [] + for search_function in __codecs_registry: + if search_function(name) is not None: + tbr.append(search_function) + for search_function in tbr: + __codecs_registry.remove(search_function) + try: + del MACROS[name] + except KeyError: + pass + try: + del PERS_MACROS[name] + with open(PERS_MACROS_FILE, 'w') as f: + json.dump(PERS_MACROS, f, indent=2) + except KeyError: + pass + try: + del CODECS_CACHE[name] + except KeyError: + pass + for s in ["En", "De"]: + try: + delattr(builtins, f"{name.capitalize()}{s}codeError") + except AttributeError: + pass +codecs.remove = remove + + +def reset(): + """ Reset codext's local registry of search functions and macros. """ + from importlib import reload + global __codecs_registry, CODECS_REGISTRY, MACROS, PERS_MACROS # noqa: F824 + clear() + d = os.path.dirname(__file__) + for pkg in sorted(os.listdir(d)): + if pkg.startswith("_") or not os.path.isdir(os.path.join(d, pkg)): + continue + reload(import_module("codext." + pkg)) + # backup codext's registry + if CODECS_REGISTRY is None: + CODECS_REGISTRY = __codecs_registry[:] + # restore codext's registry + else: + __codecs_registry = CODECS_REGISTRY[:] + # restore codext's embedded set of macros + with open(os.path.join(os.path.dirname(__file__), "macros.json")) as f: + MACROS = json.load(f) + # reload personal set of macros + PERS_MACROS = {} + if os.path.exists(PERS_MACROS_FILE): + with open(PERS_MACROS_FILE) as f: + PERS_MACROS = json.load(f) +codecs.reset = reset + + +# conversion functions +def b(s): + """ Non-crashing bytes conversion function. 
""" + try: + return s.encode("latin-1") + except: + pass + try: + return s.encode("utf-8") + except: + pass + return s + + +def ensure_str(s, encoding="utf-8", errors='strict'): + """ Dummy str conversion function. """ + if isinstance(s, bytes): + try: + return s.decode(encoding, errors) + except: + return s.decode("latin-1") + return s + + +# make conversion functions compatible with input/output strings/bytes +def fix_inout_formats(f): + """ This decorator ensures that the first output of f will have the same text format as the first input (str or + bytes). """ + @wraps(f) + def _wrapper(*args, **kwargs): + a0 = args[0] + a0_isb = isb(a0) + a0 = ensure_str(a0) if iss(a0) or a0_isb else a0 + r = f(a0, *args[1:], **kwargs) + # special case: input is in bytes ; ensure that the returned length is this of the bytes, not this processed by + # the decode/encode function + if isinstance(r, (tuple, list)) and isinstance(r[1], int) and a0_isb: + r = tuple([list(r)[0]] + [len(args[0])] + list(r)[2:]) + return (fix(r[0], args[0]), ) + r[1:] if isinstance(r, (tuple, list)) else fix(r, args[0]) + return _wrapper + + +# alphabet generation function from a given mask +def get_alphabet_from_mask(mask): + """ This function generates an alphabet from the given mask. The style used is similar to Hashcat ; group keys are + marked with a heading "?". """ + i, alphabet = 0, "" + while i < len(mask): + c = mask[i] + if c == "?" and i < len(mask) - 1 and mask[i+1] in MASKS.keys(): + for c in MASKS[mask[i+1]]: + if c not in alphabet: + alphabet += c + i += 1 + elif c not in alphabet: + alphabet += c + i += 1 + return alphabet + + +# generic error handling function +def handle_error(ename, errors, sep="", repl_char="?", repl_minlen=1, decode=False, kind="character", item="position"): + """ This shortcut function allows to handle error modes given some tuning parameters. 
+ + :param ename: encoding name + :param errors: error handling mode + :param sep: token separator + :param repl_char: replacement character (for use when errors="replace") + :param repl_minlen: repeat number for the replacement character + :param decode: whether we are encoding or decoding + :param item: position item description (for describing the error ; e.g. "group" or "token") + """ + exc = f"{exc_name(ename)}{['En','De'][decode]}codeError" + + def _handle_error(token, position, output="", eename=None): + """ This handles an encoding/decoding error according to the selected handling mode. + + :param token: input token to be encoded/decoded + :param position: token position index + :param output: output, as decoded up to the position of the error + """ + if errors == "strict": + token = f"{token[:7]}..." if len(token := ensure_str(token)) > 10 else token + err = getattr(builtins, exc)(f"'{eename or ename}' codec can't {['en','de'][decode]}code {kind} '{token}' " + f"in {item} {position}") + err.output = output + err.__cause__ = err + raise err + elif errors == "leave": + return token + sep + elif errors == "replace": + return repl_char * repl_minlen + sep + elif errors == "ignore": + return "" + else: + raise ValueError(f"Unsupported error handling '{errors}'") + return _handle_error + + +# codecs module hooks +__orig_lookup = _codecs.lookup +__orig_register = _codecs.register + + +def __add(ename, encode=None, decode=None, pattern=None, text=True, **kwargs): + kwargs.pop('add_to_codecs', None) + return add(ename, encode, decode, pattern, text, True, **kwargs) +__add.__doc__ = add.__doc__ +codecs.add = __add + + +def decode(obj, encoding='utf-8', errors='strict'): + """ Custom decode function relying on the hooked lookup function. """ + return lookup(encoding).decode(obj, errors)[0] +codecs.decode = decode + + +def encode(obj, encoding='utf-8', errors='strict'): + """ Custom encode function relying on the hooked lookup function. 
""" + n, m = 1, re.search(r"\[(\d+)\]$", encoding) + if m: + n = int(m.group(1)) + encoding = re.sub(r"\[(\d+)\]$", "", encoding) + ci = lookup(encoding) + for i in range(n): + try: + obj = ci.encode(obj, errors)[0] + except (AttributeError, TypeError) as e: # occurs for encodings that require str as input while 'obj' is bytes + if str(e) not in ["'bytes' object has no attribute 'encode'", + "ord() expected string of length 1, but int found"] or \ + encoding in ["latin-1", "utf-8"]: # encodings considered when using b(...) + raise + obj = ci.encode(ensure_str(obj), errors)[0] + return obj +codecs.encode = encode + + +def lookup(encoding, macro=True): + """ Hooked lookup function for searching first for codecs in the local registry of this module. """ + # first, try to match the given encoding with codecs' search functions + for search_function in CODECS_OVERWRITTEN + __codecs_registry: + codecinfo = search_function(encoding) + if codecinfo is not None: + return codecinfo + # then, if a codec name was given, generate an encoding name from its pattern and get the CodecInfo + for search_function in CODECS_OVERWRITTEN + __codecs_registry: + if search_function.__name__.replace("_", "-") == encoding or \ + encoding in getattr(search_function, "__aliases__", []): + codecinfo = search_function(generate_string_from_regex(search_function.__pattern__)) + if codecinfo is not None: + return codecinfo + # finally, get a CodecInfo with the original lookup function and refine it with a dictionary of parameters + try: + ci = __orig_lookup(encoding) + ci.parameters = {'category': "native", 'module': "codecs", 'name': ALIASES.get(ci.name, ci.name)} + return ci + except LookupError: + if not macro: + raise + try: + return CodecMacro(encoding) + except LookupError: + e = LookupError(f"unknown encoding: {encoding}") + e.__cause__ = e # stop exception chaining + raise e +codecs.lookup = lookup + + +def register(search_function, add_to_codecs=False): + """ Register function for 
registering new codecs in the local registry of this module and, if required, in the + native codecs registry (for use with the built-in 'open' function). + + :param search_function: search function for the codecs registry + :param add_to_codecs: also add the search function to the native registry + NB: this will make the codec available in the built-in open(...) but will make it impossible + to remove the codec later + """ + if search_function not in __codecs_registry: + try: + __orig_lookup(search_function.__name__) + l = CODECS_OVERWRITTEN + except LookupError: + l = __codecs_registry + l.append(search_function) + if add_to_codecs: + __orig_register(search_function) + + +def __register(search_function): + """ Same as register(...), but with add_to_codecs set by default to True. """ + register(search_function, True) +codecs.register = __register + + +def search(encoding_regex, extended=True): + """ Function similar to lookup but allows to search for an encoding based on a regex instead. It searches this way + into the local registry but also tries a simple lookup with the original lookup function. """ + matches = [] + for search_function in CODECS_OVERWRITTEN + __codecs_registry: + n = search_function.__name__ + for name in [n, n.replace("_", "-")]: + if re.search(encoding_regex, name): + matches.append(n.replace("_", "-")) + continue + if extended: + # in some cases, encoding_regex can match a generated string that uses a particular portion of its + # generating pattern ; e.g. 
we expect encoding_regex="uu_" to find "uu" and "uu_codec" while it can also + # find "morse" or "atbash" very rarely because of their dynamic patterns and the limited number of randomly + # generated strings + # so, we can use a qualified majority voting to ensure we do not get a "junk" encoding in the list of + # matches ; executing 5 times the string generation for a given codec but adding the codec to the list of + # matches only if we get at least 3 matches ensures that we consider up to 2 failures that could be + # stochastic, therefore drastically decreasing the probability to get a "junk" encoding in the matches list + c = 0 + for i in range(5): + for s in generate_strings_from_regex(search_function.__pattern__): + if re.search(encoding_regex, s): + c += 1 + break + if c >= 3: + matches.append(n) + break + for s, n in ALIASES.items(): + if re.search(encoding_regex, s) or re.search(encoding_regex, n): + matches.append(n) + return sorted(list(set(matches)), key=_human_keys) +codecs.search = search + + +# utility function for the search feature +CATEGORIES = { + 'digit': digits, + 'not_digit': reduce(lambda x, c: x.replace(c, ""), digits, printable), + 'space': whitespace, + 'not_space': reduce(lambda x, c: x.replace(c, ""), whitespace, printable), + 'word': ascii_letters + digits + '_', + 'not_word': reduce(lambda x, c: x.replace(c, ""), ascii_letters + digits + '_', printable), +} +REPEAT_MAX = 10 +STAR_PLUS_MAX = 10 +YIELD_MAX = 100 + + +def __gen_str_from_re(regex, star_plus_max, repeat_max, yield_max, parsed=False): + """ Recursive function to generate strings from a regex pattern. 
""" + if regex is None: + return + __groups = {} + tokens = [] + negate, last_rand = False, None + for state in (regex if parsed else re.sre_parse.parse(b(getattr(regex, "pattern", regex)))): + code = getattr(state[0], "name", state[0]).lower() + value = getattr(state[1], "name", state[1]) + value = value.lower() if isinstance(value, str) else value + if code in ["assert_not", "at"]: + continue + elif code == "any": + charset = list(printable.replace("\n", "")) + while charset[0] == last_rand and len(charset) > 1: + random.shuffle(charset) + last_rand = charset[0] + tokens.append(charset) # should be ord(x) with x belongs to [0, 256[ + elif code == "assert": + tokens.append(list(__gen_str_from_re(value[1], star_plus_max, repeat_max, yield_max, True))) + elif code == "branch": + result = [] + for r in value[1]: + result += list(__gen_str_from_re(r, star_plus_max, repeat_max, yield_max, True)) or [""] + tokens.append(result) + elif code == "category": + charset = list(CATEGORIES[value[9:]]) + if negate: + negate = False + charset = list(set(printable).difference(charset)) + while charset[0] == last_rand and len(charset) > 1: + random.shuffle(charset) + last_rand = charset[0] + tokens.append(charset) + elif code == "groupref": + tokens.extend(__groups[value]) + elif code == "in": + subtokens = list(__gen_str_from_re(value, star_plus_max, repeat_max, yield_max, True)) + subtokens = [x for l in subtokens for x in l] + tokens.append(subtokens) + elif code == "literal": + tokens.append(chr(value)) + elif code in ["max_repeat", "min_repeat"]: + start, end = value[:2] + end = min(end, star_plus_max) + start = min(start, end) + charset = list(__gen_str_from_re(value[-1], star_plus_max, repeat_max, yield_max, True)) + subtokens = [] + if start == 0 and end == 1: + subtokens.append("") + subtokens.extend(charset) + elif len(charset) ** end > repeat_max: + for i in range(min(repeat_max, 10 * len(charset))): + n = random.randint(start, end + 1) + token = "" if n == 0 else 
"".join(random.choice(charset) for i in range(n)) + if token not in subtokens: + subtokens.append(token) + else: + i -= 1 + else: + for n in range(start, end + 1): + for c in product(charset, repeat=n): + subtokens.append("".join(c)) + tokens.append(subtokens) + elif code == "negate": + negate = True + elif code == "not_literal": + charset = list(printable.replace(chr(value), "")) + while charset[0] == last_rand and len(charset) > 1: + random.shuffle(charset) + last_rand = charset[0] + tokens.append(charset) + elif code == "range": + tokens.append("".join(chr(i) for i in range(value[0], value[1] + 1))) + elif code == "subpattern": + result = list(__gen_str_from_re(value[-1], star_plus_max, repeat_max, yield_max, True)) + if value[0]: + __groups[value[0]] = result + tokens.append(result) + else: + raise NotImplementedError(f"Unhandled code '{code}'") + if len(tokens) == 0: + tokens = [""] + i = 0 + for result in product(*tokens): + yield "".join(result) + i += 1 + if i >= yield_max: + break + + +def _human_keys(text): + """ Sorting function for considering strings with numbers (e.g. base2, base10, base100) """ + tokens = [] + for s in re.split(r"(\d+|\D+)", text): + tokens.append(int(s) if s.isdigit() else s) + return tokens + + +def generate_string_from_regex(regex): + """ Utility function to generate a single string from a regex pattern. """ + if regex: + return list(generate_strings_from_regex(regex, yield_max=1))[0] + + +def generate_strings_from_regex(regex, star_plus_max=STAR_PLUS_MAX, repeat_max=REPEAT_MAX, yield_max=YIELD_MAX): + """ Utility function to generate strings from a regex pattern. """ + for r in __gen_str_from_re(regex, star_plus_max, repeat_max, yield_max): + yield r + + +# guess feature objects +__module_exists = lambda n: n in [x[1] for x in iter_modules()] +stopfunc = ModuleType("stopfunc", """ + Predefined stop functions + ~~~~~~~~~~~~~~~~~~~~~~~~~ + + This submodule contains stop functions for the guess feature of codext. 
+
+    - `flag`:       searches for the pattern "[Ff][Ll1][Aa4@][Gg96]" (either UTF-8 or UTF-16)
+    - `lang_**`:    checks if the given lang (any from the PROFILES_DIRECTORY of the langdetect module) is detected
+    - `printables`: checks that every output character is in the set of printables
+    - `regex`:      takes one argument, the regular expression, for checking a string against the given pattern
+    - `text`:       checks for printables and an entropy less than 4.6 (empirically determined)
+""")
+stopfunc.printables = lambda s: all(c in printable for c in ensure_str(s))
+stopfunc.printables.__name__ = stopfunc.printables.__qualname__ = "printables"
+stopfunc.regex = lambda p: lambda s: re.search(p, ensure_str(s)) is not None
+stopfunc.regex.__name__ = stopfunc.regex.__qualname__ = "regex"
+stopfunc.text = lambda s: stopfunc.printables(s) and entropy(s) < 4.6
+stopfunc.text.__name__ = stopfunc.text.__qualname__ = "text"
+stopfunc.flag = lambda x: re.search(r"[Ff][Ll1][Aa4@][Gg96]", ensure_str(x)) is not None
+stopfunc.flag.__name__ = stopfunc.flag.__qualname__ = "flag"
+stopfunc.default = stopfunc.text
+
+stopfunc.LANG_BACKEND = None
+stopfunc.LANG_BACKENDS = [n for n in ["pycld2", "langdetect", "langid", "cld3", "textblob"] if __module_exists(n)]
+if len(stopfunc.LANG_BACKENDS) > 0:
+    stopfunc.LANG_BACKEND = stopfunc.LANG_BACKENDS[0]
+if "cld3" in stopfunc.LANG_BACKENDS:
+    stopfunc.CLD3_LANGUAGES = "af|am|ar|bg|bn|bs|ca|ce|co|cs|cy|da|de|el|en|eo|es|et|eu|fa|fi|fr|fy|ga|gd|gl|gu|ha|" \
+                              "hi|hm|hr|ht|hu|hy|id|ig|is|it|iw|ja|jv|ka|kk|km|kn|ko|ku|ky|la|lb|lo|lt|lv|mg|mi|mk|" \
+                              "ml|mn|mr|ms|mt|my|ne|nl|no|ny|pa|pl|ps|pt|ro|ru|sd|si|sk|sl|sm|sn|so|sq|sr|st|su|sv|" \
+                              "sw|ta|te|tg|th|tr|uk|ur|uz|vi|xh|yi|yo|zh|zu".split("|")
+if "textblob" in stopfunc.LANG_BACKENDS:
+    stopfunc.TEXTBLOB_LANGUAGES = "af|ar|az|be|bg|bn|ca|cs|cy|da|de|el|en|eo|es|et|eu|fa|fi|fr|ga|gl|gu|hi|hr|ht|hu|" \
+                                  "id|is|it|iw|ja|ka|kn|ko|la|lt|lv|mk|ms|mt|nl|no|pl|pt|ro|ru|sk|sl|sq|sr|sv|sw|ta|" \
+                                  "te|th|tl|tr|uk|ur|vi|yi|zh".split("|")
+
+
+# guess/rank/score functionality extracted to _guess.py
+from ._guess import guess, rank, _load_lang_backend, _validate
+stopfunc._reload_lang = _load_lang_backend
+stopfunc._validate = _validate
+codecs.guess = guess
+codecs.rank = rank
diff --git a/src/codext/_guess.py b/src/codext/_guess.py
new file mode 100644
index 0000000..3ae2f34
--- /dev/null
+++ b/src/codext/_guess.py
@@ -0,0 +1,382 @@
+# -*- coding: UTF-8 -*-
+"""Extracted guess/rank/score functionality from __common__.py."""
+import codecs
+import os
+import re
+from types import FunctionType
+
+from .__common__ import (
+    ensure_str, b, isb, lookup, list_encodings, search,
+    CODECS_CACHE, CODECS_CATEGORIES, printable, entropy,
+    LANG, decode, stopfunc,
+)
+
+
+def _detect(text):
+    """ Detect the language of a text with the backend currently selected in stopfunc.LANG_BACKEND.
+
+    NB: the backend modules (langid, langdetect, ...) are not imported at the top of this module ; they are bound into
+         its globals by _load_lang_backend(), which therefore has to be called before this function.
+
+    :param text: text to be analyzed (normalized with ensure_str)
+    :return:     language as reported by the backend (truncated to 2 characters for cld3 and textblob)
+    :raise ValueError: when no language backend is selected or installed
+    """
+    _lb, t = stopfunc.LANG_BACKEND, ensure_str(text)
+    if _lb is None:
+        raise ValueError("No language backend %s" % ["selected", "installed"][len(stopfunc.LANG_BACKENDS) == 0])
+    return langid.classify(t)[0] if _lb == "langid" else \
+           langdetect.detect(t) if _lb == "langdetect" else \
+           pycld2.detect(t)[2][0][1] if _lb == "pycld2" else \
+           cld3.get_language(t).language[:2] if _lb == "cld3" else \
+           textblob.TextBlob(t).detect_language()[:2]
+
+
+def _lang(lang):
+    """ Make the stop function for the given language code ; it holds when the input passes stopfunc.text and the
+         first 2 characters of the detected language equal the given code. """
+    def _test(s):
+        if not stopfunc.text(s):
+            return False
+        try:
+            return _detect(ensure_str(s))[:2] == lang
+        # not a bare "except:" so that KeyboardInterrupt still reaches guess(), which relies on it to stop the search
+        except Exception:
+            return False
+    return _test
+
+
+def _load_lang_backend(backend=None):
+    """ Select a language detection backend and rebind the lang_XX stop functions of stopfunc accordingly.
+
+    :param backend: name of a backend listed in stopfunc.LANG_BACKENDS or None for selecting no backend
+    :raise ValueError: when the given backend is not listed in stopfunc.LANG_BACKENDS
+    """
+    # import the requested backend library if not imported yet
+    if backend is None or backend in stopfunc.LANG_BACKENDS:
+        stopfunc.LANG_BACKEND = backend
+        if backend:
+            globals()[backend] = __import__(backend)
+    else:
+        raise ValueError("Unsupported language detection backend")
+    # remove language-related stop functions
+    for attr in dir(stopfunc):
+        if attr.startswith("_") or not isinstance(getattr(stopfunc, attr), FunctionType):
+            continue
+        if re.match(r"lang_[a-z]{2}$", attr):
+            delattr(stopfunc, attr)
+    # rebind applicable language-related stop functions
+    if stopfunc.LANG_BACKEND:
+        _lb = stopfunc.LANG_BACKEND
+        if _lb == "langid":
+            langid.langid.load_model()
+        for lang in (
+            langid.langid.identifier.nb_classes if _lb == "langid" else \
+            list(set(p[:2] for p in os.listdir(langdetect.PROFILES_DIRECTORY))) if _lb == "langdetect" else \
+            list(set(x[1][:2] for x in pycld2.LANGUAGES if x[0] in pycld2.DETECTED_LANGUAGES)) if _lb == "pycld2" else \
+            stopfunc.CLD3_LANGUAGES if _lb == "cld3" else \
+            stopfunc.TEXTBLOB_LANGUAGES if _lb == "textblob" else \
+            []):
+            n = f"lang_{lang}"
+            setattr(stopfunc, n, _lang(lang))
+            getattr(stopfunc, n).__name__ = getattr(stopfunc, n).__qualname__ = n
+        # when a lang_XX function matches the LANG constant, make it the default stop function
+        if LANG:
+            flng = f"lang_{LANG}"
+            if getattr(stopfunc, flng, None):
+                stopfunc.default = getattr(stopfunc, flng)
+
+
+def _validate(stop_function, lang_backend="none"):
+    """ Resolve a stop function given either as the name of an attribute of stopfunc or as a function.
+
+    :param stop_function: name (str) or function
+    :param lang_backend:  backend to be loaded when a "lang_XX" name is given while stopfunc has no lang_XX attribute
+                           yet ("none" disables this lazy loading)
+    :return:              attribute of stopfunc with the given name if it exists, otherwise the input unchanged
+    :raise ValueError:    when the input is neither a string nor a function
+    """
+    s, lb = stop_function, lang_backend
+    if isinstance(s, str):
+        if re.match(r"lang_[a-z]{2}$", s) and lb != "none" and \
+           all(re.match(r"lang_[a-z]{2}$", x) is None for x in dir(stopfunc)):
+            stopfunc._reload_lang(lb)
+        f = getattr(stopfunc, s, None)
+        if f:
+            return f
+    elif not isinstance(s, FunctionType):
+        raise ValueError("Bad stop function")
+    return s
+
+
+def __guess(prev_input, input, stop_func, depth, max_depth, min_depth, encodings, result, found=(),
+            stop=True, show=False, scoring_heuristic=False, extended=False, debug=False):
+    """ Perform a breadth-first tree search using a ranking logic to select and prune the list of codecs. """
+    if depth > min_depth and stop_func(input):
+        if not stop and (show or debug) and found not in result:
+            s = repr(input)
+            s = s[2:-1] if s.startswith("b'") and s.endswith("'") else s
+            s = f"[+] {', '.join(found)}: {s}"
+            print(s if len(s) <= 80 else f"{s[:77]}...")
+        result[found] = input
+    if depth >= max_depth or len(result) > 0 and stop:
+        return
+    prev_enc = found[-1] if len(found) > 0 else ""
+    e = encodings.get(depth, encodings.get(-1, []))
+    for new_input, encoding in __rank(prev_input, input, prev_enc, e, scoring_heuristic, extended):
+        if len(result) > 0 and stop:
+            return
+        if debug:
+            print(f"[*] Depth {depth+1:0{len(str(max_depth))}}/{max_depth}: {encoding}")
+        __guess(input, new_input, stop_func, depth+1, max_depth, min_depth, encodings, result, found + (encoding, ),
+                stop, show, scoring_heuristic, extended, debug)
+
+
+def __make_encodings_dict(include, exclude):
+    """ Process encodings inclusion and exclusion lists, listing categories and developing codecs' lists of possible
+         encoding names. It also creates a cache with the CodecInfo objects for improving performance. """
+    def _develop(d, keep=True):
+        d = d or {}
+        for k, v in d.items():
+            l, cc, sc = [], [e for e in v if e in CODECS_CATEGORIES], [e for e in v if e not in CODECS_CATEGORIES]
+            # list from in-scope categories and then everything that is not a category
+            for enc in ((list_encodings(*cc) if (len(cc) > 0 or keep) and len(sc) == 0 else []) + sc):
+                g = []
+                for e in (search(enc, False) or [enc]):
+                    try:
+                        ci = lookup(e, False)
+                        g.extend(ci.parameters['guess'])
+                    # best effort: skip what cannot be looked up ; KeyboardInterrupt is deliberately not swallowed
+                    except Exception:
+                        pass
+                if enc in g:  # e.g. "rot-1" => ["rot-1", "rot-2", ...] ; only "rot-1" is to be selected
+                    l.append(enc)
+                else:  # e.g. "rot" => ["rot-1", "rot-2", ...] ; all the "rot-N" shall be selected
+                    l.extend(g)
+            d[k] = list(set(l))
+        return d
+    _excl, _incl = _develop(exclude, False), _develop(include)
+    return {k: [x for x in v if x not in _excl.get(k, [])] for k, v in _incl.items()}
+
+
+def __rank(prev_input, input, prev_encoding, encodings, heuristic=False, extended=False, yield_score=False):
+    """ Filter valid encodings and rank them by relevance. """
+    ranking = {}
+    for e in encodings:
+        try:
+            codec = CODECS_CACHE[e]
+        except KeyError:
+            try:
+                CODECS_CACHE[e] = codec = lookup(e, False)
+            except LookupError:
+                continue
+        t = __score(prev_input, input, prev_encoding, e, codec, heuristic, extended)
+        if t:
+            ranking[e] = t
+    for encoding, result in sorted(ranking.items(), key=lambda x: (-x[1][0], x[0])):
+        yield result if yield_score else result[1], encoding
+
+
+class _Text(object):
+    """ Characteristics of an input text, computed once for being reused while scoring. """
+    __slots__ = ["entropy", "lcharset", "len", "padding", "printables", "text"]
+
+    def __init__(self, text, pad_char=None):
+        self.text = ensure_str(text)
+        c = self.text[-1]
+        pad_char, last_char = (chr(pad_char), chr(c)) if isinstance(c, int) else (pad_char, c)
+        self.padding = pad_char is not None and last_char == pad_char
+        if self.padding:
+            # NOTE(review): the stripped value is only assigned to the local name ; the statistics below are still
+            #                computed on self.text, thus with the padding characters included - confirm the intent
+            text = text.rstrip(b(pad_char) if isinstance(text, bytes) else pad_char)
+        self.len = len(self.text)
+        self.lcharset = len(set(self.text))
+        self.printables = float(len([c for c in self.text if c in printable])) / self.len
+        self.entropy = entropy(self.text)
+
+
+def __score(prev_input, input, prev_encoding, encoding, codec, heuristic=False, extended=False):
+    """ Score relevant encodings given an input.
+
+    :return: (score, decoded input) or None when the encoding is to be discarded
+    """
+    obj = None
+    sc = codec.parameters.get('scoring', {})
+    no_error, transitive = codec.parameters.get('no_error', False), sc.get('transitive', False)
+    # ignore encodings that fail to decode with their default errors handling value ; not a bare "except:" so that
+    #  KeyboardInterrupt still reaches guess(), which relies on it to stop the search
+    try:
+        new_input = codec.decode(input)[0]
+    except Exception:
+        return
+    # ignore encodings that give an output identical to the input (identity transformation) or to the previous input
+    if len(new_input) == 0 or prev_input is not None and b(input) == b(new_input) or b(prev_input) == b(new_input):
+        return
+    # ignore encodings that transitively give the same output (identity transformation by chaining twice a same
+    #  codec (e.g. rot-15 is equivalent to rot-3 and rot-12 or rot-6 and rot-9)
+    if transitive and prev_encoding:
+        ci_prev = lookup(prev_encoding, False)
+        if ci_prev.parameters['name'] == codec.parameters['name']:
+            return
+    # compute input's characteristics only once and only if the control flow reaches this point
+    pad = sc.get('padding_char')
+    if obj is None:
+        obj = _Text(input, pad)
+    if heuristic:
+        # from here, the goal (e.g. if the input is Base32) is to rank candidate encodings (e.g. multiple base
+        #  codecs) so that we can put the right one as early as possible and eventually exclude bad candidates
+        s = -sc.get('penalty', .0)
+        # first, apply a bonus if the length of input text's charset is exactly the same as encoding's charset ;
+        #  on the contrary, if the length of input text's charset is strictly greater, give a penalty
+        lcs = sc.get('len_charset', 256)
+        if isinstance(lcs, FunctionType):
+            lcs = int(lcs(encoding))
+        if (pad and obj.padding and lcs + 1 >= obj.lcharset) or lcs >= obj.lcharset:
+            s += max(.0, round(.6 * (.99 ** (lcs - obj.lcharset)), 5) - .1)
+        elif (pad and obj.padding and lcs + 1 < obj.lcharset) or lcs < obj.lcharset:
+            s -= .2  # this can occur for encodings with no_error set to True
+        # then, take padding into account, giving a bonus if padding is to be encountered and effectively present,
+        #  or a penalty when it should not be encountered but it is present
+        if pad and obj.padding:
+            s += .2  # when padding is encountered while it is legitimate, it could be a good indication => bonus
+        elif not pad and obj.padding:
+            s -= .1  # it could arise a padding character is encountered while not being padding => small penalty
+        # give a bonus when the rate of printable characters is greater or equal than expected and a penalty when
+        #  lower only for codecs that DO NOT tolerate errors (otherwise, the printables rate can be biased)
+        if not no_error:
+            pr = sc.get('printables_rate', 0)
+            if isinstance(pr, FunctionType):
+                pr = float(pr(obj.printables))
+            if obj.printables - pr <= .05:
+                s += .1
+        expf = sc.get('expansion_factor', 1.)
+        if expf:
+            f = obj.len / float(len(new_input))  # expansion while encoding => at decoding: 1/f
+            if isinstance(expf, FunctionType):
+                try:  # this case allows to consider the current encoding name from the current codec
+                    expf = expf(f, encoding)
+                except TypeError:
+                    expf = expf(f)
+            if isinstance(expf, (int, float)):
+                expf = 1/f - .1 <= 1/expf <= 1/f + .1
+            elif isinstance(expf, (tuple, list)) and len(expf) == 2:
+                expf = 1/f - expf[1] <= 1/expf[0] <= 1/f + expf[1]
+            s += [-1., .1][expf]
+        # afterwards, if the input text has an entropy close to the expected one, give a bonus weighted on the
+        #  number of input characters to take bad entropies of shorter strings into account
+        entr = sc.get('entropy', lambda e: e)
+        entr = entr.get(encoding, entr.get('default')) if isinstance(entr, dict) else entr
+        if isinstance(entr, FunctionType):
+            try:  # this case allows to consider the current encoding name from the current codec
+                entr = entr(obj.entropy, encoding)
+            except TypeError:
+                entr = entr(obj.entropy)
+        if entr is not None:
+            # use a quadratic heuristic to compute a weight for the entropy delta, aligned on (256,.2) and (512,1)
+            d_entr = min(3.04575e-06 * obj.len**2 + .000394 * obj.len, 1) * abs(entr - obj.entropy)
+            if d_entr <= .5:
+                s += .5 - d_entr
+        # finally, if relevant, apply a custom bonus (e.g. when a regex pattern is matched)
+        bonus = sc.get('bonus_func')
+        if bonus is not None:
+            if isinstance(bonus, FunctionType):
+                bonus = bonus(obj, codec, encoding)
+            if bonus:
+                s += .2
+    else:
+        s = 1.
+    # exclude negative (and eventually null) scores as they are (hopefully) not relevant
+    if extended and s >= .0 or not extended and s > .0:
+        return s, new_input
+
+
+def guess(input, stop_func=stopfunc.default, min_depth=0, max_depth=5, include=None, exclude=None, found=(),
+          stop=True, show=False, scoring_heuristic=True, extended=False, debug=False):
+    """ Try decoding without the knowledge of the encoding(s).
+
+    :param input:             input text to be guessed
+    :param stop_func:         function defining the stop condition
+    :param min_depth:         minimum search depth
+    :param max_depth:         maximum search depth
+    :param include:           inclusion item OR list with category, codec or encoding names OR dictionary with lists per
+                               depth (nothing means include every encoding)
+    :param exclude:           exclusion item OR list with category, codec or encoding names OR dictionary with lists per
+                               depth (nothing means exclude no encoding)
+    :param found:             tuple of already found encodings
+    :param stop:              whether to stop or not when a valid solution is found
+    :param show:              whether to immediately show once a solution is found
+    :param scoring_heuristic: whether to apply the scoring heuristic during the search (if disabled, all scores are 1.,
+                               meaning that every non-failing encoding will be considered with no order of precedence)
+    :param extended:          whether to also consider null scores with the heuristic
+    :param debug:             whether to show each attempt at each depth during computation
+    :return:                  dictionary mapping each tuple of found encodings to its decoded output ("" if no input)
+    """
+    if len(input) == 0:
+        return ""
+    # check for min and max depths
+    if max_depth <= 0:
+        raise ValueError("Depth must be a non-null positive integer")
+    if min_depth > max_depth:
+        raise ValueError("Min depth shall be less than or equal to the max depth")
+    # take the tuple of found encodings into account
+    if len(found) > 0:
+        for encoding in found:
+            input = decode(input, encoding)
+    # handle the stop function as a regex if a string was given
+    if isinstance(stop_func, str):
+        stop_func = stopfunc.regex(stop_func)
+    # reformat include and exclude arguments ; supported formats:
+    for n, l in zip(["inc", "exc"], [include, exclude]):
+        if l is None:
+            if n == "inc":
+                include = l = {-1: CODECS_CATEGORIES}
+            else:
+                exclude = l = {}
+        # "category" OR "enc_name" OR whatever => means a single item for all depths
+        if isinstance(l, str):
+            if n == "inc":
+                include = l = {-1: [l]}
+            else:
+                exclude = l = {-1: [l]}
+        # ["enc_name1", "enc_name2", ...] => means for all depths
+        if isinstance(l, (list, tuple)):
+            if n == "inc":
+                include = l = {-1: l}
+            else:
+                exclude = l = {-1: l}
+        # {-1: [...], 2: [...], ...} => means predefined depths with their lists of in-/excluded encodings
+        if not isinstance(l, dict) or not all(isinstance(k, int) for k in l.keys()):
+            raise ValueError(f"{'Include' if n == 'inc' else 'Exclude'} argument shall be a list or a dictionary "
+                             "with integer keys")
+    # precompute encodings lists per depth and cache the related CodecInfo objects
+    encodings, result = __make_encodings_dict(include, exclude), {}
+    try:
+        # breadth-first search
+        for d in range(max_depth):
+            __guess("", input, stop_func, 0, d+1, min_depth, encodings, result, tuple(found), stop, show,
+                    scoring_heuristic, extended, debug)
+            if stop and len(result) > 0:
+                break
+    except KeyboardInterrupt:
+        pass
+    # empty the shared cache in place ; rebinding the name (CODECS_CACHE = {}) would only create an unused local
+    CODECS_CACHE.clear()
+    return result
+
+
+def rank(input, extended=False, limit=-1, include=None, exclude=None):
+    """ Rank the most probable encodings based on the given input.
+
+    :param input:    input text to be evaluated
+    :param extended: whether to consider null scores too (NB: negative scores are not output !)
+    :param limit:    number of encodings to be returned (any negative value, e.g. -1, means all of them)
+    :param include:  inclusion list with category, codec or encoding names (nothing means include every encoding)
+    :param exclude:  exclusion list with category, codec or encoding names (nothing means exclude no encoding)
+    :return:         list of ((score, decoded output), encoding) tuples sorted by decreasing score
+    """
+    encodings = __make_encodings_dict(include if isinstance(include, dict) else {-1: include or CODECS_CATEGORIES},
+                                      exclude if isinstance(exclude, dict) else {-1: exclude or []})
+    r = list(__rank(None, input, "", encodings[-1], True, extended, True))
+    # a negative limit means no limit ; slicing with it (r[:-1]) would wrongly drop the lowest-ranked encoding
+    return r if limit < 0 else r[:limit]