diff --git a/README.rst b/README.rst index 8c3c660..8f1e664 100644 --- a/README.rst +++ b/README.rst @@ -1,3 +1,8 @@ +THIS PROJECT IS UNMAINTAINED +**************************** +I'm not interested in this project anymore, sorry. +I don't work as an network engineer anymore so I haven't any special goals to improve and maintain it. + Netdev ****** @@ -13,13 +18,14 @@ Requires: * pyYAML -Supports: +Supports: --------- * Cisco IOS * Cisco IOS XE * Cisco IOS XR * Cisco ASA -* Cisco NX-OS +* Cisco NX-OS +* Cisco SG3XX * HP Comware (like V1910 too) * Fujitsu Blade Switches * Mikrotik RouterOS @@ -28,6 +34,7 @@ Supports: * Aruba AOS 6.X * Aruba AOS 8.X * Terminal +* Alcatel AOS Examples: --------- diff --git a/docs/api.rst b/docs/api.rst index 80e971d..eb674e8 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -88,6 +88,13 @@ CiscoNXOS :members: :inherited-members: +CiscoSG3XX +~~~~~~~~~ + +.. autoclass:: CiscoSG3XX + :members: + :inherited-members: + FujitsuSwitch ~~~~~~~~~~~~~ @@ -143,3 +150,10 @@ Terminal .. autoclass:: Terminal :members: :inherited-members: + +AlcatelAOS +~~~~~~~~ + +.. autoclass:: AlcatelAOS + :members: + :inherited-members: diff --git a/docs/examples.rst b/docs/examples.rst index fdc9d61..d3fed1d 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -60,6 +60,10 @@ IOS XR example -------------- .. literalinclude:: ../examples/cisco_iosxr.py +IOS SG3XX example +-------------- +.. literalinclude:: ../examples/cisco_sg3xx.py + Fujitsu example --------------- .. literalinclude:: ../examples/fujitsu_switch.py @@ -90,4 +94,8 @@ Juniper JunOS example Terminal example ---------------- -.. literalinclude:: ../examples/terminal.py \ No newline at end of file +.. literalinclude:: ../examples/terminal.py + +Alcatel AOS example +---------------- +.. literalinclude:: ../examples/alcatel_aos.py \ No newline at end of file diff --git a/docs/overview.rst b/docs/overview.rst index fc8f8c9..4bec854 100644 --- a/docs/overview.rst +++ b/docs/overview.rst @@ -67,11 +67,13 @@ All other classes are the end classes which you can use for working with particu * :class:`CiscoIOSXR` * :class:`CiscoASA` * :class:`CiscoNXOS` +* :class:`CiscoSG3XX` * :class:`FujitsuSwitch` * :class:`HPComware` * :class:`HPComwareLimited` * :class:`AristaEOS` * :class:`JuniperJunOS` +* :class:`AlcatelAOS` The particular class selected by parameter *device_type* in :func:`create` diff --git a/examples/alcatel_aos.py b/examples/alcatel_aos.py new file mode 100644 index 0000000..0e61b5d --- /dev/null +++ b/examples/alcatel_aos.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 + +# Import Python library +import asyncio, netdev + +# Coroutine used for the tasks +async def task(param): + + # Create an object for the devices and open SSH connections + async with netdev.create(**param) as device: + + # Testing sending simple command + + # Command to send + cmd = "show system" + + # Sending command + output = await device.send_command(cmd) + + # Display the output + print(output) + + # Display separator + print("*" * 80) + + # Testing sending configuration set + + # Commands to send + commands = ["vlan 3000", "no vlan 3001"] + + # Sending command + output = await device.send_config_set(commands) + + # Display the output + print(output) + + +# Main coroutine +async def main(): + + # Parameters of the network device + my_device = { 'username' : 'LOGIN', + 'password' : 'PASSWORD', + 'host': 'IP_ADDRESS', + 'device_type': 'alcatel_aos', + } + + # List of devices + devices = [my_device] + + # List of tasks + my_tasks = [task(dev) for dev in devices] + + # Starting the coroutine of the tasks + await asyncio.wait(my_tasks) + + +# Main function call +if __name__ == '__main__': + + # Run the main coroutine + asyncio.run(main()) + + ''' + Result: + ******************************************************************************** + System: + Description: Alcatel-Lucent Enterprise OS6860E-48 8.4.1.141.R03 GA, December 07, 2017., + Object ID: 1.3.6.1.4.1.6486.801.1.1.2.1.11.1.7, + Up Time: 5 days 5 hours 3 minutes and 56 seconds, + Contact: Alcatel-Lucent, http://enterprise.alcatel-lucent.com, + Name: switch01, + Location: Somewhere nearby, + Services: 78, + Date & Time: SAT AUG 29 2020 18:48:53 (CEST) + Flash Space: + Primary CMM: + Available (bytes): 933896192, + Comments : None + + ******************************************************************************** + vlan 3000 + + switch01 ==> no vlan 3001 + + switch01 ==> + + ******************************************************************************** + + ''' \ No newline at end of file diff --git a/examples/cisco_sg3xx.py b/examples/cisco_sg3xx.py new file mode 100644 index 0000000..a0892cd --- /dev/null +++ b/examples/cisco_sg3xx.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 + +# Import Python library +import asyncio, netdev + +# Coroutine used for the tasks +async def task(param): + + # Create an object for the devices and open SSH connections + async with netdev.create(**param) as ios: + + # Testing sending simple command + + # Command to send + cmd = "show clock" + + # Sending command + output = await ios.send_command(cmd) + + # Display the output + print(output) + +# Main coroutine +async def main(): + + # Parameters of the network device + my_device = { 'username' : 'LOGIN', + 'password' : 'PASSWORD', + 'host': 'IP_ADDRESS', + 'device_type': 'cisco_sg3xx', + } + + # List of devices + devices = [my_device] + + # List of tasks + my_tasks = [task(dev) for dev in devices] + + # Starting the coroutine of the tasks + await asyncio.wait(my_tasks) + + +# Main function call +if __name__ == '__main__': + + # Run the main coroutine + asyncio.run(main()) + + ''' + Result: + ******************************************************************************** + .14:07:35 J Aug 28 2020 + Time source is sntp + Time from Browser is disabled + ******************************************************************************** + + ''' \ No newline at end of file diff --git a/netdev/dispatcher.py b/netdev/dispatcher.py index aaab07f..5f54483 100644 --- a/netdev/dispatcher.py +++ b/netdev/dispatcher.py @@ -1,19 +1,23 @@ """ Factory function for creating netdev classes """ +from netdev.vendors import AlcatelAOS from netdev.vendors import AristaEOS from netdev.vendors import ArubaAOS6, ArubaAOS8 -from netdev.vendors import CiscoASA, CiscoIOS, CiscoIOSXR, CiscoNXOS +from netdev.vendors import CiscoASA, CiscoIOS, CiscoIOSXR, CiscoNXOS, CiscoSG3XX from netdev.vendors import FujitsuSwitch from netdev.vendors import HPComware, HPComwareLimited from netdev.vendors import JuniperJunOS from netdev.vendors import MikrotikRouterOS from netdev.vendors import Terminal from netdev.vendors import UbiquityEdgeSwitch +from netdev.vendors import HW1000 +from netdev.vendors import Huawei # @formatter:off # The keys of this dictionary are the supported device_types CLASS_MAPPER = { + "alcatel_aos": AlcatelAOS, "arista_eos": AristaEOS, "aruba_aos_6": ArubaAOS6, "aruba_aos_8": ArubaAOS8, @@ -22,6 +26,7 @@ "cisco_ios_xe": CiscoIOS, "cisco_ios_xr": CiscoIOSXR, "cisco_nxos": CiscoNXOS, + "cisco_sg3xx": CiscoSG3XX, "fujitsu_switch": FujitsuSwitch, "hp_comware": HPComware, "hp_comware_limited": HPComwareLimited, @@ -29,6 +34,8 @@ "mikrotik_routeros": MikrotikRouterOS, "ubiquity_edge": UbiquityEdgeSwitch, "terminal": Terminal, + "hw1000": HW1000, + "huawei": Huawei, } # @formatter:on diff --git a/netdev/vendors/__init__.py b/netdev/vendors/__init__.py index f7f410b..e1664f2 100644 --- a/netdev/vendors/__init__.py +++ b/netdev/vendors/__init__.py @@ -1,7 +1,8 @@ +from netdev.vendors.alcatel import AlcatelAOS from netdev.vendors.arista import AristaEOS from netdev.vendors.aruba import ArubaAOS8, ArubaAOS6 from netdev.vendors.base import BaseDevice -from netdev.vendors.cisco import CiscoNXOS, CiscoIOSXR, CiscoASA, CiscoIOS +from netdev.vendors.cisco import CiscoNXOS, CiscoIOSXR, CiscoASA, CiscoIOS, CiscoSG3XX from netdev.vendors.comware_like import ComwareLikeDevice from netdev.vendors.fujitsu import FujitsuSwitch from netdev.vendors.hp import HPComware, HPComwareLimited @@ -11,12 +12,15 @@ from netdev.vendors.mikrotik import MikrotikRouterOS from netdev.vendors.terminal import Terminal from netdev.vendors.ubiquiti import UbiquityEdgeSwitch +from netdev.vendors.infotecs import HW1000 +from netdev.vendors.huawei import Huawei __all__ = ( "CiscoASA", "CiscoIOS", "CiscoIOSXR", "CiscoNXOS", + "CiscoSG3XX", "HPComware", "HPComwareLimited", "FujitsuSwitch", @@ -38,4 +42,7 @@ "juniper", "mikrotik", "UbiquityEdgeSwitch", + "HW1000", + "AlcatelAOS", + "Huawei", ) diff --git a/netdev/vendors/alcatel/__init__.py b/netdev/vendors/alcatel/__init__.py new file mode 100644 index 0000000..7b2aa44 --- /dev/null +++ b/netdev/vendors/alcatel/__init__.py @@ -0,0 +1,3 @@ +from .alcatel_aos import AlcatelAOS + +__all__ = ["AlcatelAOS"] diff --git a/netdev/vendors/alcatel/alcatel_aos.py b/netdev/vendors/alcatel/alcatel_aos.py new file mode 100644 index 0000000..cb2db39 --- /dev/null +++ b/netdev/vendors/alcatel/alcatel_aos.py @@ -0,0 +1,32 @@ +from netdev.vendors.base import BaseDevice +from netdev.logger import logger +import asyncio +import re + +class AlcatelAOS(BaseDevice): + """Class for working with Alcatel AOS""" + + + async def _read_until_prompt_or_pattern(self, pattern="", re_flags=0): + """Read until either self.base_pattern or pattern is detected. Return ALL data available""" + output = "" + logger.info("Host {}: Reading until prompt or pattern".format(self._host)) + if not pattern: + pattern = self._base_pattern + base_prompt_pattern = self._base_pattern + while True: + fut = self._stdout.read(self._MAX_BUFFER) + try: + output += await asyncio.wait_for(fut, self._timeout) + except asyncio.TimeoutError: + raise TimeoutError(self._host) + if re.search("\n" + pattern, output, flags=re_flags) or re.search( + "\n" + base_prompt_pattern, output, flags=re_flags + ): + logger.debug( + "Host {}: Reading pattern '{}' or '{}' was found: {}".format( + self._host, pattern, base_prompt_pattern, repr(output) + ) + ) + return output + diff --git a/netdev/vendors/base.py b/netdev/vendors/base.py index 48d422c..19bfb0f 100644 --- a/netdev/vendors/base.py +++ b/netdev/vendors/base.py @@ -42,6 +42,7 @@ def __init__( mac_algs=(), compression_algs=(), signature_algs=(), + server_host_key_algs=None, ): """ Initialize base class for asynchronous working with network devices @@ -92,6 +93,10 @@ def __init__( A list of public key signature algorithms to use during the SSH handshake, taken from `signature algorithms `_ + :param server_host_key_algs: + A list of server host key algorithms to allow during the SSH handshake, + taken from server host key algorithms. + https://asyncssh.readthedocs.io/en/latest/api.html#publickeyalgs :type host: str @@ -121,6 +126,7 @@ def __init__( :type mac_algs: list[str] :type compression_algs: list[str] :type signature_algs: list[str] + :type server_host_key_algs: list[str] """ if host: self._host = host @@ -146,7 +152,6 @@ def __init__( "passphrase": passphrase, "tunnel": tunnel, "agent_forwarding": agent_forwarding, - "loop": loop, "family": family, "agent_path": agent_path, "client_version": client_version, @@ -159,6 +164,14 @@ def __init__( if pattern is not None: self._pattern = pattern + + """ + A list of server host key algorithms to use instead of the default of + those present in known_hosts when performing the SSH handshake. This should only be set, + when the user sets it. + """ + if server_host_key_algs is not None: + self._connect_params_dict['server_host_key_algs'] = server_host_key_algs # Filling internal vars self._stdin = self._stdout = self._stderr = self._conn = None @@ -480,6 +493,7 @@ def _strip_ansi_escape_codes(string_buffer): ESC[24;27H Position cursor ESC[?25h Show the cursor ESC[E Next line (HP does ESC-E) + ESC[K Erase line. Clear from cursor to end of screen ESC[2K Erase line ESC[1;24r Enable scrolling from start to row end ESC7 Save cursor position @@ -505,6 +519,7 @@ def _strip_ansi_escape_codes(string_buffer): code_position_cursor = chr(27) + r"\[\d+;\d+H" code_show_cursor = chr(27) + r"\[\?25h" code_next_line = chr(27) + r"E" + code_erase_line_from_cursor = chr(27) + r"\[K" code_erase_line = chr(27) + r"\[2K" code_enable_scroll = chr(27) + r"\[\d+;\d+r" @@ -517,6 +532,7 @@ def _strip_ansi_escape_codes(string_buffer): code_position_cursor, code_show_cursor, code_erase_line, + code_erase_line_from_cursor, code_enable_scroll, ] diff --git a/netdev/vendors/cisco/__init__.py b/netdev/vendors/cisco/__init__.py index 463ac7a..8a41e86 100644 --- a/netdev/vendors/cisco/__init__.py +++ b/netdev/vendors/cisco/__init__.py @@ -2,5 +2,6 @@ from .cisco_ios import CiscoIOS from .cisco_iosxr import CiscoIOSXR from .cisco_nxos import CiscoNXOS +from .cisco_sg3xx import CiscoSG3XX -__all__ = ["CiscoIOS", "CiscoIOSXR", "CiscoASA", "CiscoNXOS"] +__all__ = ["CiscoIOS", "CiscoIOSXR", "CiscoASA", "CiscoNXOS", "CiscoSG3XX"] diff --git a/netdev/vendors/cisco/cisco_sg3xx.py b/netdev/vendors/cisco/cisco_sg3xx.py new file mode 100644 index 0000000..f91c58b --- /dev/null +++ b/netdev/vendors/cisco/cisco_sg3xx.py @@ -0,0 +1,30 @@ +"""Subclass specific to Cisco SG3XX""" + +from netdev.vendors.ios_like import IOSLikeDevice + +class CiscoSG3XX(IOSLikeDevice): + """Class for working with Cisco SG3XX""" + + def __init__(self, *args, **kwargs): + """ + Initialize class for asynchronous working with network devices + + :param str host: device hostname or ip address for connection + :param str username: username for logging to device + :param str password: user password for logging to device + :param str secret: secret password for privilege mode + :param int port: ssh port for connection. Default is 22 + :param str device_type: network device type + :param known_hosts: file with known hosts. Default is None (no policy). With () it will use default file + :param str local_addr: local address for binding source of tcp connection + :param client_keys: path for client keys. Default in None. With () it will use default file in OS + :param str passphrase: password for encrypted client keys + :param float timeout: timeout in second for getting information from channel + :param loop: asyncio loop object + """ + super().__init__(*args, **kwargs) + self._ansi_escape_codes = True + + _disable_paging_command = "terminal datadump" + + diff --git a/netdev/vendors/huawei/__init__.py b/netdev/vendors/huawei/__init__.py new file mode 100644 index 0000000..6134d83 --- /dev/null +++ b/netdev/vendors/huawei/__init__.py @@ -0,0 +1,3 @@ +from .huawei import Huawei + +__all__ = ["Huawei"] \ No newline at end of file diff --git a/netdev/vendors/huawei/huawei.py b/netdev/vendors/huawei/huawei.py new file mode 100644 index 0000000..bbe789f --- /dev/null +++ b/netdev/vendors/huawei/huawei.py @@ -0,0 +1,38 @@ +from netdev.vendors.comware_like import ComwareLikeDevice +import re +from netdev.logger import logger + +class Huawei(ComwareLikeDevice): + """Class for working with Huawei""" + + _disable_paging_command = "screen-length 0 temporary" + """Command for disabling paging""" + + async def _set_base_prompt(self): + """ + Setting two important vars + base_prompt - textual prompt in CLI (usually hostname) + base_pattern - regexp for finding the end of command. IT's platform specific parameter + + For Comware devices base_pattern is "[\]|>]prompt(\-\w+)?[\]|>] + """ + logger.info("Host {}: Setting base prompt".format(self._host)) + prompt = await self._find_prompt() + # Strip off any leading HRP_. characters for USGv5 HA + prompt = re.sub(r"^HRP_.", "", prompt, flags=re.M) + # Strip off trailing terminator + self._base_prompt = prompt[1:-1] + delimiter_right = map(re.escape, type(self)._delimiter_list) + delimiter_right = r"|".join(delimiter_right) + delimiter_left = map(re.escape, type(self)._delimiter_left_list) + delimiter_left = r"|".join(delimiter_left) + base_prompt = re.escape(self._base_prompt[:12]) + pattern = type(self)._pattern + self._base_pattern = pattern.format( + delimiter_left=delimiter_left, + prompt=base_prompt, + delimiter_right=delimiter_right, + ) + logger.debug("Host {}: Base Prompt: {}".format(self._host, self._base_prompt)) + logger.debug("Host {}: Base Pattern: {}".format(self._host, self._base_pattern)) + return self._base_prompt \ No newline at end of file diff --git a/netdev/vendors/infotecs/HW1000.py b/netdev/vendors/infotecs/HW1000.py new file mode 100644 index 0000000..93f95de --- /dev/null +++ b/netdev/vendors/infotecs/HW1000.py @@ -0,0 +1,170 @@ +""" +HW1000 is a class for working with Vipnet HW1000 crypto gateways +""" +import re +from netdev.logger import logger +from netdev.vendors.base import BaseDevice + +class HW1000(BaseDevice): + """ + Class for working with Vipnet HW1000 + + HW1000 devices have three administration modes: + *user exec or unprivileged exec. This mode allows you perform basic tests and get system information. + *privilege exec. This mode allows all EXEC mode commands available on the system. HW100 supports + only one active privilege session. Use preempt_privilege=True to close current privilege session + *shell. This mode exits standart device shell and enters Linux shell + """ + def __init__(self, secret=u'',preempt_privilege=False, *args, **kwargs): + """ + Initialize class for asynchronous working with network devices + :param str host: device hostname or ip address for connection + :param str username: username for logging to device + :param str password: user password for logging to device + :param str secret: secret password for privilege mode + :param bool preempt_privilege: close current privilige session (if exists). Default is False + :param int port: ssh port for connection. Default is 22 + :param str device_type: network device type + :param known_hosts: file with known hosts. Default is None (no policy). With () it will use default file + :param str local_addr: local address for binding source of tcp connection + :param client_keys: path for client keys. Default in None. With () it will use default file in OS + :param str passphrase: password for encrypted client keys + :param float timeout: timeout in second for getting information from channel + :param loop: asyncio loop object + """ + self._secret = secret + self._preempt_privilege = preempt_privilege + + super().__init__(*args, **kwargs) + + _priv_enter = 'enable' + """Command for entering to privilege exec""" + + _priv_exit = 'exit' + """Command for existing from privilege exec to user exec""" + + _priv_check = '#' + """Checking string in prompt. If it's exist im prompt - we are in privilege exec""" + + _priv_confirm_message = "Are you sure you want to force termination of the specified session" + """Confirmation message for privilege preemtion""" + + _shell_enter = "admin esc" + """Command for entering Linux shell""" + + _shell_exit = "exit" + """Command for exiting Linux shell""" + + _shell_check = "sh" + """Checking string in prompt. If it's exist im prompt - we are in Linux shell""" + + _shell_enter_message = "Are you sure you want to exit to the Linux system shell?" + """Confirmation message for entering Linux shell""" + + async def connect(self): + """ + Basic asynchronous connection method for HW1000 devices + + It connects to device and makes some preparation steps for working. + Usual using 3 functions: + + * _establish_connection() for connecting to device + * _set_base_prompt() for finding and setting device prompt + * _enable() for getting privilege exec mode + """ + logger.info("Host {}: Trying to connect to the device".format(self._host)) + await self._establish_connection() + await self._set_base_prompt() + await self.enable_mode() + logger.info("Host {}: Has connected to the device".format(self._host)) + + async def check_enable_mode(self): + """Check if we are in privilege exec. Return boolean""" + logger.info('Host {}: Checking privilege exec'.format(self._host)) + check_string = type(self)._priv_check + self._stdin.write(self._normalize_cmd('\n')) + output = await self._read_until_prompt() + return check_string in output + + async def enable_mode(self, pattern='password', re_flags=re.IGNORECASE): + """Enter to privilege exec""" + logger.info('Host {}: Entering to privilege exec'.format(self._host)) + output = "" + enable_command = type(self)._priv_enter + if not await self.check_enable_mode(): + self._stdin.write(self._normalize_cmd(enable_command)) + output += await self._read_until_prompt_or_pattern( + pattern=pattern, re_flags=re_flags) + if re.search(pattern, output, re_flags): + self._stdin.write(self._normalize_cmd(self._secret)) + output += await self._read_until_prompt_or_pattern( + pattern=type(self)._priv_confirm_message,re_flags=re_flags) + if re.search(type(self)._priv_confirm_message,output,re_flags): + if self._preempt_privilege: + self._stdin.write(self._normalize_cmd("Yes")) + else: + raise ValueError("Failed to enter privilege exec:" + "there is already a active administration session." + "Use preempt_privilege=True") + if not await self.check_enable_mode(): + raise ValueError("Failed to enter to privilege exec") + return output + + async def exit_enable_mode(self): + """Exit from privilege exec""" + logger.info('Host {}: Exiting from privilege exec'.format(self._host)) + output = "" + exit_enable = type(self)._priv_exit + if await self.check_enable_mode(): + self._stdin.write(self._normalize_cmd(exit_enable)) + output += await self._read_until_prompt() + if await self.check_enable_mode(): + raise ValueError("Failed to exit from privilege exec") + return output + + async def check_shell_mode(self): + """Checks if device in shell mode or not""" + logger.info('Host {}: Checking shell mode'.format(self._host)) + check_string = type(self)._shell_check + self._stdin.write(self._normalize_cmd('\n')) + output = await self._read_until_pattern(r'[\>|\#]') + logger.info(output) + return check_string in output + + async def enter_shell_mode(self,re_flags=re.IGNORECASE): + """ Enter into shell mode""" + logger.info('Host {}: Entering to shell mode'.format(self._host)) + output = '' + shell_command = type(self)._shell_enter + if not await self.check_shell_mode(): + self._stdin.write(self._normalize_cmd(shell_command)) + output += await self._read_until_pattern( + pattern=type(self)._shell_enter_message,re_flags=re_flags) + self._stdin.write(self._normalize_cmd("Yes")) + output += await self._read_until_pattern('password:', re_flags=re_flags) + self._stdin.write(self._normalize_cmd(self._secret)) + output += await self._read_until_pattern(r'[\>|\#]') + await self._set_base_prompt() # base promt differs in shell mode + if not await self.check_shell_mode(): + raise ValueError('Failed to enter to shell mode') + return output + + async def exit_shell_mode(self): + """Exit from shell mode""" + logger.info('Host {}: Exiting from shell mode'.format(self._host)) + output = '' + exit_shell = type(self)._shell_exit + if await self.check_shell_mode(): + self._stdin.write(self._normalize_cmd(exit_shell)) + output = await self._read_until_pattern(r'[\>|\#]') + if await self.check_shell_mode(): + raise ValueError("Failed to exit from shell mode") + await self._set_base_prompt() # base promt differs in shell mode + return output + + async def _cleanup(self): + """ Any needed cleanup before closing connection """ + logger.info("Host {}: Cleanup session".format(self._host)) + await self.exit_shell_mode() + await self.exit_enable_mode() + \ No newline at end of file diff --git a/netdev/vendors/infotecs/__init__.py b/netdev/vendors/infotecs/__init__.py new file mode 100644 index 0000000..518d3c1 --- /dev/null +++ b/netdev/vendors/infotecs/__init__.py @@ -0,0 +1,3 @@ +from .HW1000 import HW1000 + +__all__ = ['HW1000'] diff --git a/netdev/vendors/mikrotik/mikrotik_routeros.py b/netdev/vendors/mikrotik/mikrotik_routeros.py index 97a8a1c..af10574 100644 --- a/netdev/vendors/mikrotik/mikrotik_routeros.py +++ b/netdev/vendors/mikrotik/mikrotik_routeros.py @@ -1,6 +1,7 @@ import asyncssh +import asyncio -from netdev.exceptions import DisconnectError +from netdev.exceptions import DisconnectError, TimeoutError from netdev.logger import logger from netdev.vendors.base import BaseDevice @@ -34,7 +35,7 @@ def __init__(self, *args, **kwargs): """ super().__init__(*args, **kwargs) self._base_pattern = r"\[.*?\] \>.*\[.*?\] \>" - self._username += "+ct200w" + self._connect_params_dict["username"] += "+ct200w" self._ansi_escape_codes = True _pattern = r"\[.*?\] (\/.*?)?\>" @@ -60,13 +61,15 @@ async def _establish_connection(self): ) output = "" # initiate SSH connection + fut = asyncssh.connect(**self._connect_params_dict) try: - self._conn = await asyncssh.connect(**self._connect_params_dict) + self._conn = await asyncio.wait_for(fut, self._timeout) except asyncssh.DisconnectError as e: raise DisconnectError(self._host, e.code, e.reason) - + except asyncio.TimeoutError: + raise TimeoutError(self._host) self._stdin, self._stdout, self._stderr = await self._conn.open_session( - term_type="dumb" + term_type="Dumb" ) logger.info("Host {}: Connection is established".format(self._host)) # Flush unnecessary data diff --git a/netdev/version.py b/netdev/version.py index 7593538..3a54693 100644 --- a/netdev/version.py +++ b/netdev/version.py @@ -1,7 +1,7 @@ """ Netdev Version information """ -__version__ = "0.9.1" +__version__ = "0.9.3" __author__ = "Yakovlev Sergey" __author_email__ = "selfuryon@gmail.com" __url__ = "http://netdev.readthedocs.io/" diff --git a/pyproject.toml b/pyproject.toml index 9275360..1aa58ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.masonry.api" [tool.poetry] name = "netdev" -version = "0.9.1" +version = "0.9.3" description = "Asynchronous multi-vendor library for interacting with network devices" authors = ["Sergey Yakovlev "] license = "Apache-2.0" diff --git a/tests/test_cisco_iossg3xx.py b/tests/test_cisco_iossg3xx.py new file mode 100644 index 0000000..ca7c398 --- /dev/null +++ b/tests/test_cisco_iossg3xx.py @@ -0,0 +1,88 @@ +import asyncio +import logging +import unittest + +import yaml + +import netdev + +logging.basicConfig(filename='unittest.log', level=logging.DEBUG) +config_path = 'config.yaml' + + +class TestIOSSG3XX(unittest.TestCase): + @staticmethod + def load_credits(): + with open(config_path, 'r') as conf: + config = yaml.safe_load(conf) + with open(config['device_list'], 'r') as devs: + devices = yaml.safe_load(devs) + params = [p for p in devices if p['device_type'] == 'cisco_sg3xx'] + return params + + def setUp(self): + self.loop = asyncio.new_event_loop() + self.loop.set_debug(False) + asyncio.set_event_loop(self.loop) + self.devices = self.load_credits() + self.assertFalse(len(self.devices) == 0) + + def test_show_run_hostname(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as ios: + out = await ios.send_command('show run | i hostname') + self.assertIn("hostname", out) + + self.loop.run_until_complete(task()) + + def test_timeout(self): + async def task(): + for dev in self.devices: + with self.assertRaises(netdev.TimeoutError): + async with netdev.create(**dev, timeout=0.1) as ios: + await ios.send_command('show run | i hostname') + + self.loop.run_until_complete(task()) + + def test_show_several_commands(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as ios: + commands = ["dir", "show ver", "show run", "show ssh"] + for cmd in commands: + out = await ios.send_command(cmd, strip_command=False) + self.assertIn(cmd, out) + + self.loop.run_until_complete(task()) + + def test_config_set(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as ios: + commands = ["line con", "exit"] + out = await ios.send_config_set(commands) + self.assertIn("line con", out) + self.assertIn("exit", out) + + self.loop.run_until_complete(task()) + + def test_base_prompt(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as ios: + out = await ios.send_command('sh run | i hostname') + self.assertIn(ios.base_prompt, out) + + self.loop.run_until_complete(task()) + + def test_interactive_commands(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as ios: + out = await ios.send_command("conf", pattern=r'\[terminal\]\?', strip_command=False) + out += await ios.send_command("term", strip_command=False) + out += await ios.send_command("exit", strip_command=False) + self.assertIn('Enter configuration commands', out) + + self.loop.run_until_complete(task()) diff --git a/tests/test_hp_comware.py b/tests/test_hp_comware.py index aab7a45..bbb27ce 100644 --- a/tests/test_hp_comware.py +++ b/tests/test_hp_comware.py @@ -74,4 +74,4 @@ async def task(): async with netdev.create(**dev, timeout=0.1) as hp: await hp.send_command('display cur | i sysname') - self.loop.run_until_complete(task()) + self.loop.run_until_complete(task()) \ No newline at end of file diff --git a/tests/test_huawei.py b/tests/test_huawei.py new file mode 100644 index 0000000..7567348 --- /dev/null +++ b/tests/test_huawei.py @@ -0,0 +1,77 @@ +import asyncio +import logging +import unittest + +import yaml + +import netdev + +logging.basicConfig(filename='unittest.log', level=logging.DEBUG) +config_path = 'config.yaml' + + +class TestHuawei(unittest.TestCase): + @staticmethod + def load_credits(): + with open(config_path, 'r') as conf: + config = yaml.safe_load(conf) + with open(config['device_list'], 'r') as devs: + devices = yaml.safe_load(devs) + params = [p for p in devices if p['device_type'] == 'huawei'] + return params + + def setUp(self): + self.loop = asyncio.new_event_loop() + self.loop.set_debug(False) + asyncio.set_event_loop(self.loop) + self.devices = self.load_credits() + self.assertFalse(len(self.devices) == 0) + + def test_show_sysname(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as huawei: + out = await huawei.send_command('display cur | i sysname') + self.assertIn("sysname", out) + + self.loop.run_until_complete(task()) + + def test_show_several_commands(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as huawei: + commands = ["dir", "display ver", "display cur"] + for cmd in commands: + out = await huawei.send_command(cmd, strip_command=False) + self.assertIn(cmd, out) + + self.loop.run_until_complete(task()) + + def test_config_set(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as huawei: + commands = ["vlan 1", "quit"] + out = await huawei.send_config_set(commands) + self.assertIn("vlan 1", out) + self.assertIn("quit", out) + + self.loop.run_until_complete(task()) + + def test_base_prompt(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as huawei: + out = await huawei.send_command('display cur | i sysname') + self.assertIn(huawei.base_prompt, out) + + self.loop.run_until_complete(task()) + + def test_timeout(self): + async def task(): + for dev in self.devices: + with self.assertRaises(netdev.TimeoutError): + async with netdev.create(**dev, timeout=0.1) as huawei: + await huawei.send_command('display cur | i sysname') + + self.loop.run_until_complete(task()) \ No newline at end of file diff --git a/tests/test_hw1000.py b/tests/test_hw1000.py new file mode 100644 index 0000000..156923d --- /dev/null +++ b/tests/test_hw1000.py @@ -0,0 +1,61 @@ +import asyncio +import logging +import unittest +import yaml +import netdev + +logging.basicConfig(filename='unittest.log', level=logging.DEBUG) +config_path = 'config.yaml' + +class TestHW1000(unittest.TestCase): + @staticmethod + def load_credits(): + with open(config_path, 'r') as conf: + config = yaml.load(conf) + with open(config['device_list'], 'r') as devs: + devices = yaml.load(devs) + params = [p for p in devices if p['device_type'] == 'hw1000'] + return params + + def setUp(self): + self.loop = asyncio.new_event_loop() + self.loop.set_debug(False) + asyncio.set_event_loop(self.loop) + self.devices = self.load_credits() + self.assertFalse(len(self.devices) == 0) + + def test_simple_command(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as hw: + out = await hw.send_command("inet show snmp") + self.assertIn('SNMP',out) + self.loop.run_until_complete(task()) + + def test_long_command(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as hw: + out = await hw.send_command("inet show interface") + out_len=(len(out.split('\n'))) + self.assertGreater(out_len,10) + self.loop.run_until_complete(task()) + + def test_linux_mode(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as hw: + out = await hw.enter_shell_mode() + out = await hw.send_command("id") + self.assertIn('uid=0(root)',out) + self.loop.run_until_complete(task()) + + def test_linux_mode_indepotence(self): + async def task(): + for dev in self.devices: + async with netdev.create(**dev) as hw: + out = await hw.enter_shell_mode() + out = await hw.enter_shell_mode() + out = await hw.send_command("id") + self.assertIn('uid=0(root)',out) + self.loop.run_until_complete(task())