-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathexec.py
More file actions
90 lines (79 loc) · 2.53 KB
/
exec.py
File metadata and controls
90 lines (79 loc) · 2.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import asyncio
from asyncio.subprocess import SubprocessStreamProtocol
import functools
import os
import pathlib
import subprocess
from typing import Dict, List, Optional
from ceph_devstack import logger, VERBOSE
class LoggingStreamProtocol(asyncio.subprocess.SubprocessStreamProtocol):
def __init__(self, limit, loop, log_level):
self.log_level = log_level
super().__init__(limit=limit, loop=loop)
def pipe_data_received(self, fd, data):
logger.log(
self.log_level,
(data.decode() if isinstance(data, bytes) else str(data)).rstrip("\n"),
)
super().pipe_data_received(fd, data)
class Command:
def __init__(
self,
args: List[str],
cwd: Optional[pathlib.Path] = None,
env: Optional[Dict] = None,
stream_output: bool = False,
):
self.args = args
self.env = os.environ | (env or {})
self.kwargs: Dict = {
"stdout": asyncio.subprocess.PIPE,
"stderr": asyncio.subprocess.PIPE,
}
if cwd:
self.kwargs.update(cwd=cwd)
self.stream_output = stream_output
def _make_log_msg(self) -> str:
msg = "> " + " ".join(self.args)
if (cwd := str(self.kwargs.get("cwd", "."))) != ".":
msg = f"{msg} cwd='{cwd}'"
return msg
def run(self) -> subprocess.Popen:
logger.log(VERBOSE, self._make_log_msg())
proc = subprocess.Popen(
args=self.args,
env=self.env,
**self.kwargs,
)
proc.wait()
return proc
async def arun(self) -> asyncio.subprocess.Process:
logger.log(VERBOSE, self._make_log_msg())
loop = asyncio.get_running_loop()
protocol_factory: functools.partial[SubprocessStreamProtocol]
if self.stream_output:
protocol_factory = functools.partial(
LoggingStreamProtocol,
limit=2**16,
loop=loop,
log_level=VERBOSE,
)
else:
protocol_factory = functools.partial(
asyncio.subprocess.SubprocessStreamProtocol,
limit=2**16,
loop=loop,
)
transport, protocol = await loop.subprocess_exec(
protocol_factory,
*self.args,
env=self.env,
**self.kwargs,
)
return asyncio.subprocess.Process(
transport,
protocol,
loop,
)
def __str__(self):
return " ".join(self.args)