-
Notifications
You must be signed in to change notification settings - Fork 283
Expand file tree
/
Copy pathutil.py
More file actions
157 lines (123 loc) · 5.14 KB
/
util.py
File metadata and controls
157 lines (123 loc) · 5.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import ctypes
from typing import Callable, Dict, List, Optional, Union
from .. import _binaryninjacore as core
from . import changeset, merge
# Progress callback; invoked elsewhere in this module as progress_func(cur, total) and declared to return a bool.
# NOTE(review): presumably a False result asks the running operation to stop - confirm against the core API.
ProgressFuncType = Callable[[int, int], bool]
# Callback that is handed a changeset.Changeset and returns a bool.
NameChangesetFuncType = Callable[['changeset.Changeset'], bool]
# Callback that is handed a dict mapping str keys to merge.MergeConflict objects and returns a bool.
ConflictHandlerFuncType = Callable[[Dict[str, 'merge.MergeConflict']], bool]
# Either a merge.ConflictHandler object or a bare ConflictHandlerFuncType callable.
ConflictHandlerType = Union['merge.ConflictHandler', ConflictHandlerFuncType]
def _last_error() -> str:
"""
Get last error from the api
:return: Last error string
"""
return "Operation failed"
# TODO: keep track of last error in thread
#return core.BNCollaborationGetLastError()
def nop(*args, **kwargs):
    """
    Default callback: accepts any positional or keyword arguments, ignores
    them all, and reports success.

    :return: True
    """
    return True
def wrap_progress(progress_func: ProgressFuncType):
    """
    Adapt a Python progress function to a ctypes callback for passing to the FFI.

    The callback is declared to take a context pointer followed by two unsigned
    64-bit counts and to return a bool. The context pointer is dropped; only the
    two counts are forwarded to ``progress_func``.

    :param progress_func: Python progress function
    :return: Wrapped ctypes function
    """
    callback_type = ctypes.CFUNCTYPE(
        ctypes.c_bool, ctypes.c_void_p, ctypes.c_ulonglong, ctypes.c_ulonglong
    )

    def forward(ctxt, cur, total):
        # ctxt is unused on the Python side
        return progress_func(cur, total)

    return callback_type(forward)
def wrap_name_changeset(name_changeset_func: NameChangesetFuncType):
    """
    Adapt a Python changeset naming function to a ctypes callback for passing to the FFI.

    The callback is declared to take a context pointer and a changeset handle and
    to return a bool. The context pointer is dropped, and the handle is wrapped in
    a ``changeset.Changeset`` before ``name_changeset_func`` is called.

    :param name_changeset_func: Python changeset naming function
    :return: Wrapped ctypes function
    """
    callback_type = ctypes.CFUNCTYPE(
        ctypes.c_bool, ctypes.c_void_p, core.BNCollaborationChangesetHandle
    )

    def forward(ctxt, cs):
        # ctxt is unused on the Python side
        return name_changeset_func(changeset.Changeset(handle=cs))

    return callback_type(forward)
def wrap_conflict_handler(handler: Union[ConflictHandlerFuncType, merge.ConflictHandler]):
    """
    Turn a conflict handler into the core conflict handler object, so callers can be lazy and
    pass a plain function or lambda instead of subclassing ``merge.ConflictHandler``.

    A ``merge.ConflictHandler`` instance is used as-is; any other value is treated as a callable
    and wrapped in a small ``merge.ConflictHandler`` subclass whose ``handle`` forwards to it.

    :param handler: Python conflict handler function, or an existing ConflictHandler object
    :return: Wrapped ConflictHandler object
    """
    if isinstance(handler, merge.ConflictHandler):
        return core.BNCollaborationAnalysisConflictHandler(handler._handle)

    class LambdaConflictHandler(merge.ConflictHandler):
        def handle(self, conflicts: Dict[str, 'merge.MergeConflict']) -> bool:
            return handler(conflicts)

    adapter = LambdaConflictHandler()
    return core.BNCollaborationAnalysisConflictHandler(adapter._handle)
def split_progress(progress_func: Optional['ProgressFuncType'], subpart: int,
                   subpart_weights: List[float]) -> 'ProgressFuncType':
    """
    Restrict a progress function to one weighted subpart of a larger operation.

    This function takes the original progress function and returns a new function whose signature
    is the same but whose output is shortened to correspond to the specified subpart.
    The length of each subpart is its weight divided by the sum of all the weights.

    E.g. If subpart = 1 and subpart_weights = [0.25, 0.5, 0.25], this will return a function that calls
    progress_func and maps its progress to the range [0.25, 0.75]

    Internally this works by calling progress_func with total = 1000000 and doing math on the current value

    The ``subpart_weights`` list passed in is left unmodified.

    :param progress_func: Original progress function (usually updates a UI). If it is falsy (e.g. None), \
    a function that ignores its arguments and returns True is returned instead
    :param subpart: Index of subpart whose function to return, from 0 to (len(subpart_weights) - 1)
    :param subpart_weights: Weights of subparts, described above. If they sum to less than 0.00001, \
    a function that ignores its arguments and returns True is returned instead
    :return: A function that will call progress_func() within a modified progress region
    """
    if not progress_func:
        return lambda cur, total: True

    subpart_sum = sum(subpart_weights)
    if subpart_sum < 0.00001:
        return lambda cur, total: True

    # Normalize into a fresh list so the caller's weights are not changed
    normalized_weights = [weight / subpart_sum for weight in subpart_weights]

    # Running total of the normalized weights gives where each subpart begins
    subpart_starts = []
    start = 0.0
    for weight in normalized_weights:
        subpart_starts.append(start)
        start += weight

    def inner(cur: int, total: int) -> bool:
        # Just use a large number of steps for easy divisibility
        steps = 1000000
        subpart_size = steps * normalized_weights[subpart]
        # A total of 0 carries no progress information; report the start of the subpart
        # instead of dividing by zero
        fraction = float(cur) / float(total) if total else 0.0
        subpart_progress = fraction * subpart_size
        return progress_func(int(subpart_starts[subpart] * steps + subpart_progress), steps)

    return inner
class LazyT:
"""
Lazily loaded objects (but FFI)
Pretend this class is templated, because the C++ version is
"""
def __init__(self, ctor: Optional[Callable[[], object]] = None, handle=None):
"""
Create a new LazyT that will be initialized with the result of the given function, when it is first needed.
:param ctor: Function to construct object
:param handle: FFI handle for internal use
"""
if handle is not None:
self._handle = handle
else:
self.ctor = ctor
self.value = None
self._handle = core.BNCollaborationLazyTCreate(ctypes.CFUNCTYPE(ctypes.c_void_p, ctypes.c_void_p)(
lambda ctxt: self._perform_deref()), None)
def _perform_deref(self) -> ctypes.c_void_p:
if self.value is None:
self.value = self.ctor()
result = ctypes.cast(ctypes.py_object(self.value), ctypes.c_void_p)
return result
def get(self, expected_type=object):
"""
Access the lazily loaded object. Will construct it if this is the first usage.
:param expected_type: Expected type of result, ctypes will try to cast to it
:return: Result object
"""
result = core.BNCollaborationLazyTDereference(self._handle)
if result is None:
return None
if type == object:
result = ctypes.cast(result, ctypes.py_object)
return result
else:
result = ctypes.cast(result, expected_type)
return result