forked from sqlmapproject/sqlmap
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsqlalchemy.py
More file actions
110 lines (93 loc) · 4.1 KB
/
sqlalchemy.py
File metadata and controls
110 lines (93 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python
"""
Copyright (c) 2006-2019 sqlmap developers (http://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
import imp
import logging
import os
import sys
import traceback
import warnings
_sqlalchemy = None
try:
f, pathname, desc = imp.find_module("sqlalchemy", sys.path[1:])
_ = imp.load_module("sqlalchemy", f, pathname, desc)
if hasattr(_, "dialects"):
_sqlalchemy = _
warnings.simplefilter(action="ignore", category=_sqlalchemy.exc.SAWarning)
except ImportError:
pass
try:
import MySQLdb # used by SQLAlchemy in case of MySQL
warnings.filterwarnings("error", category=MySQLdb.Warning)
except ImportError:
pass
from lib.core.data import conf
from lib.core.data import logger
from lib.core.exception import SqlmapConnectionException
from lib.core.exception import SqlmapFilePathException
from lib.core.exception import SqlmapMissingDependence
from plugins.generic.connector import Connector as GenericConnector
def getSafeExString(ex, encoding=None): # Cross-referenced function
raise NotImplementedError
class SQLAlchemy(GenericConnector):
def __init__(self, dialect=None):
GenericConnector.__init__(self)
self.dialect = dialect
def connect(self):
if _sqlalchemy:
self.initConnection()
try:
if not self.port and self.db:
if not os.path.exists(self.db):
raise SqlmapFilePathException("the provided database file '%s' does not exist" % self.db)
_ = conf.direct.split("//", 1)
conf.direct = "%s////%s" % (_[0], os.path.abspath(self.db))
if self.dialect:
conf.direct = conf.direct.replace(conf.dbms, self.dialect, 1)
if self.dialect == "sqlite":
engine = _sqlalchemy.create_engine(conf.direct, connect_args={"check_same_thread": False})
elif self.dialect == "oracle":
engine = _sqlalchemy.create_engine(conf.direct)
else:
engine = _sqlalchemy.create_engine(conf.direct, connect_args={})
self.connector = engine.connect()
except (TypeError, ValueError):
if "_get_server_version_info" in traceback.format_exc():
try:
import pymssql
if int(pymssql.__version__[0]) < 2:
raise SqlmapConnectionException("SQLAlchemy connection issue (obsolete version of pymssql ('%s') is causing problems)" % pymssql.__version__)
except ImportError:
pass
elif "invalid literal for int() with base 10: '0b" in traceback.format_exc():
raise SqlmapConnectionException("SQLAlchemy connection issue ('https://bitbucket.org/zzzeek/sqlalchemy/issues/3975')")
else:
pass
except SqlmapFilePathException:
raise
except Exception as ex:
raise SqlmapConnectionException("SQLAlchemy connection issue ('%s')" % getSafeExString(ex))
self.printConnected()
else:
raise SqlmapMissingDependence("SQLAlchemy not available")
def fetchall(self):
try:
retVal = []
for row in self.cursor.fetchall():
retVal.append(tuple(row))
return retVal
except _sqlalchemy.exc.ProgrammingError as ex:
logger.log(logging.WARN if conf.dbmsHandler else logging.DEBUG, "(remote) %s" % getSafeExString(ex))
return None
def execute(self, query):
try:
self.cursor = self.connector.execute(query)
except (_sqlalchemy.exc.OperationalError, _sqlalchemy.exc.ProgrammingError) as ex:
logger.log(logging.WARN if conf.dbmsHandler else logging.DEBUG, "(remote) %s" % getSafeExString(ex))
except _sqlalchemy.exc.InternalError as ex:
raise SqlmapConnectionException(getSafeExString(ex))
def select(self, query):
self.execute(query)
return self.fetchall()