forked from cwjokaka/ok_ip_proxy_pool
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsqlite_opt.py
More file actions
143 lines (128 loc) · 4.13 KB
/
sqlite_opt.py
File metadata and controls
143 lines (128 loc) · 4.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from sqlalchemy.exc import IntegrityError
from setting import DB
from src.database.abs_database import AbsDatabase
from sqlalchemy import create_engine, desc
from sqlalchemy.orm import sessionmaker
from src.entity.proxy_entity import ProxyEntity
from src.log.logger import logger
import sqlite3
class SqliteOpt(AbsDatabase):
    """SQLite-backed implementation of the proxy store.

    Inserts and queries on ``ProxyEntity`` go through a SQLAlchemy session
    factory. Reliability updates, table creation and bulk deletion use the
    standard ``sqlite3`` module directly against the same ``DB["db_name"]``
    file, opening and closing a fresh connection per call.
    """

    def __init__(self) -> None:
        """Build the SQLAlchemy engine and session factory for ``DB["db_name"]``."""
        # echo=True makes SQLAlchemy log every statement it emits.
        engine = create_engine(f'sqlite:///{DB["db_name"]}?check_same_thread=False', echo=True)
        self._DBSession = sessionmaker(bind=engine)

    def add_proxy(self, proxy):
        """Persist ``proxy`` through a new session.

        Returns:
            1 when the commit succeeds, 0 when it raises ``IntegrityError``
            (logged as an already-existing ip). Other exceptions propagate.
        """
        session = self._DBSession()
        try:
            session.add(proxy)
            # Committing is what actually writes the row to the database.
            session.commit()
            return 1
        except IntegrityError:
            logger.info(f'ip: {proxy.url} 已存在')
            # Discard the failed transaction explicitly before closing.
            session.rollback()
            return 0
        finally:
            # Always release the session, whatever happened above.
            session.close()

    def get_all_proxies(self):
        """Return every ``ProxyEntity`` row, or ``[]`` if the query fails.

        Failures are logged with their traceback rather than raised.
        """
        session = self._DBSession()
        try:
            return session.query(ProxyEntity).all()
        except Exception as e:
            logger.exception(e)
            return []
        finally:
            session.close()

    def increase_reliability(self, url):
        """Add 1 to the reliability of ``url`` and record the check.

        Best-effort: failures are never raised to the caller.
        """
        self._adjust_reliability(url, 1)

    def reduce_reliability(self, url):
        """Subtract 1 from the reliability of ``url`` and record the check.

        Best-effort: failures are never raised to the caller.
        """
        self._adjust_reliability(url, -1)

    def _adjust_reliability(self, url, delta):
        """Shift the reliability of ``url`` by ``delta`` and bump its check stats.

        Also sets ``last_check_time`` to the current local time and
        increments ``check_count``. ``url`` and ``delta`` are passed as bound
        parameters so that quotes or SQL fragments in a url cannot alter the
        statement.
        """
        conn = self._get_connect()
        try:
            conn.execute(
                f"""
                UPDATE {DB["table_name"]} SET reliability = reliability + ?,
                last_check_time=datetime(CURRENT_TIMESTAMP,'localtime'),
                check_count = check_count + 1
                WHERE url = ?
                """,
                (delta, url),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # Expected: the table created by init_db() constrains reliability
            # to 0..15, so a row already at a bound rejects the update.
            pass
        except Exception as e:
            # Anything else (missing table, locked file, ...) is a real
            # problem; report it but keep the call best-effort.
            logger.exception(e)
        finally:
            conn.close()

    def remove(self, key):
        """Delegate removal of ``key`` to the ``AbsDatabase`` implementation."""
        return super().remove(key)

    def init_db(self):
        """Create the proxy table named by ``DB["table_name"]``.

        An ``sqlite3.OperationalError`` (for example when the table already
        exists) is logged as a warning instead of being raised.
        """
        conn = self._get_connect()
        try:
            conn.execute(f"""
                create table {DB["table_name"]}(
                    url varchar(36) not null,
                    source varchar(16),
                    supplier varchar(32),
                    proxy_type tinyint(3),
                    proxy_cover tinyint(3),
                    check_count int(10),
                    region varchar(36),
                    last_check_time text,
                    create_time text default (datetime(CURRENT_TIMESTAMP,'localtime')),
                    reliability integer not null default 0 check(reliability >= 0) check(reliability <= 15),
                    PRIMARY KEY ("url")
                )
            """)
            # Harmless if the DDL was already applied outside a transaction.
            conn.commit()
        except sqlite3.OperationalError as e:
            logger.warning(e)
        finally:
            conn.close()

    def clean(self):
        """Delete every row from the proxy table; errors propagate to the caller."""
        conn = self._get_connect()
        try:
            conn.execute(f'DELETE FROM {DB["table_name"]}')
            conn.commit()
        finally:
            conn.close()

    def get_one_in_page(self):
        """Return the row with the highest reliability.

        Returns ``None`` when the table is empty or the query fails; failures
        are logged with their traceback.
        """
        session = self._DBSession()
        try:
            return session.query(ProxyEntity).order_by(desc(ProxyEntity.reliability)).first()
        except Exception as e:
            logger.exception(e)
            return None
        finally:
            session.close()

    def get_all_in_page(self):
        """Return all rows whose reliability is greater than 0.

        Returns ``None`` (not an empty list) when the query fails; failures
        are logged with their traceback.
        """
        session = self._DBSession()
        try:
            return session.query(ProxyEntity).filter(ProxyEntity.reliability > 0).all()
        except Exception as e:
            logger.exception(e)
            return None
        finally:
            session.close()

    @staticmethod
    def _get_connect():
        """Open a new raw ``sqlite3`` connection to ``DB["db_name"]``."""
        return sqlite3.connect(DB['db_name'])
# Module-level shared instance, created when this module is imported;
# constructing it builds the SQLAlchemy engine and session factory.
sqlite_opt = SqliteOpt()