|
| 1 | + |
| 2 | +import multiprocessing |
| 3 | +import random |
| 4 | + |
| 5 | +from mixpanel import Mixpanel |
| 6 | +from mixpanel.consumer import BufferedConsumer |
| 7 | + |
| 8 | +''' |
| 9 | +As your application scales, it's likely you'll want to |
| 10 | +to detect events in one place and send them somewhere |
| 11 | +else. For example, you might write the events to a queue |
| 12 | +to be consumed by another process. |
| 13 | +
|
| 14 | +This demo shows how you might do things, using |
| 15 | +a custom Consumer to consume events, and a |
| 16 | +and a BufferedConsumer to send them to Mixpanel |
| 17 | +''' |
| 18 | + |
| 19 | +''' |
| 20 | +You can provide custom communication behaviors |
| 21 | +by providing your own consumer object to the |
| 22 | +Mixpanel constructor. Consumers are expected to |
| 23 | +have a single method, 'send', that takes an |
| 24 | +endpoint and a json message. |
| 25 | +''' |
| 26 | +class QueueWriteConsumer(object): |
| 27 | + def __init__(self, queue): |
| 28 | + self.queue = queue |
| 29 | + |
| 30 | + def send(self, endpoint, json_message): |
| 31 | + self.queue.put((endpoint, json_message)) |
| 32 | + |
| 33 | +def do_tracking(project_token, distinct_id, queue): |
| 34 | + ''' |
| 35 | + This process represents the work process where events |
| 36 | + and updates are generated. This might be the service |
| 37 | + thread of a web service, or some other process that |
| 38 | + is mostly concerned with getting time-sensitive work |
| 39 | + done. |
| 40 | + ''' |
| 41 | + consumer = QueueWriteConsumer(queue) |
| 42 | + mp = Mixpanel(project_token, consumer) |
| 43 | + for i in xrange(100): |
| 44 | + event = 'Tick' |
| 45 | + mp.track(distinct_id, 'Tick', { 'Tick Number': i }) |
| 46 | + print 'tick {0}'.format(i) |
| 47 | + |
| 48 | + queue.put(None) # tell worker we're out of jobs |
| 49 | + |
| 50 | +def do_sending(queue): |
| 51 | + ''' |
| 52 | + This process is the analytics worker process- it can |
| 53 | + wait on HTTP responses to Mixpanel without blocking |
| 54 | + other jobs. This might be a queue consumer process |
| 55 | + or just a separate thread from the code that observes |
| 56 | + the things you want to measure. |
| 57 | + ''' |
| 58 | + consumer = BufferedConsumer() |
| 59 | + payload = queue.get() |
| 60 | + while payload is not None: |
| 61 | + consumer.send(*payload) |
| 62 | + payload = queue.get() |
| 63 | + |
| 64 | + consumer.flush() |
| 65 | + |
| 66 | +if __name__ == '__main__': |
| 67 | + # replace token with your real project token |
| 68 | + token = '0ba349286c780fe53d8b4617d90e2d01' |
| 69 | + distinct_id = ''.join(random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for x in xrange(32)) |
| 70 | + |
| 71 | + queue = multiprocessing.Queue() |
| 72 | + sender = multiprocessing.Process(target=do_sending, args=(queue,)) |
| 73 | + tracker = multiprocessing.Process(target=do_tracking, args=(token, distinct_id, queue)) |
| 74 | + |
| 75 | + sender.start() |
| 76 | + tracker.start() |
| 77 | + tracker.join() |
| 78 | + sender.join() |
0 commit comments