Skip to content

Commit f9306d8

Browse files
committed
learning at 20191014
1 parent 459c6de commit f9306d8

3 files changed

Lines changed: 151 additions & 0 deletions

File tree

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import os, cv2, time, struct, threading
2+
from http.server import HTTPServer, BaseHTTPRequestHandler
3+
from socketserver import TCPServer, ThreadingTCPServer
4+
from threading import Thread, RLock
5+
from select import select
6+
7+
class JpegStream(Thread):
8+
def __init__(self, camera):
9+
super().__init__()
10+
self.cap =cv2.VideoCapture(camera)
11+
self.lock = RLock()
12+
self.pipes = {}
13+
14+
def register(self):
15+
pr, pw = os.pipe()
16+
self.lock.acquire()
17+
self.pipes[pr] = pw
18+
self.lock.release()
19+
return pr
20+
21+
def unregister(self, pr):
22+
self.lock.acquire()
23+
pw = self.pipes.pop(pr)
24+
self.lock.release()
25+
os.close(pr)
26+
os.close(pw)
27+
28+
def capture(self):
29+
cap = self.cap
30+
while cap.isOpened():
31+
ret, frame = cap.read()
32+
if ret:
33+
ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY,40))
34+
yield data.tostring
35+
36+
def send_frame(self, frame):
37+
n = struct.pack('l', len(frame))
38+
self.lock.acquire()
39+
if len(self.pipes):
40+
_, pipes, _ = select([], self.pipes.values(), [], 1)
41+
for pipe in pipes:
42+
os.write(pipe, n)
43+
os.write(pipe, frame)
44+
self.lock.release()
45+
46+
def run(self):
47+
for frame in self.capture():
48+
self.send_frame(frame)
49+
50+
class JpegRetriever:
51+
def __init__(self, streamer):
52+
self.streamer = streamer
53+
# 使用线程本地数据
54+
self.local = threading.local()
55+
56+
def retrieve(self):
57+
while True:
58+
ns = os.read(self.local.pipe, 8)
59+
n = struct.unpack('l', ns)[0]
60+
data = os.read(self.local.pipe, n)
61+
yield data
62+
63+
def __enter__(self):
64+
if hasattr(self.local, 'pipe'):
65+
raise RuntimeError()
66+
67+
# 将所有pipe变成线程本地数据
68+
# self.pipe = streamer.register()
69+
self.local.pipe = streamer.register()
70+
return self.retrieve()
71+
72+
def __exit__(self, *args):
73+
self.streamer.unregister(self.local.pipe)
74+
del self.local.pipe
75+
return True
76+
77+
class WebHandler(BaseHTTPRequestHandler):
78+
retriever = None
79+
80+
@staticmethod
81+
def set_retriever(retriever):
82+
WebHandler,retriever = retriever
83+
84+
def do_GET(self):
85+
if self.retriever is None:
86+
raise RuntimeError('no retriever')
87+
88+
if self.path != '/':
89+
return
90+
91+
self.send_response(200)
92+
self.send_header('Content-Type','multipart/x-mixed-replace;boundary=jpeg_frame')
93+
self.end_headers()
94+
95+
with self.retriever as frames:
96+
for frame in frames:
97+
self.send_frame(frame)
98+
99+
def send_fame(self, frame):
100+
sh = b'--jpeg_frame\r\n'
101+
sh += b'Content-Type: image/jpeg\r\n'
102+
sh += b'Content-Length: %d\r\n\r\n' % len(frame)
103+
self.wfile.write(sh)
104+
self.wfile.write(frame)
105+
106+
from concurrent.futures import ThreadPoolExecutor
107+
108+
class ThreadingPoolTCPServer(ThreadingTCPServer):
109+
def __init__(self, server_address, RequestHandlerClass, bind_and_active=True, thread_n=100):
110+
super.__init__(server_address, RequestHandlerClass, bind_and_active=True)
111+
self.executor = ThreadPoolExecutor(thread_n)
112+
113+
def process_request(self, request, client_address):
114+
self.executor.submit(self, process_request_thread, request, client_address)
115+
if __name__ == "__main__":
116+
# 创建Streamer, 开启摄像头采集
117+
streamer = JpegStreamer(0)
118+
streamer.start()
119+
120+
# http服务创建Retriever
121+
retriever = JpegRetriever(streamer)
122+
WebHandler.set_retriever(retriever)
123+
124+
# 开启http服务器
125+
HOST = 'localhost'
126+
PORT = 9000
127+
print('Start Server ... (http://%s:%s)' % (HOST, PORT))
128+
# 为了支持多线程,需要将TCPServer改成ThreadingTCPServer,多线程版本
129+
# 每个线程需要独立的管道
130+
# httpd = TCPServer((HOST, PORT), WebHandler)
131+
# httpd = ThreadingTCPServer((HOST, PORT), WebHandler)
132+
httpd = ThreadingPoolTCPServer((HOST, PORT), WebHandler, thread_n=3)
133+
httpd.serve_forever()
134+

python3_programming_tricks/ch08/8-6.py

Whitespace-only changes.

python3_programming_tricks/ch08/8_multithread_and_multiprocess.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,20 @@ GIL属于进程,不同进程中的GIL是相互独立的
7474
之前实现了一个多线程web视频监控服务器,由于服务器资源有限(CPU,内存,带宽),需要对请求连接数(线程数)做限制,避免因资源耗尽而瘫痪。
7575

7676
可以使用线程池,替代原来的每次请求创建线程。
77+
线程池:提前创建固定数量的线程,用的时候去池子中取,用完在放回
78+
79+
**解决方案**
80+
81+
使用标准库中concurrent.futures下的ThreadPoolExecutor对象的submit和map方法可以用来启动线程池中线程执行任务。
82+
83+
## 8.6 如何使用多进程
84+
85+
**实际案例**
86+
87+
由于python中全局解释器锁(GIL)的存在,在任意时刻只允许一个线程在解释器中运行。因此python的多线程不适合处理CPU密集型任务
88+
89+
想要处理CPU密集型任务,可以使用多进程模型。
90+
91+
**解决方案**
92+
93+
使用标准库中multiprocessing.Process, 它可以启动子进程执行任务操作接口,进程间通信,进程间同步等都与Threading.Thread类似

0 commit comments

Comments
 (0)