Skip to content

Commit cb77b24

Browse files
author
Joe Bowers
committed
Passing messages between processes demo
1 parent a8b9029 commit cb77b24

1 file changed

Lines changed: 78 additions & 0 deletions

File tree

demo/subprocess_consumer.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
2+
import multiprocessing
3+
import random
4+
5+
from mixpanel import Mixpanel
6+
from mixpanel.consumer import BufferedConsumer
7+
8+
'''
9+
As your application scales, it's likely you'll want to
10+
to detect events in one place and send them somewhere
11+
else. For example, you might write the events to a queue
12+
to be consumed by another process.
13+
14+
This demo shows how you might do things, using
15+
a custom Consumer to consume events, and a
16+
and a BufferedConsumer to send them to Mixpanel
17+
'''
18+
19+
'''
20+
You can provide custom communication behaviors
21+
by providing your own consumer object to the
22+
Mixpanel constructor. Consumers are expected to
23+
have a single method, 'send', that takes an
24+
endpoint and a json message.
25+
'''
26+
class QueueWriteConsumer(object):
27+
def __init__(self, queue):
28+
self.queue = queue
29+
30+
def send(self, endpoint, json_message):
31+
self.queue.put((endpoint, json_message))
32+
33+
def do_tracking(project_token, distinct_id, queue):
34+
'''
35+
This process represents the work process where events
36+
and updates are generated. This might be the service
37+
thread of a web service, or some other process that
38+
is mostly concerned with getting time-sensitive work
39+
done.
40+
'''
41+
consumer = QueueWriteConsumer(queue)
42+
mp = Mixpanel(project_token, consumer)
43+
for i in xrange(100):
44+
event = 'Tick'
45+
mp.track(distinct_id, 'Tick', { 'Tick Number': i })
46+
print 'tick {0}'.format(i)
47+
48+
queue.put(None) # tell worker we're out of jobs
49+
50+
def do_sending(queue):
51+
'''
52+
This process is the analytics worker process- it can
53+
wait on HTTP responses to Mixpanel without blocking
54+
other jobs. This might be a queue consumer process
55+
or just a separate thread from the code that observes
56+
the things you want to measure.
57+
'''
58+
consumer = BufferedConsumer()
59+
payload = queue.get()
60+
while payload is not None:
61+
consumer.send(*payload)
62+
payload = queue.get()
63+
64+
consumer.flush()
65+
66+
if __name__ == '__main__':
67+
# replace token with your real project token
68+
token = '0ba349286c780fe53d8b4617d90e2d01'
69+
distinct_id = ''.join(random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for x in xrange(32))
70+
71+
queue = multiprocessing.Queue()
72+
sender = multiprocessing.Process(target=do_sending, args=(queue,))
73+
tracker = multiprocessing.Process(target=do_tracking, args=(token, distinct_id, queue))
74+
75+
sender.start()
76+
tracker.start()
77+
tracker.join()
78+
sender.join()

0 commit comments

Comments
 (0)