#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# @Author : bajins https://www.bajins.com
# @File : http_util.py
# @Version: 1.0.0
# @Time : 2019/8/21 15:32
# @Project: windows-wallpaper-python
# @Package:
# @Software: PyCharm
import asyncio
import io
import json
import os
import socket
import time
import urllib.request
import aiofiles
import aiohttp
import requests
import urllib3
import constants

# Suppress InsecureRequestWarning noise from unverified HTTPS requests
requests.packages.urllib3.disable_warnings()
# Default number of retries when a request fails
requests.adapters.DEFAULT_RETRIES = 5


def get_session():
    """
    Create a preconfigured requests Session.
    :return: a Session with a default User-Agent, TLS verification
             disabled and keep-alive turned off
    """
    session = requests.sessions.Session()
    # Do not keep idle connections around
    session.keep_alive = False
    session.headers["User-Agent"] = constants.USER_AGENT
    session.verify = False
    # Note: requests does not apply this attribute automatically;
    # pass timeout explicitly on each request
    session.timeout = 600
    return session
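

# A minimal usage sketch for get_session (not part of the original module);
# the URL below is a hypothetical placeholder. Since requests ignores the
# timeout attribute on the session, it is forwarded explicitly per call.
def _example_get_session():
    session = get_session()
    try:
        response = session.get("https://example.com/api", timeout=session.timeout)
        print(response.status_code)
    finally:
        session.close()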


def get(url, data=None):
    """
    Send a GET request.
    :param url: request URL
    :param data: query parameters as a dict
    :return: the Response object
    """
    session = requests.sessions.Session()
    # Do not keep idle connections around
    session.keep_alive = False
    return session.get(url, params=data, headers={"User-Agent": constants.USER_AGENT}, verify=False, timeout=600)


def post(url, data):
    """
    Send a POST request.
    :param url: request URL
    :param data: form data as a dict
    :return: the Response object
    """
    return requests.post(url, data, headers={"User-Agent": constants.USER_AGENT}, verify=False, timeout=600)


def delete(url, data):
    """
    Send a DELETE request.
    :param url: request URL
    :param data: query parameters as a dict
    :return: the Response object
    """
    return requests.delete(url=url, params=data, headers={"User-Agent": constants.USER_AGENT}, verify=False, timeout=600)


def get_json(url, data):
    """
    Send a GET request and decode the response body as JSON.
    :param url: request URL
    :param data: query parameters as a dict
    :return: the decoded JSON object
    """
    return json.loads(get(url=url, data=data).text)
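

# A minimal usage sketch for get_json (not part of the original module);
# the endpoint and parameters are hypothetical placeholders.
def _example_get_json():
    result = get_json("https://example.com/api/items", {"page": 1})
    print(result)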


def download_big_file_urlib(url, mkdir, name=""):
    """
    Download a file with urllib, reading it in fixed-size chunks.
    Large and small files work the same way here; there is no chunk
    iterator, so read() is called in a loop with an explicit block size
    until it returns an empty chunk, which marks the end of the file.
    :param url: file URL
    :param mkdir: target directory (created if missing)
    :param name: file name; defaults to the last path segment of the URL
    :return:
    """
    from urllib.request import Request, urlopen
    # Fall back to the last segment of the URL if no name was given
    if name is None or name == "":
        ur = str(url).split("/")
        name = ur[len(ur) - 1]
    # If a target directory was given, create it when missing
    if mkdir is not None and mkdir != "":
        if not os.path.exists(mkdir):
            os.mkdir(mkdir)
        name = os.path.join(mkdir, name)
    req = Request(url)
    # Add a User-Agent header
    req.add_header('User-Agent', constants.USER_AGENT)
    response = urlopen(req)
    # Write each chunk as soon as it is read
    with open(name, 'wb') as f:
        while True:
            tmp = response.read(8192)
            if not tmp:
                break
            f.write(tmp)
    response.close()


def download_big_file(url, mkdir, name=""):
    """
    Download a large file with requests, writing while downloading.
    With stream=True the request does not download the body immediately;
    the transfer only starts when you iterate over the content with
    iter_content or iter_lines, or access the content attribute. Note
    that the connection has to stay open until the download finishes.
    iter_content: iterate over the body chunk by chunk
    iter_lines: iterate over the body line by line
    :param url: file URL
    :param mkdir: target directory (created if missing)
    :param name: file name; defaults to the last path segment of the URL
    :return:
    """
    # Fall back to the last segment of the URL if no name was given
    if name is None or name == "":
        ur = str(url).split("/")
        name = ur[len(ur) - 1]
    # If a target directory was given, create it when missing
    if mkdir is not None and mkdir != "":
        if not os.path.exists(mkdir):
            os.mkdir(mkdir)
        name = os.path.join(mkdir, name)
    start_time = time.time()
    req = requests.get(url, stream=True, headers={"User-Agent": constants.USER_AGENT}, verify=False)
    with req as r:
        content_length = int(r.headers['content-length'])
        print(name, 'content-length: %dB/%.2fKB/%.2fMB' % (
            content_length, content_length / 1024, content_length / 1024 / 1024))
        down_size = 0
        with open(name, 'wb') as f:
            for chunk in r.iter_content(8192):
                if chunk:
                    f.write(chunk)
                    down_size += len(chunk)
                    print(name, '%d KB/s - %.2f MB of %.2f MB' % (
                        down_size / 1024 / (time.time() - start_time), down_size / 1024 / 1024,
                        content_length / 1024 / 1024), end='\r')
                    if down_size >= content_length:
                        break
    time_cost = time.time() - start_time
    print(name, 'total time: %.2f s, average speed: %.2f KB/s' % (time_cost, down_size / 1024 / time_cost))
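

# A minimal usage sketch for download_big_file (not part of the original
# module); the URL and directory below are hypothetical placeholders.
def _example_download_big_file():
    download_big_file("https://example.com/files/big.zip", "downloads", "big.zip")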


def download_file(url, mkdir, name=""):
    """
    Download a file with requests, reading the whole body into memory
    and then writing it out in one go.
    :param url: file URL
    :param mkdir: target directory (created if missing)
    :param name: file name; defaults to the last path segment of the URL
    :return: the path the file was saved to
    """
    # Fall back to the last segment of the URL if no name was given
    if name is None or name == "":
        ur = str(url).split("/")
        name = ur[len(ur) - 1]
    # If a target directory was given, create it when missing
    if mkdir is not None and mkdir != "":
        if not os.path.exists(mkdir):
            # os.mkdir(mkdir)
            os.makedirs(mkdir)
        name = os.path.join(mkdir, name)
    # Skip the download if the file already exists
    # if not os.path.exists(name):
    if not os.path.isfile(name):
        with requests.get(url, headers={"User-Agent": constants.USER_AGENT}, verify=False, timeout=600) as req:
            with open(name, "wb") as f:
                f.write(req.content)
    return name


def save_file(fd: io.BufferedWriter, chunk):
    fd.write(chunk)


async def download_one_fetch_async(url: str, mkdir: str, name: str):
    """
    Download one file asynchronously, reading it in chunks.
    :param url: file URL
    :param mkdir: target directory (created if missing)
    :param name: file name; defaults to the last path segment of the URL
    :return:
    """
    # Fall back to the last segment of the URL if no name was given
    if name is None or name == "":
        ur = str(url).split("/")
        name = ur[len(ur) - 1]
    # If a target directory was given, create it when missing
    if mkdir is not None and mkdir != "":
        if not os.path.exists(mkdir):
            # os.mkdir(mkdir)
            os.makedirs(mkdir)
        name = os.path.join(mkdir, name)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            lp = asyncio.get_running_loop()
            with open(name, 'wb') as f:
                while True:
                    chunk = await resp.content.read(8192)
                    if not chunk:
                        break
                    # Passing None as the executor uses the event loop's
                    # default ThreadPoolExecutor, so the blocking write
                    # does not stall the loop; await the future so the
                    # file is fully written before it is closed
                    await lp.run_in_executor(None, save_file, f, chunk)
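

# A minimal sketch of driving the coroutine above (not part of the original
# module); the URL is a hypothetical placeholder.
def _example_download_one_fetch_async():
    asyncio.run(download_one_fetch_async("https://example.com/a.jpg", "downloads", ""))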


async def download_one_async(url, mkdir, name):
    """
    Download one file asynchronously, writing the whole body at once.
    :param url: file URL
    :param mkdir: target directory (created if missing)
    :param name: file name; defaults to the last path segment of the URL
    :return:
    """
    # Fall back to the last segment of the URL if no name was given
    if name is None or name == "":
        ur = str(url).split("/")
        name = ur[len(ur) - 1]
    # If a target directory was given, create it when missing
    if mkdir is not None and mkdir != "":
        if not os.path.exists(mkdir):
            # os.mkdir(mkdir)
            os.makedirs(mkdir)
        name = os.path.join(mkdir, name)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            async with aiofiles.open(name, 'wb') as f:
                await f.write(await response.read())
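

# A minimal sketch of downloading several files concurrently (not part of
# the original module): download_one_async is scheduled once per URL and
# the coroutines run together under asyncio.gather.
async def download_all_async(urls, mkdir):
    await asyncio.gather(*(download_one_async(url, mkdir, "") for url in urls))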


def download_file_list(urls, mkdir, name):
    """
    Batch-download files with urllib.
    :param urls: list of file URLs
    :param mkdir: target directory (created if missing)
    :param name: file name; defaults to the last path segment of each URL
    :return:
    """
    # Old way of suppressing warnings:
    # from requests.packages.urllib3.exceptions import InsecureRequestWarning
    # requests.packages.disable_warnings(InsecureRequestWarning)
    # New way of suppressing warnings:
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    for url in urls:
        # Work on a per-URL copy so one iteration's name does not leak
        # into the next
        file_name = name
        # Fall back to the last segment of the URL if no name was given
        if file_name.strip() == '':
            ur = str(url).split("/")
            file_name = ur[len(ur) - 1]
        # If a target directory was given, create it when missing
        if mkdir.strip() != '':
            if not os.path.exists(mkdir):
                os.mkdir(mkdir)
            # os.path.join combines multiple path components
            file_name = os.path.join(mkdir, file_name)
        # urlretrieve(url, filename): the first argument is the URL of the
        # resource to download, the second is the local path to save it to
        urllib.request.urlretrieve(url, file_name)
        # response = urllib.request.urlopen(url)
        # pic = response.read()
        # with open(file_name, 'wb') as f:
        #     f.write(pic)


def get_host_ip():
    """
    Look up this machine's outbound IP address.
    :return: ip
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # A UDP "connect" sends no packets; it only selects the local
        # interface that would route to the target
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip


def get_remote_ip(host_name):
    """
    Resolve the IP address of the given host name.
    :param host_name: host name to resolve
    :return: the IP address, or None if resolution fails
    """
    try:
        return socket.gethostbyname(host_name)
    except socket.gaierror as e:
        print(" %s:%s" % (host_name, e))