-
Notifications
You must be signed in to change notification settings - Fork 70
Expand file tree
/
Copy pathbase.py
More file actions
163 lines (116 loc) · 5.47 KB
/
base.py
File metadata and controls
163 lines (116 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from threading import Thread
from time import sleep
from typing import Dict, Union
from cmd2 import Cmd, categorize
from cmd2.plugin import PostparsingData
from blessings import Terminal
from common.config import read_config
from common.models.client import Client
from shell.utils.screen import banner, to_bold_cyan
from shell.utils.general import get_prompt
class BasePlugin(Cmd):
    """ BasePlugin - the base class which all of our plugins should inherit from.
        It is meant to define all the necessary base functions for plugins:
        coloured output helpers, prompt handling, tracking of the currently
        selected client and a background thread that alerts the user when a
        previously unseen client id shows up. """

    # cmd2 look-and-feel settings.
    prompt = '>> '
    ruler = '-'
    intro = banner()
    # Empty list: no statement-terminator characters are configured.
    terminators = []

    # Help categories used to group commands.
    CATEGORY_SHELL = to_bold_cyan('Shell Based Operations')
    CATEGORY_GENERAL = to_bold_cyan('General Commands')

    def __init__(self):
        """ Initialises the shell: configures cmd2, sets up the terminal helper,
            the prompt, the alerter thread state and registers all hooks. """
        super().__init__(
            startup_script=read_config().get('STARTUP_SCRIPT', ''),
            use_ipython=True,
        )

        self.aliases.update({'exit': 'quit', 'help': 'help -v'})
        self.hidden_commands.extend(
            ['load', 'pyscript', 'set', 'shortcuts', 'alias', 'unalias', 'shell', 'macro']
        )

        # `get_prompt` reads both `self.t` and `self.selected_client`,
        # so they have to exist before the prompt is computed.
        self.t = Terminal()
        self.selected_client = None
        self.prompt = self.get_prompt()
        self.allow_cli_args = False

        # Alerts Thread
        self._stop_thread = False
        self._seen_clients = set(Client.unique_client_ids())
        # Placeholder (never started); the real thread is created in the preloop hook.
        self._alert_thread = Thread()

        # Register the hook functions
        self.register_preloop_hook(self._alert_thread_preloop_hook)
        self.register_postloop_hook(self._alert_thread_postloop_hook)

        # Set the window title
        self.set_window_title('<< JSShell 2.0 >>')

        categorize([
            BasePlugin.do_help,
            BasePlugin.do_quit,
            BasePlugin.do_py,
            BasePlugin.do_ipy,
            BasePlugin.do_history,
            BasePlugin.do_edit
        ], BasePlugin.CATEGORY_GENERAL)

        self.register_postparsing_hook(self._refresh_client_data_post_parse_hook)

    def _alert_thread_preloop_hook(self) -> None:
        """ Start the alerter thread """
        # Reset the flag so that the loop in `_alert_function` actually runs.
        self._stop_thread = False
        self._alert_thread = Thread(name='alerter', target=self._alert_function)
        self._alert_thread.start()

    def _alert_thread_postloop_hook(self) -> None:
        """ Stops the alerter thread """
        self._stop_thread = True

        # Only join a thread that is actually running.
        if self._alert_thread.is_alive():
            self._alert_thread.join()

    def _alert_function(self) -> None:
        """ When the client list contains ids we have not seen yet,
            alert the user that a new client has registered.

            Runs until `self._stop_thread` is set, polling every half a second. """
        while not self._stop_thread:
            # Non-blocking acquire: when the lock is busy we simply skip this round.
            if self.terminal_lock.acquire(blocking=False):
                try:
                    current_clients = set(Client.unique_client_ids())

                    if current_clients - self._seen_clients:
                        self.async_alert(self.t.bold_blue(' << new client registered >>'), self.prompt)

                    self._seen_clients = current_clients
                finally:
                    # Always give the lock back, even if the lookup or the alert
                    # raised - otherwise this thread would die holding it.
                    self.terminal_lock.release()

            sleep(0.5)

    def print_error(self, text: str, end: str = '\n', start: str = '') -> None:
        """ Prints a formatted error message: `start`, a bold red `[-]` tag, then `text` in red """
        self.poutput(start + self.t.bold_red('[-]') + ' ' + self.t.red(text), end=end)

    def print_info(self, text: str, end: str = '\n', start: str = '') -> None:
        """ Prints a formatted informational message: `start`, a bold yellow `[!]` tag, then `text` in yellow """
        self.poutput(start + self.t.bold_yellow('[!]') + ' ' + self.t.yellow(text), end=end)

    def print_ok(self, text: str, end: str = '\n', start: str = '') -> None:
        """ Prints a formatted success message: `start`, a bold green `[+]` tag, then `text` in green """
        self.poutput(start + self.t.bold_green('[+]') + ' ' + self.t.green(text), end=end)

    def print_pairs(self, title: str, body: Dict[str, str],
                    just_return: bool = False, colors: bool = True) -> Union[str, None]:
        """ Prints pairs of values with a certain title.

            The output is the title followed by one ` - key: value` line per item of `body`.
            When `colors` is set the title and the keys are styled through the terminal helper.
            When `just_return` is set the text is returned instead of being paged
            (in which case nothing is printed); otherwise `None` is returned. """
        if colors:
            data = [self.t.bold_white_underline(title)]
        else:
            data = [title]

        for key, value in body.items():
            k = key + ':'

            if colors:
                data.append(f' - {self.t.bold(k)} {value}')
            else:
                data.append(f' - {k} {value}')

        text = '\n'.join(data)

        if just_return:
            return text

        self.ppaged(text)

    def select_client(self, client: Client) -> None:
        """ Handles the operation of selecting a new client """
        self.selected_client = client
        # The prompt is refreshed right after the selection changes.
        self.update_prompt()

    def _refresh_client_data_post_parse_hook(self, params: PostparsingData) -> PostparsingData:
        """ Refreshes the selected client data from the database. We do that because
            of `mongoengine`s behaviour, where if we set the current client, we do not track
            for modifications. This way, after every command is parsed we re-select the client.

            `params` is returned untouched. """
        if self.selected_client:
            cid = self.selected_client.cid
            # Whatever `.first()` yields is re-selected
            # (presumably `None` when the client no longer exists - TODO confirm).
            self.select_client(Client.objects(cid=cid).first())

        return params

    def get_prompt(self) -> str:
        """ Handles the operations of getting the prompt string.

            Returns a bold cyan `>> `, prefixed with `[Client #<cid>]`
            when a client is currently selected. """
        prompt = self.t.bold_cyan('>> ')

        if self.selected_client:
            client_id = self.t.bold_red(self.selected_client.cid)
            prompt = self.t.cyan(f"[Client #{client_id}]") + ' ' + prompt

        return prompt

    def update_prompt(self) -> None:
        """ Handles what is needed when updating the prompt """
        # NOTE(review): this resolves to the module-level `get_prompt` imported from
        # `shell.utils.general`, not to `self.get_prompt()` which `__init__` uses.
        # Kept as is; confirm both produce the same prompt and unify them.
        self.prompt = get_prompt(self)