forked from Mindinventory/MindSQL
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsqlite.py
More file actions
184 lines (149 loc) · 5.79 KB
/
Copy pathsqlite.py
File metadata and controls
184 lines (149 loc) · 5.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import os
import sqlite3
import warnings
from sqlite3 import Connection
from typing import List
from urllib.parse import urlparse
import pandas as pd
import requests
from .._utils import logger
from .._utils.constants import ERROR_DOWNLOADING_SQLITE_DB_CONSTANT, ERROR_CONNECTING_TO_DB_CONSTANT, \
INVALID_DB_CONNECTION_OBJECT, ERROR_WHILE_RUNNING_QUERY, SQLLITE_GET_DB_QUERY, SQLLIE_TABLE_INFO_SCHEMA_CONSTANT, \
SQLLITE_TRAINING_DATASET_QUERY_CONSTANT, CONNECTION_ESTABLISH_ERROR_CONSTANT
from . import IDatabase
warnings.simplefilter(action='ignore', category=UserWarning)
log = logger.init_loggers("Sqlite")
class Sqlite(IDatabase):
@staticmethod
def __download_database(url: str, destination_path: str) -> None:
"""
Download the database if it doesn't exist
Parameters:
url (str): The URL of the database
destination_path (str): The path to save the database
Returns:
None
"""
try:
response = requests.get(url)
response.raise_for_status() # Check that the request was successful
with open(destination_path, "wb") as f:
f.write(response.content)
except requests.RequestException as e:
log.info(ERROR_DOWNLOADING_SQLITE_DB_CONSTANT.format(e))
def create_connection(self, url: str, **kwargs) -> Connection | None:
"""
A method to create a connection with the database.
Parameters:
url (str): The URL of the database
Returns:
Connection | None: A connection object
"""
if urlparse(url).scheme == '' and os.path.isabs(url):
path = url
else:
path = os.path.basename(urlparse(url).path)
# Download the database if it doesn't exist
if not os.path.exists(path):
self.__download_database(url, path)
try:
conn = sqlite3.connect(path)
return conn
except sqlite3.Error as e:
log.info(ERROR_CONNECTING_TO_DB_CONSTANT.format("SQLite", e))
return None
def validate_connection(self, connection):
"""
A function that validates if the provided connection is a SQLite connection.
Parameters:
connection (Connection): A connection object
Returns:
None
"""
if connection is None:
raise ValueError(CONNECTION_ESTABLISH_ERROR_CONSTANT)
if not isinstance(connection, sqlite3.Connection):
raise ValueError(INVALID_DB_CONNECTION_OBJECT.format("SQLite"))
def execute_sql(self, connection, sql: str) -> pd.DataFrame:
"""
A method to run a SQL query on the database.
Parameters:
connection (Connection): A connection object
sql (str): The SQL query to run
Returns:
pd.DataFrame: A DataFrame containing the query results
"""
self.validate_connection(connection)
try:
result = pd.read_sql_query(sql, connection)
return result
except sqlite3.Error as e:
log.info(ERROR_WHILE_RUNNING_QUERY.format(e))
return pd.DataFrame()
def get_databases(self, connection) -> List[str]:
"""
Get a list of databases from the given connection and SQL query.
Parameters:
connection (Connection): A connection object
Returns:
List[str]: A list of unique database names
"""
self.validate_connection(connection)
try:
df_databases = pd.read_sql_query(SQLLITE_GET_DB_QUERY, connection)
return df_databases["name"].tolist()
except Exception as e:
log.info(e)
return []
def get_table_names(self, connection, database: str) -> pd.DataFrame:
"""
A method to get the list of tables in the database.
Parameters:
connection (Connection): A connection object
database (str): The name of the database
Returns:
pd.DataFrame: The list of tables
"""
self.validate_connection(connection)
try:
result = pd.read_sql_query(SQLLIE_TABLE_INFO_SCHEMA_CONSTANT, connection)
return result
except sqlite3.Error as e:
log.info(ERROR_WHILE_RUNNING_QUERY.format(e))
return pd.DataFrame()
def get_all_ddls(self, connection, database: str) -> pd.DataFrame:
"""
A method to get all DDLs in the database.
Parameters:
connection (Connection): A connection object
database (str): The name of the database
Returns:
pd.DataFrame: The list of DDLs
"""
self.validate_connection(connection)
df_tables = self.get_table_names(connection, database)
df_ddl = pd.DataFrame(columns=['Table', 'DDL'])
for _, row in df_tables.iterrows():
table_name = row['name']
ddl_df = self.get_ddl(connection, table_name)
df_ddl = df_ddl._append({'Table': table_name, 'DDL': ddl_df}, ignore_index=True)
return df_ddl
def get_ddl(self, connection, table_name: str, **kwargs) -> str:
"""
A method to get the DDL for the table.
Parameters:
connection (Connection): A connection object
table_name (str): The name of the table
Returns:
str: The DDL for the table
"""
self.validate_connection(connection)
ddl_df = pd.read_sql_query(SQLLITE_TRAINING_DATASET_QUERY_CONSTANT.format(table_name), connection)
return ddl_df["sql"].iloc[0]
def get_dialect(self) -> str:
"""
A method to get the dialect of the database.
Returns:
str: The dialect
"""
return 'sqlite3'