-
Notifications
You must be signed in to change notification settings - Fork 283
Expand file tree
/
Copy pathwf_unflatten.py
More file actions
277 lines (227 loc) · 12.2 KB
/
wf_unflatten.py
File metadata and controls
277 lines (227 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import json
from binaryninja import Workflow, Activity, AnalysisContext, MediumLevelILBasicBlock, \
MediumLevelILInstruction, ReportCollection, FlowGraphReport, show_report_collection, \
FlowGraph, DisassemblySettings
from binaryninja.mediumlevelil import *
from binaryninja.enums import *
"""
This workflow reverses the control-flow flattening algorithm of Limoncello[1],
at least some of the time. It does this in a relatively simple manner:
1. Find "dispatcher blocks" which contain a MLIL_JUMP_TO whose targets all have the
dispatcher as a post-dominator (ie all targets must flow back to the dispatcher)
2. Walk backwards from all unconditional branches into the dispatcher until finding blocks
with conditional branches and build a list of all unconditional continuation blocks
for each block that flows into the dispatcher.
3. Copy those continuation blocks and the dispatcher into each block that calls them.
This leaves you with a copy of the dispatcher block (and intermediate bookkeeping) in
every path that would have normally flowed into the dispatcher.
4. Since each path now has its own copy of the dispatcher, use MLIL dataflow to solve for
which branch of the dispatcher is taken at each copy. When a dispatcher with a solved
target is encountered, rewrite it as an MLIL_GOTO with the target directly.
[1] https://github.com/jonpalmisc/limoncello
"""
def is_dispatcher(block: MediumLevelILBasicBlock) -> bool:
"""
Determine if a block looks like a CFF dispatcher, i.e. all outgoing edges
post-dominate it (or return)
:param block: The block to check
:return: True if it's a dispatcher
"""
for ins in block:
ins: MediumLevelILInstruction
if ins.operation == MediumLevelILOperation.MLIL_JUMP_TO:
for edge in block.outgoing_edges:
out_block = edge.target
# This is too trivial for fancier cases with multiple blocks that all lead
# to a return, but whatever this is just a sample plugin
if len(out_block.outgoing_edges) == 0:
continue
if block not in out_block.post_dominators:
return False
return True
return False
def graph_pls(fn: MediumLevelILFunction) -> FlowGraph:
# For ReportCollection, create graph with the settings I want
settings = DisassemblySettings()
settings.set_option(DisassemblyOption.ShowAddress, True)
return fn.create_graph_immediate(settings=settings)
def rewrite_action(context: AnalysisContext, do_it: bool):
# Main workflow action
# =====================================================================
# Custom debug report
report = None
if context.function.check_for_debug_report("unflatten"):
report = ReportCollection()
try:
if report is not None:
init_graph = graph_pls(context.mlil)
report.append(FlowGraphReport("Initial", init_graph, context.view))
# =====================================================================
# Finding flattened control flow and resolving continuations
# Look for dispatcher block
dispatcher = None
for block in context.mlil.basic_blocks:
if is_dispatcher(block):
dispatcher = block
break
if dispatcher is None:
return
if report is not None:
init_graph = graph_pls(context.mlil)
for node_index, node in enumerate(init_graph.nodes):
if node.basic_block.start == dispatcher.start:
node.highlight = HighlightStandardColor.RedHighlightColor
init_graph.replace(node_index, node)
report.append(FlowGraphReport("Finding Dispatcher", init_graph, context.view))
# Find blocks that flow directly into the dispatcher
queue = []
to_copy = {}
for incoming in dispatcher.incoming_edges:
if dispatcher in incoming.source.dominators:
to_copy[incoming.source] = [dispatcher]
queue.append(incoming.source)
# For each of these, walk back along unconditional branch edges to find their
# list of continuation blocks which will be copied at their end
while len(queue) > 0:
# This is a sort of backwards BFS
top = queue.pop(0)
any_conditional = False
for incoming in top.incoming_edges:
if incoming.type != BranchType.UnconditionalBranch:
any_conditional = True
# Having any conditional branches ends the unconditional chain
if not any_conditional:
for incoming in top.incoming_edges:
if incoming.type == BranchType.UnconditionalBranch:
if incoming.source not in to_copy:
queue.append(incoming.source)
to_copy[incoming.source] = [top] + to_copy[top]
to_copy[top] = []
# Ignore empty continuation paths
for bb in list(to_copy.keys()):
if len(to_copy[bb]) == 0:
to_copy.pop(bb)
if report is not None:
for block, path in to_copy.items():
init_graph = graph_pls(context.mlil)
for node_index, node in enumerate(init_graph.nodes):
if node.basic_block.start in [bb.start for bb in path]:
node.highlight = HighlightStandardColor.RedHighlightColor
if node.basic_block.start == block.start:
node.highlight = HighlightStandardColor.GreenHighlightColor
init_graph.replace(node_index, node)
report.append(FlowGraphReport(" Blocks flowing into", init_graph, context.view))
# =====================================================================
# Modify the IL to copy the continuations into all the blocks calling the dispatcher
# Make a new IL function and append the modified instructions to it
old_mlil = context.mlil
new_mlil = MediumLevelILFunction(old_mlil.arch, low_level_il=context.llil)
new_mlil.prepare_to_copy_function(old_mlil)
block_map_starts = {}
# Copy all instructions in all blocks of the old version of the function
for block in old_mlil.basic_blocks:
new_mlil.prepare_to_copy_block(block)
block_map_starts[block] = len(new_mlil)
new_mlil.set_current_address(old_mlil[InstructionIndex(block.start)].address, block.arch)
for instr_index in range(block.start, block.end):
old_instr: MediumLevelILInstruction = old_mlil[InstructionIndex(instr_index)]
# Copy continuation blocks to end of block calling dispatcher
if block in to_copy:
path = to_copy[block]
if instr_index == block.end - 1:
# For every block in the continuation, copy it at the end of this block
for copy_block in path:
new_mlil.prepare_to_copy_block(copy_block)
# Skip the final instruction in the continuations because it is a MLIL_GOTO
end = copy_block.end - 1
if copy_block == dispatcher:
end = copy_block.end
for copy_block_instr_index in range(copy_block.start, end):
# Copy instructions as-is
copy_block_instr: MediumLevelILInstruction = old_mlil[InstructionIndex(copy_block_instr_index)]
new_mlil.set_current_address(copy_block_instr.address, copy_block.arch)
new_mlil.append(copy_block_instr.copy_to(new_mlil), ILSourceLocation.from_instruction(copy_block_instr))
continue
# Otherwise, copy the instruction as-is
new_mlil.set_current_address(old_instr.address, block.arch)
new_mlil.append(old_instr.copy_to(new_mlil), ILSourceLocation.from_instruction(old_instr))
# Generate blocks and SSA (for dataflow) for the next part
new_mlil.finalize()
new_mlil.generate_ssa_form()
# Since we're constructing a new function twice, we need to commit the mappings
# of the intermediate function before copying again so that mappings will resolve
# all the way to the end (gross)
# TODO: Construct from another function without needing this
context.mlil = new_mlil
if report is not None:
newer_graph = graph_pls(new_mlil)
report.append(FlowGraphReport("Swapped dispatch with jump_to", newer_graph, context.view))
# =====================================================================
# Now convert all MLIL_JUMP_TO with known dest to a jump
# Maybe this should be a separate workflow action (so it can be composed)
old_mlil = new_mlil
new_mlil = MediumLevelILFunction(old_mlil.arch, low_level_il=context.llil)
new_mlil.prepare_to_copy_function(old_mlil)
for old_block in old_mlil.basic_blocks:
new_mlil.prepare_to_copy_block(old_block)
for instr_index in range(old_block.start, old_block.end):
old_instr: MediumLevelILInstruction = old_mlil[InstructionIndex(instr_index)]
new_mlil.set_current_address(old_instr.address, old_block.arch)
# If we find a MLIL_JUMP_TO with a known constant dest, then rewrite it
# to a MLIL_GOTO with the known dest filled in
if old_instr.operation == MediumLevelILOperation.MLIL_JUMP_TO:
if old_instr.dest.value.type == RegisterValueType.ConstantPointerValue:
dest_value = old_instr.dest.value.value
if dest_value in old_instr.targets:
old_target_index = old_instr.targets[dest_value]
new_mlil.append(new_mlil.goto(new_mlil.get_label_for_source_instruction(old_target_index), ILSourceLocation.from_instruction(old_instr)), ILSourceLocation.from_instruction(old_instr))
continue
# Otherwise, copy the instruction as-is
new_mlil.append(old_instr.copy_to(new_mlil), ILSourceLocation.from_instruction(old_instr))
new_mlil.finalize()
new_mlil.generate_ssa_form()
if report is not None:
newer_graph = graph_pls(new_mlil)
report.append(FlowGraphReport("Resolved constant jump_to's", newer_graph, context.view))
# =====================================================================
# And we're done
if do_it:
context.mlil = new_mlil
finally:
# Show debug report if requested, even on exception thrown
if report is not None:
show_report_collection("Unflatten Debug Report", report)
# Create and register the workflow for translating these instructions
wf = Workflow("core.function.metaAnalysis").clone("core.function.metaAnalysis")
# Define the custom activity configuration
wf.register_activity(Activity(
configuration=json.dumps({
"name": "extension.unflatten_limoncello.unflatten.dry_run",
"title": "Unflatten (Limoncello) Dry Run",
"description": "Detect and reverse Limoncello's Control Flow Flattening scheme.",
"eligibility": {
"auto": {
"default": False
}
}
}),
action=lambda context: rewrite_action(context, False)
))
wf.register_activity(Activity(
configuration=json.dumps({
"name": "extension.unflatten_limoncello.unflatten",
"title": "Unflatten (Limoncello)",
"description": "Detect and reverse Limoncello's Control Flow Flattening scheme.",
"eligibility": {
"auto": {
"default": False
}
}
}),
action=lambda context: rewrite_action(context, True)
))
wf.insert_after("core.function.generateMediumLevelIL",[
"extension.unflatten_limoncello.unflatten.dry_run",
"extension.unflatten_limoncello.unflatten"
])
wf.register()