File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1+ #!/usr/bin/env python3
2+ # -*- coding: utf-8 -*-
3+
4+ import random , time , queue
5+ from multiprocessing .managers import BaseManager
6+
7+ # 发送任务的队列:
8+ task_queue = queue .Queue ()
9+ # 接收结果的队列:
10+ result_queue = queue .Queue ()
11+
12+ # 从BaseManager继承的QueueManager:
13+ class QueueManager (BaseManager ):
14+ pass
15+
16+ # 把两个Queue都注册到网络上, callable参数关联了Queue对象:
17+ QueueManager .register ('get_task_queue' , callable = lambda : task_queue )
18+ QueueManager .register ('get_result_queue' , callable = lambda : result_queue )
19+ # 绑定端口5000, 设置验证码'abc':
20+ manager = QueueManager (address = ('' , 5000 ), authkey = b'abc' )
21+ # 启动Queue:
22+ manager .start ()
23+ # 获得通过网络访问的Queue对象:
24+ task = manager .get_task_queue ()
25+ result = manager .get_result_queue ()
26+ # 放几个任务进去:
27+ for i in range (10 ):
28+ n = random .randint (0 , 10000 )
29+ print ('Put task %d...' % n )
30+ task .put (n )
31+ # 从result队列读取结果:
32+ print ('Try get results...' )
33+ for i in range (10 ):
34+ r = result .get (timeout = 10 )
35+ print ('Result: %s' % r )
36+ # 关闭:
37+ manager .shutdown ()
38+ print ('master exit.' )
Original file line number Diff line number Diff line change 1+ #!/usr/bin/env python3
2+ # -*- coding: utf-8 -*-
3+
4+ import time , sys , queue
5+ from multiprocessing .managers import BaseManager
6+
7+ # 创建类似的QueueManager:
8+ class QueueManager (BaseManager ):
9+ pass
10+
11+ # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
12+ QueueManager .register ('get_task_queue' )
13+ QueueManager .register ('get_result_queue' )
14+
15+ # 连接到服务器,也就是运行taskmanager.py的机器:
16+ server_addr = '127.0.0.1'
17+ print ('Connect to server %s...' % server_addr )
18+ # 端口和验证码注意保持与taskmanager.py设置的完全一致:
19+ m = QueueManager (address = (server_addr , 5000 ), authkey = b'abc' )
20+ # 从网络连接:
21+ m .connect ()
22+ # 获取Queue的对象:
23+ task = m .get_task_queue ()
24+ result = m .get_result_queue ()
25+ # 从task队列取任务,并把结果写入result队列:
26+ for i in range (10 ):
27+ try :
28+ n = task .get (timeout = 1 )
29+ print ('run task %d * %d...' % (n , n ))
30+ r = '%d * %d = %d' % (n , n , n * n )
31+ time .sleep (1 )
32+ result .put (r )
33+ except Queue .Empty :
34+ print ('task queue is empty.' )
35+ # 处理结束:
36+ print ('worker exit.' )
You can’t perform that action at this time.
0 commit comments