from os import _exit, walk
from os.path import join
from subprocess import Popen, PIPE
from utils.shell_options import ShellOptions
from utils.dynamic_load import load_module
from utils.custom_exception import exception
from utils.custom_print import print_info, print_error, print_ok
from utils.banner import banner
from utils.global_list import Global
from utils.help import show_help
from utils.find import Find
from utils.tasks import Task
from utils.color_palette import ColorSelected, colors_terminal
class CommandParser:
def __init__(self):
self.module = None
self.module_commands = ["set", "unset", "back", "show", "run", "global"]
# Our switcher
self.commands = {
"load": self._load,
"help": self._help,
"banner": self._banner,
"find": self._find,
"import": self._fill,
"export": self._save,
"modules": self._list_modules,
"theme": self._load_theme,
"tasks": self._task,
"exit": self._exit,
"quit": self._exit,
}
self.shell_options = ShellOptions.get_instance()
# 'command' is the user input
@exception("Error parsing input")
def parser(self, command):
if not command:
return
if command.startswith("#"):
self._execute_command(command[1:])
else:
u_input = command.split()
if len(u_input) >= 2:
self.commands.get(u_input[0])(u_input[1:])
else:
self.commands.get(u_input[0])()
def update_module(self, value):
self.module = value
def get_module_name(self):
name = ""
if self.module:
name = self.module.get_module_name()
return name
def _exit(self, param=None):
print_info("Killing tasks... ")
Task().get_instance().kill_all_tasks()
print_info("Bye...")
_exit(0)
# Execute a system command
@exception("Error Executing OS Command")
def _execute_command(self, command):
data = Popen(command, shell=True, stdout=PIPE).stdout.read()
print("")
print(data.decode(errors="ignore"))
def _load_theme(self, theme):
theme = colors_terminal.get(theme[0], None)
if theme is not None:
ColorSelected(theme)
print_ok("Theme changed!")
else:
print_error("Theme not available")
def _load(self, name):
try:
loaded = load_module(name[0])
except Exception as e:
print(e)
if loaded:
self._unload()
self.module = loaded
self.module.set_name(name[0])
new_functions = self.module.get_new_functions()
self.shell_options.add_module_options(self.module.get_options_names(), new_functions)
# Add new commands to autocomplete
module_new_commands = {
"set": self.module.set_value,
"global": self._setglobal,
"unset": self.module.unset,
"back": self._unload,
"show": self.module.show,
"run": self._run
}
for f in new_functions:
if f not in list(module_new_commands.keys()):
module_new_commands[f] = getattr(self.module, f)
self.module_commands.append(f)
self.commands.update(module_new_commands)
self.module.update_complete_set()
@exception("")
def _unload(self):
self.module = None
# Remove commands that cannot be used without a module
self.shell_options.del_module_options()
for c in self.module_commands:
try:
del self.commands[c]
except:
self.module_commands.remove(c)
for f in self.module.get_new_functions():
del self.commands[f]
@exception("Error setting global option")
def _setglobal(self, user_input=[]):
if user_input and self.module:
success = self.module.set_value(user_input)
if success:
Global.get_instance().add_value(user_input[0], ' '.join([str(x) for x in user_input[1:]]))
@exception("There are required options without value")
def _run(self):
if not self.module.check_arguments():
raise("")
try:
self.module.run()
except Exception as e:
print(e)
def _fill(self, param=None):
Global.get_instance().load_configuration()
def _save(self, param=None):
Global.get_instance().save_configuration()
def _banner(self, param=None):
banner(animation=False)
def _help(self, param=None):
data = None
if self.module:
data = self.module.get_extra_help()
show_help(data)
def _list_modules(self, category=None):
pwd = "./modules"
if category != None:
pwd += "/" + category[0]
print("")
msg = "Modules list "
print(msg)
print("-"*len(msg))
total = 0
for (p, d, files) in walk(pwd):
for f in files:
if ("_" not in f) and ("_" not in p):
print_info(join(p.replace("./modules/", ""), f.replace(".py", "")))
total += 1
print("-"*len(msg))
print_info(f"Modules count: {total}")
print("")
def _find(self, word=""):
word = ' '.join(word).lower()
data = f"Searching: {word}"
print_info(data)
print("-"*len(data))
modules = Find().search(word)
if not modules:
print_info("No found")
return
for module in modules:
print_info(module)
print("-"*len(data))
print_info(f"Modules count: {len(modules)}")
def _task(self, params):
if "list" in params:
Task().get_instance().show_tasks()
elif "kill" in params and len(params) >= 2:
Task().get_instance().kill_task(params[1])