forked from DonJayamanne/pythonVSCode
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtools.py
More file actions
108 lines (95 loc) · 3.23 KB
/
tools.py
File metadata and controls
108 lines (95 loc) · 3.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import os
import os.path
import requests
import shutil
import time
from progress.bar import Bar
from subprocess import run, PIPE, Popen
def ensure_directory(dir: str):
    """
    Make sure ``dir`` exists as a directory, creating missing parents.

    :param dir:str: Directory path to create if it is missing.
    :raises FileExistsError: If ``dir`` exists but is not a directory.
    """
    # exist_ok=True avoids the check-then-create race of
    # "if not exists: makedirs", and still fails loudly when the path is
    # occupied by something that is not a directory.
    os.makedirs(dir, exist_ok=True)
def run_command(command, cwd=None, silent=False, progress_message=None, env=None):
    """
    Run the specified command as a subprocess (no shell) and wait for it.

    The executable named by ``command[0]`` is resolved with ``shutil.which``
    and the resolved argument list is printed before it is executed.

    :param command: Argument list; ``command[0]`` is the program name.
    :param cwd=None: Working directory for the subprocess.
    :param silent=False: When true, the subprocess stdout is captured
        and discarded instead of being inherited.
    :param progress_message=None: Message printed before running.
    :param env=None: Environment mapping passed to the subprocess.
    :raises FileNotFoundError: If the executable cannot be located.
    :raises subprocess.CalledProcessError: If the exit code is non-zero.
    """
    if progress_message is not None:
        print(progress_message)

    executable = shutil.which(command[0])
    if executable is None:
        # Fail with a clear message instead of handing ``None`` to run().
        raise FileNotFoundError(
            "Executable not found on PATH: {!r}".format(command[0])
        )

    # Build a new list so the caller's ``command`` is left untouched.
    resolved = [executable, *command[1:]]
    print(resolved)

    stdout = PIPE if silent else None
    run(resolved, cwd=cwd, stdout=stdout, shell=False, check=True, env=env)
def run_command_and_kill(
    command, cwd=None, silent=False, progress_message=None, timeout=10
):
    """
    Start the specified command (no shell) and kill it after ``timeout``.

    The process is launched in the background. If it is still running
    once ``timeout`` seconds have elapsed it is killed; if it exits on its
    own before that, the function returns as soon as it has exited.

    :param command: Argument list; ``command[0]`` is the program name.
    :param cwd=None: Working directory for the subprocess.
    :param silent=False: When true, the subprocess stdout is discarded.
    :param progress_message=None: Message printed before running.
    :param timeout=10: Seconds to let the process run before killing it.
    :raises FileNotFoundError: If the executable cannot be located.
    """
    # Local import: the module-level import only brings in run/PIPE/Popen.
    from subprocess import DEVNULL, TimeoutExpired

    if progress_message is not None:
        print(progress_message)

    executable = shutil.which(command[0])
    if executable is None:
        raise FileNotFoundError(
            "Executable not found on PATH: {!r}".format(command[0])
        )

    # Build a new list so the caller's ``command`` is left untouched.
    resolved = [executable, *command[1:]]
    print(resolved)

    # Nobody reads the output while we wait, so discard it rather than pipe
    # it: an unread pipe would stall the child once its buffer fills up.
    stdout = DEVNULL if silent else None

    # Popen returns immediately (run() would block until the command ends);
    # the context manager reaps the process on the way out.
    with Popen(resolved, cwd=cwd, stdout=stdout, shell=False) as proc:
        try:
            proc.wait(timeout=timeout)
        except TimeoutExpired:
            proc.kill()
def unzip_file(zip_file: str, destination: str):
    """
    Extract a zip archive by invoking the external ``unzip`` program.

    The work is delegated to ``run_command`` with output silenced, so the
    ``unzip`` executable has to be available on the PATH.

    :param zip_file:str: Archive to extract.
    :param destination:str: Directory passed to ``unzip -d``.
    """
    unzip_args = ["unzip", zip_file, "-d", destination]
    run_command(unzip_args, silent=True, progress_message="Extracting zip file")
def download_file(url: str, download_file: str, progress_message="Downloading"):  # noqa
    """
    Download a file to disk while displaying a progress bar.

    Any existing file at the destination is removed before the download
    starts. The body is streamed to disk in 1 MiB chunks. If the download
    fails for any reason the partially written file is removed and the
    error is re-raised.

    :param url:str: Resource to download.
    :param download_file:str: Destination file.
    :param progress_message="Downloading": Label shown on the progress bar.
    :raises requests.HTTPError: If the server responds with an error status.
    """
    download_file = os.path.abspath(download_file)
    try:
        os.remove(download_file)
    except FileNotFoundError:
        pass

    progress = Bar(progress_message, max=100)
    try:
        # The context manager releases the streamed connection when done.
        with requests.get(url, stream=True) as response:
            # Do not save an HTTP error page as if it were the download.
            response.raise_for_status()

            # A missing or zero content-length means the size is unknown;
            # the data is still streamed, but the bar cannot advance.
            total = int(response.headers.get("content-length") or 0)
            downloaded = 0
            percent = 0
            with open(download_file, "wb") as fs:
                for data in response.iter_content(chunk_size=1024 * 1024):
                    fs.write(data)
                    if total <= 0:
                        continue
                    downloaded += len(data)
                    # Decoded (e.g. gzip) bodies can exceed content-length,
                    # so clamp to keep the bar from overrunning 100%.
                    new_percent = min(100, downloaded * 100 // total)
                    for _ in range(new_percent - percent):
                        progress.next()
                    percent = max(percent, new_percent)
    except BaseException:
        # Also covers Ctrl-C: never leave a truncated file behind. The file
        # may not exist yet if the request itself failed, so tolerate that
        # rather than masking the original error.
        try:
            os.remove(download_file)
        except FileNotFoundError:
            pass
        raise
    finally:
        progress.finish()