# Copyright (c) 2011-2015 Rusty Wagner # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import threading import time import struct import X86 import PPC import Arm from ElfFile import * from PEFile import * from MachOFile import * ExeFormats = [ElfFile, PEFile, MachOFile] class InstructionText: def __init__(self): self.lines = [] self.tokens = [] class X86Instruction: def __init__(self, opcode, addr, disasm, addr_size): self.opcode = opcode self.addr = addr self.disasm = disasm self.addr_size = addr_size self.plt = None self.text = InstructionText() if ((disasm.operation == "jmpn") or self.isCall() or self.isConditionalBranch()) and (disasm.operands[0].operand == "imm"): self.target = disasm.operands[0].immediate else: self.target = None def isConditionalBranch(self): return self.disasm.operation in ["jo", "jno", "jb", "jae", "je", "jne", "jbe", "ja", "js", "jns", "jpe", "jpo", "jl", "jge", "jle", "jg", "jcxz", "jecxz", "jrcxz", "loop"] def isLocalJump(self): if self.disasm.operation == "jmpn": return True if self.isConditionalBranch(): return True return False def isCall(self): if self.disasm.operation == "calln": return True if self.disasm.operation == "callf": return True return False def isBlockEnding(self): if self.disasm.operation in ["jmpn", "jmpf", "retn", "retf"]: return True if self.isConditionalBranch(): return True if self.disasm.operation == "hlt": return True return False def isValid(self): return self.disasm.operation != None def length(self): return self.disasm.length def format_text(self, block, options): old_lines = [] old_tokens = [] self.text.lines = [] self.text.tokens = [] line = [] tokens = [] x = 0 instr = self.disasm if "address" in options: string = "%.8x " % self.addr line += [[string, QColor(0, 0, 128)]] x += len(string) if instr.operation == None: line += [["??", Qt.black]] self.text.lines += [line] self.text.tokens += [tokens] return (old_lines != self.text.lines) or (old_tokens != self.text.tokens) result = "" operation = "" if instr.flags & X86.FLAG_LOCK: operation += "lock " if instr.flags & X86.FLAG_ANY_REP: operation += "rep" if instr.flags & X86.FLAG_REPNE: operation += "ne" elif instr.flags & X86.FLAG_REPE: operation += "e" operation += " " operation += instr.operation if len(operation) < 7: operation += " " * (7 - len(operation)) result += operation + " " for j in range(0, len(instr.operands)): if j != 0: result += ", " if instr.operands[j].operand == "imm": value = instr.operands[j].immediate & ((1 << (instr.operands[j].size * 8)) - 1) numfmt = "0x%%.%dx" % (instr.operands[j].size * 2) string = numfmt % value if (instr.operands[j].size == self.addr_size) and (block.analysis.functions.has_key(value)): # Pointer to existing function func = block.analysis.functions[value] string = func.name if func.plt: color = QColor(192, 0, 192) else: color = QColor(0, 0, 192) if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" line += [[string, color]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) elif (instr.operands[j].size == self.addr_size) and (value >= block.exe.start()) and (value < block.exe.end()) and (not self.isLocalJump()): # Pointer within module if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" if value in block.exe.symbols_by_addr: string = block.exe.symbols_by_addr[value] line += [[string, QColor(0, 0, 192)]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) else: result += string elif instr.operands[j].operand == "mem": plus = False result += X86.get_size_string(instr.operands[j].size) if (instr.segment != None) or (instr.operands[j].segment == "es"): result += instr.operands[j].segment + ":" result += '[' if instr.operands[j].components[0] != None: tokens += [[x + len(result), len(instr.operands[j].components[0]), "reg", instr.operands[j].components[0]]] result += instr.operands[j].components[0] plus = True if instr.operands[j].components[1] != None: if plus: tokens += [[x + len(result) + 1, len(instr.operands[j].components[1]), "reg", instr.operands[j].components[1]]] else: tokens += [[x + len(result), len(instr.operands[j].components[1]), "reg", instr.operands[j].components[1]]] result += X86.get_operand_string(instr.operands[j].components[1], instr.operands[j].scale, plus) plus = True if (instr.operands[j].immediate != 0) or ((instr.operands[j].components[0] == None) and (instr.operands[j].components[1] == None)): if plus and (instr.operands[j].immediate >= -0x80) and (instr.operands[j].immediate < 0): result += '-' result += "0x%.2x" % (-instr.operands[j].immediate) elif plus and (instr.operands[j].immediate > 0) and (instr.operands[j].immediate <= 0x7f): result += '+' result += "0x%.2x" % instr.operands[j].immediate elif plus and (instr.operands[j].immediate >= -0x8000) and (instr.operands[j].immediate < 0): result += '-' result += "0x%.8x" % (-instr.operands[j].immediate) elif instr.flags & X86.FLAG_64BIT_ADDRESS: if plus: result += '+' value = instr.operands[j].immediate string = "0x%.16x" % instr.operands[j].immediate if hasattr(block.exe, "plt") and block.exe.plt.has_key(value): # Pointer to PLT entry self.plt = block.exe.plt[value] if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" string = self.plt + "@PLT" line += [[string, QColor(0, 0, 192)]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) elif (value >= block.exe.start()) and (value < block.exe.end()): # Pointer within module if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" if value in block.exe.symbols_by_addr: string = block.exe.symbols_by_addr[value] line += [[string, QColor(0, 0, 192)]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) else: result += string else: if plus: result += '+' value = instr.operands[j].immediate & 0xffffffff string = "0x%.8x" % value if (self.addr_size == 4) and hasattr(block.exe, "plt") and block.exe.plt.has_key(value): # Pointer to PLT entry self.plt = block.exe.plt[value] if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" string = block.exe.decorate_plt_name(self.plt) line += [[string, QColor(0, 0, 192)]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) elif (self.addr_size == 4) and (value >= block.exe.start()) and (value < block.exe.end()): # Pointer within module if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" if value in block.exe.symbols_by_addr: string = block.exe.symbols_by_addr[value] line += [[string, QColor(0, 0, 192)]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) else: result += string result += ']' else: tokens += [[x + len(result), len(instr.operands[j].operand), "reg", instr.operands[j].operand]] result += instr.operands[j].operand if len(result) > 0: line += [[result, Qt.black]] self.text.lines += [line] self.text.tokens += [tokens] return (old_lines != self.text.lines) or (old_tokens != self.text.tokens) def get_prefix_count(self): for count in range(0, len(self.opcode)): if self.opcode[count] in ["\x26", "\x2e", "\x36", "\x3e", "\x64", "\x65", "\x66", "\x67", "\xf0", "\xf2", "\xf3"]: continue if (self.addr_size == 8) and (self.opcode[count] >= '\x40') and (self.opcode[count] <= "\x4f"): continue return count return len(self.opcode) def patch_to_nop(self, exe): exe.write(self.addr, "\x90" * len(self.opcode)) def is_patch_branch_allowed(self): return self.isConditionalBranch() def patch_to_always_branch(self, exe): prefix_count = self.get_prefix_count() if self.opcode[prefix_count] == "\x0f": # Two byte branch exe.write(self.addr, ("\x90" * (prefix_count + 1)) + "\xb9" + self.opcode[prefix_count+2:]) else: # One byte branch exe.write(self.addr, ("\x90" * prefix_count) + "\xeb" + self.opcode[prefix_count+1:]) def patch_to_invert_branch(self, exe): prefix_count = self.get_prefix_count() if self.opcode[prefix_count] == "\x0f": # Two byte branch exe.write(self.addr, self.opcode[0:prefix_count+1] + chr(ord(self.opcode[prefix_count+1]) ^ 1) + self.opcode[prefix_count+2:]) else: # One byte branch exe.write(self.addr, self.opcode[0:prefix_count] + chr(ord(self.opcode[prefix_count]) ^ 1) + self.opcode[prefix_count+1:]) def is_patch_to_zero_return_allowed(self): return self.isCall() def patch_to_zero_return(self, exe): exe.write(self.addr, "\x31\xc0" + ("\x90" * (len(self.opcode) - 2))) def is_patch_to_fixed_return_value_allowed(self): return self.isCall() and (len(self.opcode) >= 5) def patch_to_fixed_return_value(self, exe, value): if len(self.opcode) < 5: return exe.write(self.addr, "\xb8" + struct.pack(" 0): if type(disasm.operands[-1]) != str: self.target = disasm.operands[-1] def isConditionalBranch(self): return self.disasm.operation in PPC.ConditionalBranches def isLocalJump(self): if self.disasm.operation in ["b", "ba"]: return True if self.isConditionalBranch(): return True return False def isCall(self): return self.disasm.operation in PPC.CallInstructions def isBlockEnding(self): if self.disasm.operation in (PPC.BranchInstructions + ["trap"]): return True if self.isLocalJump(): return True return False def isValid(self): return self.disasm.operation != None def length(self): return 4 def format_text(self, block, options): old_lines = [] old_tokens = [] self.text.lines = [] self.text.tokens = [] line = [] tokens = [] x = 0 instr = self.disasm if "address" in options: string = "%.8x " % self.addr line += [[string, QColor(0, 0, 128)]] x += len(string) if instr.operation == None: line += [["??", Qt.black]] self.text.lines += [line] self.text.tokens += [tokens] return (old_lines != self.text.lines) or (old_tokens != self.text.tokens) result = "" operation = instr.operation if len(operation) < 7: operation += " " * (7 - len(operation)) result += operation + " " for j in range(0, len(instr.operands)): if j != 0: result += ", " if type(instr.operands[j]) != str: value = instr.operands[j] if instr.operands[j] < 0: string = "-0x%x" % -instr.operands[j] else: string = "0x%x" % instr.operands[j] if block.analysis.functions.has_key(value): # Pointer to existing function func = block.analysis.functions[value] string = func.name if func.plt: color = QColor(192, 0, 192) else: color = QColor(0, 0, 192) if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" line += [[string, color]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) elif (value >= block.exe.start()) and (value < block.exe.end()) and (not self.isLocalJump()): # Pointer within module if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" if value in block.exe.symbols_by_addr: string = block.exe.symbols_by_addr[value] line += [[string, QColor(0, 0, 192)]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) else: result += string else: tokens += [[x + len(result), len(instr.operands[j]), "reg", instr.operands[j]]] result += instr.operands[j] if len(result) > 0: line += [[result, Qt.black]] self.text.lines += [line] self.text.tokens += [tokens] return (old_lines != self.text.lines) or (old_tokens != self.text.tokens) def patch_to_nop(self, exe): exe.write(self.addr, "\x60\x00\x00\x00") def is_patch_branch_allowed(self): return self.isConditionalBranch() def patch_to_always_branch(self, exe): pass def patch_to_invert_branch(self, exe): pass def is_patch_to_zero_return_allowed(self): return self.isCall() def patch_to_zero_return(self, exe): pass def is_patch_to_fixed_return_value_allowed(self): return self.isCall() def patch_to_fixed_return_value(self, exe, value): pass class ArmInstruction: def __init__(self, opcode, addr, disasm): self.opcode = opcode self.addr = addr self.disasm = disasm self.plt = None self.text = InstructionText() self.target = None if self.disasm.operation and ((self.disasm.operation == "b") or (self.disasm.operation == "bx") or (self.disasm.operation == "bl") or (self.disasm.operation == "blx")) and (type(self.disasm.operands[0]) != str): self.target = disasm.operands[0] elif self.disasm.operation and ((self.disasm.operation[0:2] == "b.") or (self.disasm.operation[0:3] == "bx.") or (self.disasm.operation[0:3] == "bl.") or (self.disasm.operation[0:4] == "blx.")) and (type(self.disasm.operands[0]) != str): self.target = disasm.operands[0] def isConditionalBranch(self): if self.disasm.operation and ((self.disasm.operation[0:2] == "b.") or (self.disasm.operation[0:3] == "bx.")) and (self.target is not None): return True return False def isLocalJump(self): if self.disasm.operation and ((self.disasm.operation == "b") or (self.disasm.operation == "bx")) and (self.target is not None): return True return self.isConditionalBranch() def isCall(self): return self.disasm.operation and ((self.disasm.operation == "bl") or (self.disasm.operation == "blx") or (self.disasm.operation[0:3] == "bl.") or (self.disasm.operation[0:4] == "blx.")) def isBlockEnding(self): if self.isLocalJump(): return True if self.disasm.operation and ((self.disasm.operation == "b") or (self.disasm.operation == "bx")): return True if self.disasm.operation and (self.disasm.operation[0:3] == "ldm") and ("pc" in self.disasm.operands[1:]): return True if self.disasm.operation and (self.disasm.operation == "ldr") and (self.disasm.operands[0] == "pc"): return True if self.disasm.operation and (self.disasm.operation == "pop") and ("pc" in self.disasm.operands): return True return False def isValid(self): return self.disasm.operation != None def length(self): return self.disasm.length def format_text(self, block, options): old_lines = [] old_tokens = [] self.text.lines = [] self.text.tokens = [] line = [] tokens = [] x = 0 instr = self.disasm if "address" in options: string = "%.8x " % self.addr line += [[string, QColor(0, 0, 128)]] x += len(string) if instr.operation == None: line += [["??", Qt.black]] self.text.lines += [line] self.text.tokens += [tokens] return (old_lines != self.text.lines) or (old_tokens != self.text.tokens) result = "" operation = instr.operation.replace(".", "") if len(operation) < 7: operation += " " * (7 - len(operation)) result += operation + " " for j in range(0, len(instr.operands)): if j != 0: result += ", " if type(instr.operands[j]) == list: if instr.operands[j][0][0] == "-": tokens += [[x + len(result) + 1, len(instr.operands[j][0]) - 1, "reg", instr.operands[j][0][1:]]] elif instr.operands[j][0][-1] == "!": tokens += [[x + len(result), len(instr.operands[j][0]) - 1, "reg", instr.operands[j][0][:-1]]] else: tokens += [[x + len(result), len(instr.operands[j][0]), "reg", instr.operands[j][0]]] result += instr.operands[j][0] + " " if len(instr.operands[j]) == 2: result += instr.operands[j][1] elif type(instr.operands[j][2]) == str: result += instr.operands[j][1] + " " if instr.operands[j][2][0] == "-": tokens += [[x + len(result) + 1, len(instr.operands[j][2]) - 1, "reg", instr.operands[j][2][1:]]] elif instr.operands[j][0][-1] == "!": tokens += [[x + len(result), len(instr.operands[j][2]) - 1, "reg", instr.operands[j][2][:-1]]] else: tokens += [[x + len(result), len(instr.operands[j][2]), "reg", instr.operands[j][2]]] result += instr.operands[j][2] else: result += " %s %d" % (instr.operands[j][1], instr.operands[j][2]) elif instr.operands[j].__class__ == Arm.MemoryOperand: result += "[" for k in range(0, len(instr.operands[j].components)): if k != 0: result += ", " if type(instr.operands[j].components[k]) == str: if instr.operands[j].components[k][0] == "-": tokens += [[x + len(result) + 1, len(instr.operands[j].components[k]) - 1, "reg", instr.operands[j].components[k][1:]]] elif instr.operands[j].components[k][-1] == "!": tokens += [[x + len(result), len(instr.operands[j].components[k]) - 1, "reg", instr.operands[j].components[k][:-1]]] else: tokens += [[x + len(result), len(instr.operands[j].components[k]), "reg", instr.operands[j].components[k]]] result += instr.operands[j].components[k] elif type(instr.operands[j].components[k]) == list: if instr.operands[j].components[k][0][0] == "-": tokens += [[x + len(result) + 1, len(instr.operands[j].components[k][0]) - 1, "reg", instr.operands[j].components[k][0][1:]]] elif instr.operands[j].components[k][0][-1] == "!": tokens += [[x + len(result), len(instr.operands[j].components[k][0]) - 1, "reg", instr.operands[j].components[k][0][:-1]]] else: tokens += [[x + len(result), len(instr.operands[j].components[k][0]), "reg", instr.operands[j].components[k][0]]] result += instr.operands[j].components[k][0] + " " if len(instr.operands[j].components[k]) == 2: result += instr.operands[j].components[k][1] elif type(instr.operands[j].components[k][2]) == str: result += instr.operands[j].components[k][1] + " " if instr.operands[j].components[k][2][0] == "-": tokens += [[x + len(result) + 1, len(instr.operands[j].components[k][2]) - 1, "reg", instr.operands[j].components[k][2][1:]]] elif instr.operands[j][2][-1] == "!": tokens += [[x + len(result), len(instr.operands[j].components[k][2]) - 1, "reg", instr.operands[j].components[k][2][:-1]]] else: tokens += [[x + len(result), len(instr.operands[j].components[k][2]), "reg", instr.operands[j].components[k][2]]] result += instr.operands[j].components[k][2] else: result += "%s %d" % (instr.operands[j].components[k][1], instr.operands[j].components[k][2]) else: value = instr.operands[j].components[k] if instr.operands[j].components[k] < 0: string = "-0x%x" % -instr.operands[j].components[k] else: string = "0x%x" % instr.operands[j].components[k] if block.analysis.functions.has_key(value): # Pointer to existing function func = block.analysis.functions[value] string = func.name if func.plt: color = QColor(192, 0, 192) else: color = QColor(0, 0, 192) if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" line += [[string, color]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) elif (value >= block.exe.start()) and (value < block.exe.end()) and (not self.isLocalJump()): # Pointer within module if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" if value in block.exe.symbols_by_addr: string = block.exe.symbols_by_addr[value] line += [[string, QColor(0, 0, 192)]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) else: result += string result += "]" if instr.operands[j].writeback: result += "!" elif type(instr.operands[j]) != str: value = instr.operands[j] if instr.operands[j] < 0: string = "-0x%x" % -instr.operands[j] else: string = "0x%x" % instr.operands[j] if block.analysis.functions.has_key(value): # Pointer to existing function func = block.analysis.functions[value] string = func.name if func.plt: color = QColor(192, 0, 192) else: color = QColor(0, 0, 192) if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" line += [[string, color]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) elif (value >= block.exe.start()) and (value < block.exe.end()) and (not self.isLocalJump()): # Pointer within module if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" if value in block.exe.symbols_by_addr: string = block.exe.symbols_by_addr[value] line += [[string, QColor(0, 0, 192)]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) else: result += string else: if instr.operands[j][0] == "-": tokens += [[x + len(result) + 1, len(instr.operands[j]) - 1, "reg", instr.operands[j][1:]]] elif instr.operands[j][-1] == "!": tokens += [[x + len(result), len(instr.operands[j]) - 1, "reg", instr.operands[j][:-1]]] else: tokens += [[x + len(result), len(instr.operands[j]), "reg", instr.operands[j]]] result += instr.operands[j] if (instr.operation == "ldr") and (instr.operands[1].__class__ == Arm.MemoryOperand) and (len(instr.operands[1].components) == 1) and (type(instr.operands[1].components[0]) != str) and (type(instr.operands[1].components[0]) != list): addr = instr.operands[1].components[0] if (addr >= block.exe.start()) and ((addr + 4) <= block.exe.end()): result += " ; =" value = block.exe.read_uint32(addr) string = "0x%x" % value if block.analysis.functions.has_key(value): # Pointer to existing function func = block.analysis.functions[value] string = func.name if func.plt: color = QColor(192, 0, 192) else: color = QColor(0, 0, 192) if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" line += [[string, color]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) elif (value >= block.exe.start()) and (value < block.exe.end()) and (not self.isLocalJump()): # Pointer within module if len(result) > 0: line += [[result, Qt.black]] x += len(result) result = "" if value in block.exe.symbols_by_addr: string = block.exe.symbols_by_addr[value] line += [[string, QColor(0, 0, 192)]] tokens += [[x, len(string), "ptr", value, string]] x += len(string) else: result += string if len(result) > 0: line += [[result, Qt.black]] self.text.lines += [line] self.text.tokens += [tokens] return (old_lines != self.text.lines) or (old_tokens != self.text.tokens) def patch_to_nop(self, exe): if self.addr & 1: if self.disasm.length == 4: exe.write(self.addr & (~1), "\x00\x46\x00\x46") else: exe.write(self.addr & (~1), "\x00\x46") else: exe.write(self.addr, "\x00\x00\xa0\xe1") def is_patch_branch_allowed(self): return self.isConditionalBranch() def patch_to_always_branch(self, exe): if self.addr & 1: opcode = exe.read_uint16(self.addr & (~1)) imm8 = opcode & 0xff if imm8 & 0x80: imm11 = imm8 | 0x700 else: imm11 = imm8 opcode = 0xe000 | imm11 exe.write_uint16(self.addr & (~1), opcode) else: exe.write_uint32(self.addr, (exe.read_uint32(self.addr) & 0x0fffffff) | 0xe0000000) def patch_to_invert_branch(self, exe): if self.addr & 1: exe.write_uint16(self.addr & (~1), self.read_uint16(self.addr & (~1)) ^ (1 << 8)) else: exe.write_uint32(self.addr, exe.read_uint32(self.addr) ^ (1 << 28)) def is_patch_to_zero_return_allowed(self): return self.isCall() def patch_to_zero_return(self, exe): if self.addr & 1: if self.disasm.length == 4: exe.write(self.addr & (~1), "\x00\x20\x00\x46") else: exe.write(self.addr & (~1), "\x00\x20") else: cc = exe.read_uint32(self.addr) & 0xf0000000 if cc == 0xf0000000: cc = 0xe0000000 exe.write_uint32(self.addr, cc | 0x03a00000) def is_patch_to_fixed_return_value_allowed(self): return self.isCall() def patch_to_fixed_return_value(self, exe, value): if self.addr & 1: exe.write_uint16(self.addr & (~1), 0x2000 | (value & 0xff)) if self.disasm.length == 4: exe.write((self.addr + 2) & (~1), "\x00\x46") else: cc = exe.read_uint32(self.addr) & 0xf0000000 if cc == 0xf0000000: cc = 0xe0000000 exe.write_uint32(self.addr, cc | 0x03000000 | ((value << 4) & 0xf0000) | (value & 0xfff)) class BasicBlock: def __init__(self, analysis, exe, entry): self.analysis = analysis self.exe = exe self.entry = entry self.exits = [] self.prev = [] self.true_path = None self.false_path = None self.instrs = [] self.header_text = InstructionText() def populate(self, known_instrs): addr = self.entry while True: known_instrs[addr] = self if self.exe.architecture() == "x86": opcode = self.exe.read(addr, 15) result = X86.disassemble32(opcode, addr) opcode = opcode[0:result.length] instr = X86Instruction(opcode, addr, result, 4) arch = X86 elif self.exe.architecture() == "x86_64": opcode = self.exe.read(addr, 15) result = X86.disassemble64(opcode, addr) opcode = opcode[0:result.length] instr = X86Instruction(opcode, addr, result, 8) arch = X86 elif self.exe.architecture() == "ppc": opcode = self.exe.read(addr, 4) if len(opcode) == 4: result = PPC.disassemble(struct.unpack(">I", opcode)[0], addr) instr = PPCInstruction(opcode, addr, result) else: instr = PPCInstruction("", addr, PPC.Instruction()) arch = PPC elif self.exe.architecture() == "arm": opcode = self.exe.read(addr & (~1), 4) if len(opcode) == 4: result = Arm.disassemble(struct.unpack(" 0: block = queue.pop() # Find instructions for this block block.populate(known_instrs) self.blocks[block.entry] = block # Follow block exits for edge in block.exits: already_found = edge in self.blocks for i in queue: if i.entry == edge: already_found = True if not already_found: if edge in known_instrs: # Address is within another basic block, split the block so that # the new edge can point to a basic block as well block = known_instrs[edge] for i in range(0, len(block.instrs)): if block.instrs[i].addr == edge: break new_block = BasicBlock(self.analysis, self.exe, edge) new_block.exits = block.exits new_block.true_path = block.true_path new_block.false_path = block.false_path new_block.instrs = block.instrs[i:] for instr in new_block.instrs: known_instrs[instr.addr] = new_block self.blocks[edge] = new_block block.exits = [edge] block.true_path = None block.false_path = None block.instrs = block.instrs[0:i] else: # New basic block block = BasicBlock(self.analysis, self.exe, edge) self.blocks[edge] = block known_instrs[edge] = block queue += [block] # Set previous block list for each block for block in self.blocks.values(): block.prev = [] for block in self.blocks.values(): for exit in block.exits: self.blocks[exit].prev.append(block) if (len(self.blocks) == 1) and (len(block.instrs) == 1) and (block.instrs[0].plt != None): # Function is a trampoline to a PLT entry self.rename(block.instrs[0].plt) self.plt = True self.exe.create_symbol(self.entry, self.name) def findCalls(self): calls = [] for block in self.blocks.values(): for instr in block.instrs: if instr.isCall() and (instr.target != None): if (instr.target >= self.exe.start()) and (instr.target < self.exe.end()): calls += [instr.target] return calls def update(self): changed = False for block in self.blocks.values(): if block.update(): changed = True if changed: self.update_id = self.analysis.get_next_update_id() def rename(self, name): self.name = name self.blocks[self.entry].header_text.lines[0] = [[name + ":", QColor(192, 0, 0)]] self.blocks[self.entry].header_text.tokens[0] = [[0, len(self.name), "ptr", self.entry, self.name]] class Analysis: def __init__(self, exe): self.exe = exe self.start = None self.functions = {} self.run = True self.lock = threading.Lock() self.queue = [] self.status = "" self.update_id = 0 self.update_request = False self.options = set() self.exe.add_callback(self) def get_next_update_id(self): self.update_id += 1 return self.update_id def analyze(self): self.lock.acquire() if hasattr(self.exe, "entry"): self.status = "Disassembling function at 0x%.8x..." % self.exe.entry() self.start = Function(self, self.exe, self.exe.entry(), '_start') self.functions[self.exe.entry()] = self.start self.start.findBasicBlocks() self.queue += self.start.findCalls() self.start.ready = True self.lock.release() while self.run: while (len(self.queue) > 0) and self.run: self.lock.acquire() entry = self.queue.pop() self.status = "Disassembling function at 0x%.8x..." % entry if entry not in self.functions: if entry in self.exe.symbols_by_addr: func = Function(self, self.exe, entry, self.exe.symbols_by_addr[entry]) else: func = Function(self, self.exe, entry) self.functions[entry] = func else: func = self.functions[entry] func.findBasicBlocks() calls = func.findCalls() for call in calls: already_found = self.functions.has_key(call) for i in self.queue: if i == call: already_found = True if not already_found: self.queue += [call] func.ready = True self.lock.release() # Give GUI thread a chance to do stuff time.sleep(0.001) # Update disassembly so that function names are correct self.update_request = False for func in self.functions.values(): if not self.run: break self.lock.acquire() self.status = "Updating function at 0x%.8x..." % func.entry func.update() self.lock.release() time.sleep(0.001) # Wait for any additional function requests to come in self.status = "" while (len(self.queue) == 0) and (not self.update_request) and self.run: time.sleep(0.1) def stop(self): run = False def find_instr(self, addr, exact_match = False): self.lock.acquire() for func in self.functions.values(): for block in func.blocks.values(): for instr in block.instrs: if (exact_match and (addr == instr.addr)) or ((not exact_match) and (addr >= instr.addr) and (addr < (instr.addr + len(instr.opcode)))): result = [func.entry, instr.addr] self.lock.release() return result self.lock.release() return [None, None] def notify_data_write(self, data, ofs, contents): self.lock.acquire() # Update any functions containing the updated bytes start = ofs end = ofs + len(contents) for func in self.functions.values(): added = False for block in func.blocks.values(): for instr in block.instrs: if (end > instr.addr) and (start < (instr.addr + len(instr.opcode))): if func.entry not in self.queue: self.queue += [func.entry] added = True break if added: break self.lock.release() def create_symbol(self, addr, name): self.lock.acquire() self.exe.create_symbol(addr, name) if addr in self.functions: self.functions[addr].rename(name) self.update_request = True self.lock.release() def undefine_symbol(self, addr, name): self.lock.acquire() self.exe.delete_symbol(addr, name) if addr in self.functions: self.functions[addr].rename("sub_%.8x" % addr) self.update_request = True self.lock.release() def set_address_view(self, addr): self.lock.acquire() if addr: self.options.add("address") else: self.options.remove("address") self.update_request = True self.lock.release() def isPreferredForFile(data): for type in ExeFormats: exe = type(data) if exe.valid: return True return False isPreferredForFile = staticmethod(isPreferredForFile)