# Copyright (c) 2013-2015 Rusty Wagner # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from BinaryData import * from Structure import * from HexEditor import * from View import * class PEFile(BinaryAccessor): class SectionInfo: def __init__(self): self.virtual_size = None self.virtual_address = None self.size_of_raw_data = None self.pointer_to_raw_data = None self.characteristics = None def __init__(self, data): self.data = data self.valid = False self.callbacks = [] self.symbols_by_name = {} self.symbols_by_addr = {} if not self.is_pe(): return try: self.tree = Structure(self.data) self.mz = self.tree.struct("MZ header", "mz") self.mz.uint16("magic") self.mz.uint16("lastsize") self.mz.uint16("nblocks") self.mz.uint16("nreloc") self.mz.uint16("hdrsize") self.mz.uint16("minalloc") self.mz.uint16("maxalloc") self.mz.uint16("ss") self.mz.uint16("sp") self.mz.uint16("checksum") self.mz.uint16("ip") self.mz.uint16("cs") self.mz.uint16("relocpos") self.mz.uint16("noverlay") self.mz.bytes(8, "reserved1") self.mz.uint16("oem_id") self.mz.uint16("oem_info") self.mz.bytes(20, "reserved2") self.mz.uint32("pe_offset") self.header = self.tree.struct("PE header", "header") self.header.seek(self.mz.pe_offset) self.header.uint32("magic") self.header.uint16("machine") self.header.uint16("section_count") self.header.uint32("timestamp") self.header.uint32("coff_symbol_table") self.header.uint32("coff_symbol_count") self.header.uint16("optional_header_size") self.header.uint16("characteristics") self.header.struct("Optional header", "opt") self.header.opt.uint16("magic") self.header.opt.uint8("major_linker_version") self.header.opt.uint8("minor_linker_version") self.header.opt.uint32("size_of_code") self.header.opt.uint32("size_of_init_data") self.header.opt.uint32("size_of_uninit_data") self.header.opt.uint32("address_of_entry") self.header.opt.uint32("base_of_code") if self.header.opt.magic == 0x10b: # 32-bit self.bits = 32 self.header.opt.uint32("base_of_data") self.header.opt.uint32("image_base") self.header.opt.uint32("section_align") self.header.opt.uint32("file_align") self.header.opt.uint16("major_os_version") self.header.opt.uint16("minor_os_version") self.header.opt.uint16("major_image_version") self.header.opt.uint16("minor_image_version") self.header.opt.uint16("major_subsystem_version") self.header.opt.uint16("minor_subsystem_version") self.header.opt.uint32("win32_version") self.header.opt.uint32("size_of_image") self.header.opt.uint32("size_of_headers") self.header.opt.uint32("checksum") self.header.opt.uint16("subsystem") self.header.opt.uint16("dll_characteristics") self.header.opt.uint32("size_of_stack_reserve") self.header.opt.uint32("size_of_stack_commit") self.header.opt.uint32("size_of_heap_reserve") self.header.opt.uint32("size_of_heap_commit") self.header.opt.uint32("loader_flags") self.header.opt.uint32("data_dir_count") elif self.header.opt.magic == 0x20b: # 64-bit self.bits = 64 self.header.opt.uint64("image_base") self.header.opt.uint32("section_align") self.header.opt.uint32("file_align") self.header.opt.uint16("major_os_version") self.header.opt.uint16("minor_os_version") self.header.opt.uint16("major_image_version") self.header.opt.uint16("minor_image_version") self.header.opt.uint16("major_subsystem_version") self.header.opt.uint16("minor_subsystem_version") self.header.opt.uint32("win32_version") self.header.opt.uint32("size_of_image") self.header.opt.uint32("size_of_headers") self.header.opt.uint32("checksum") self.header.opt.uint16("subsystem") self.header.opt.uint16("dll_characteristics") self.header.opt.uint64("size_of_stack_reserve") self.header.opt.uint64("size_of_stack_commit") self.header.opt.uint64("size_of_heap_reserve") self.header.opt.uint64("size_of_heap_commit") self.header.opt.uint32("loader_flags") self.header.opt.uint32("data_dir_count") else: self.valid = False return self.image_base = self.header.opt.image_base self.data_dirs = self.header.array(self.header.opt.data_dir_count, "data_dirs") for i in xrange(0, self.header.opt.data_dir_count): self.data_dirs[i].uint32("virtual_address") self.data_dirs[i].uint32("size") self.sections = [] header_section_obj = PEFile.SectionInfo() header_section_obj.virtual_size = self.header.opt.size_of_headers header_section_obj.virtual_address = 0 header_section_obj.size_of_raw_data = self.header.opt.size_of_headers header_section_obj.pointer_to_raw_data = 0 header_section_obj.characteristics = 0 self.sections.append(header_section_obj) self.tree.array(self.header.section_count, "sections") for i in xrange(0, self.header.section_count): section = self.tree.sections[i] section.seek(self.mz.pe_offset + self.header.optional_header_size + 24 + (i * 40)) section.bytes(8, "name") section.uint32("virtual_size") section.uint32("virtual_address") section.uint32("size_of_raw_data") section.uint32("pointer_to_raw_data") section.uint32("pointer_to_relocs") section.uint32("pointer_to_line_numbers") section.uint16("reloc_count") section.uint16("line_number_count") section.uint32("characteristics") section_obj = PEFile.SectionInfo() section_obj.virtual_size = section.virtual_size section_obj.virtual_address = section.virtual_address & ~(self.header.opt.section_align - 1) section_obj.size_of_raw_data = section.size_of_raw_data section_obj.pointer_to_raw_data = section.pointer_to_raw_data & ~(self.header.opt.file_align - 1) section_obj.characteristics = section.characteristics self.sections.append(section_obj) self.symbols_by_name["_start"] = self.entry() self.symbols_by_addr[self.entry()] = "_start" if self.header.opt.data_dir_count >= 2: self.imports = self.tree.array(0, "imports") for i in xrange(0, self.data_dirs[1].size / 20): if self.read(self.image_base + self.data_dirs[1].virtual_address + (i * 20), 4) == "\0\0\0\0": break if self.read(self.image_base + self.data_dirs[1].virtual_address + (i * 20) + 16, 4) == "\0\0\0\0": break self.imports.append() dll = self.imports[i] dll.seek(self.virtual_address_to_file_offset(self.image_base + self.data_dirs[1].virtual_address) + (i * 20)) dll.uint32("lookup") dll.uint32("timestamp") dll.uint32("forward_chain") dll.uint32("name") dll.uint32("iat") for dll in self.imports: name = self.read_string(self.image_base + dll.name).split('.') if len(name) > 1: name = '.'.join(name[0:-1]) else: name = name[0] entry_ofs = self.image_base + dll.lookup iat_ofs = self.image_base + dll.iat while True: if self.bits == 32: entry = self.read_uint32(entry_ofs) is_ordinal = (entry & 0x80000000) != 0 entry &= 0x7fffffff else: entry = self.read_uint64(entry_ofs) is_ordinal = (entry & 0x8000000000000000) != 0 entry &= 0x7fffffffffffffff if (not is_ordinal) and (entry == 0): break if is_ordinal: func = name + "!Ordinal%d" % (entry & 0xffff) else: func = name + "!" + self.read_string(self.image_base + entry + 2) self.symbols_by_name[func] = iat_ofs self.symbols_by_addr[iat_ofs] = func entry_ofs += self.bits / 8 iat_ofs += self.bits / 8 if (self.header.opt.data_dir_count >= 1) and (self.data_dirs[0].size >= 40): self.exports = self.tree.struct("Export directory", "exports") self.exports.seek(self.virtual_address_to_file_offset(self.image_base + self.data_dirs[0].virtual_address)) self.exports.uint32("characteristics") self.exports.uint32("timestamp") self.exports.uint16("major_version") self.exports.uint16("minor_version") self.exports.uint32("dll_name") self.exports.uint32("base") self.exports.uint32("function_count") self.exports.uint32("name_count") self.exports.uint32("address_of_functions") self.exports.uint32("address_of_names") self.exports.uint32("address_of_name_ordinals") self.exports.array(self.exports.function_count, "functions") for i in xrange(0, self.exports.function_count): self.exports.functions[i].seek(self.virtual_address_to_file_offset(self.image_base + self.exports.address_of_functions) + (i * 4)) self.exports.functions[i].uint32("address") self.exports.array(self.exports.name_count, "names") for i in xrange(0, self.exports.name_count): self.exports.names[i].seek(self.virtual_address_to_file_offset(self.image_base + self.exports.address_of_names) + (i * 4)) self.exports.names[i].uint32("address_of_name") self.exports.array(self.exports.name_count, "name_ordinals") for i in xrange(0, self.exports.name_count): self.exports.name_ordinals[i].seek(self.virtual_address_to_file_offset(self.image_base + self.exports.address_of_name_ordinals) + (i * 2)) self.exports.name_ordinals[i].uint16("ordinal") for i in xrange(0, self.exports.name_count): function_index = self.exports.name_ordinals[i].ordinal - self.exports.base address = self.image_base + self.exports.functions[function_index].address name = self.read_string(self.image_base + self.exports.names[i].address_of_name) self.symbols_by_addr[address] = name self.symbols_by_name[name] = address self.tree.complete() self.valid = True except: self.valid = False if self.valid: self.data.add_callback(self) def read_string(self, addr): result = "" while True: ch = self.read(addr, 1) addr += 1 if (len(ch) == 0) or (ch == '\0'): break result += ch return result def virtual_address_to_file_offset(self, addr): for i in self.sections: if ((addr >= (self.image_base + i.virtual_address)) and (addr < (self.image_base + i.virtual_address + i.virtual_size))) and (i.virtual_size != 0): cur = i if cur == None: return None ofs = addr - (self.image_base + cur.virtual_address) return cur.pointer_to_raw_data + ofs def read(self, ofs, len): result = "" while len > 0: cur = None for i in self.sections: if ((ofs >= (self.image_base + i.virtual_address)) and (ofs < (self.image_base + i.virtual_address + i.virtual_size))) and (i.virtual_size != 0): cur = i if cur == None: break prog_ofs = ofs - (self.image_base + cur.virtual_address) mem_len = cur.virtual_size - prog_ofs file_len = cur.size_of_raw_data - prog_ofs if mem_len > len: mem_len = len if file_len > len: file_len = len if file_len <= 0: result += "\x00" * mem_len len -= mem_len ofs += mem_len continue result += self.data.read(cur.pointer_to_raw_data + prog_ofs, file_len) len -= file_len ofs += file_len return result def next_valid_addr(self, ofs): result = -1 for i in self.sections: if ((self.image_base + i.virtual_address) >= ofs) and (i.virtual_size != 0) and ((result == -1) or ((self.image_base + i.virtual_address) < result)): result = self.image_base + i.virtual_address return result def get_modification(self, ofs, len): result = [] while len > 0: cur = None for i in self.sections: if ((ofs >= (self.image_base + i.virtual_address)) and (ofs < (self.image_base + i.virtual_address + i.virtual_size))) and (i.virtual_size != 0): cur = i if cur == None: break prog_ofs = ofs - (self.image_base + cur.virtual_address) mem_len = cur.virtual_size - prog_ofs file_len = cur.size_of_raw_data - prog_ofs if mem_len > len: mem_len = len if file_len > len: file_len = len if file_len <= 0: result += [DATA_ORIGINAL] * mem_len len -= mem_len ofs += mem_len continue result += self.data.get_modification(cur.pointer_to_raw_data + prog_ofs, file_len) len -= file_len ofs += file_len return result def write(self, ofs, data): result = 0 while len(data) > 0: cur = None for i in self.sections: if ((ofs >= (self.image_base + i.virtual_address)) and (ofs < (self.image_base + i.virtual_address + i.virtual_size))) and (i.virtual_size != 0): cur = i if cur == None: break prog_ofs = ofs - (self.image_base + cur.virtual_address) mem_len = cur.virtual_size - prog_ofs file_len = cur.size_of_raw_data - prog_ofs if mem_len > len: mem_len = len if file_len > len: file_len = len if file_len <= 0: break result += self.data.write(cur.pointer_to_raw_data + prog_ofs, data[0:file_len]) data = data[file_len:] ofs += file_len return result def insert(self, ofs, data): return 0 def remove(self, ofs, size): return 0 def notify_data_write(self, data, ofs, contents): # Find sections that hold data backed by updated regions of the file for i in self.sections: if ((ofs + len(contents)) > i.pointer_to_raw_data) and (ofs < (i.pointer_to_raw_data + i.size_of_raw_data)) and (i.virtual_size != 0): # This section has been updated, compute which region has been changed from_start = ofs - i.pointer_to_raw_data data_ofs = 0 length = len(contents) if from_start < 0: length += from_start data_ofs -= from_start from_start = 0 if (from_start + length) > i.size_of_raw_data: length = i.size_of_raw_data - from_start # Notify callbacks if length > 0: for cb in self.callbacks: if hasattr(cb, "notify_data_write"): cb.notify_data_write(self, self.image_base + i.virtual_address + from_start, contents[data_ofs:(data_ofs + length)]) def save(self, filename): self.data.save(filename) def start(self): return self.image_base def entry(self): return self.image_base + self.header.opt.address_of_entry def __len__(self): max = None for i in self.sections: if ((max == None) or ((self.image_base + i.virtual_address + i.virtual_size) > max)) and (i.virtual_size != 0): max = self.image_base + i.virtual_address + i.virtual_size return max - self.start() def is_pe(self): if self.data.read(0, 2) != "MZ": return False ofs = self.data.read(0x3c, 4) if len(ofs) != 4: return False ofs = struct.unpack("