|
6 | 6 | from logging import Logger |
7 | 7 |
|
8 | 8 | class Publisher(): |
9 | | - def enable(self, project, topic_name): |
| 9 | + def __init__(self, project, topic_path = ''): |
| 10 | + self.project = project |
| 11 | + self.topic_path = topic_path |
10 | 12 | self.publisher = pubsub_v1.PublisherClient() |
11 | | - self.topic_path = self.publisher.topic_path(project, topic_name) |
| 13 | + |
| 14 | + def create(self): |
| 15 | + self.subscribers = [] |
| 16 | + topic_name = 'devnet_topic_' + str(random.randint(0, 9999999999)) + '_' + str(calendar.timegm(time.gmtime())) |
| 17 | + self.topic_path = self.publisher.topic_path(self.project, topic_name) |
| 18 | + publisher.create_topic(self.topic_path) |
| 19 | + return self.topic_path |
12 | 20 |
|
13 | 21 | def publish(self, **kwargs): |
14 | 22 | self.publisher.publish(self.topic_path, **kwargs) |
15 | 23 |
|
| 24 | + def add_subscription(self): |
| 25 | + subscriber = Subscriber(self.project) |
| 26 | + subscriber.create_for_publisher(self.topic_path) |
| 27 | + self.subscribers.append(subscriber) |
| 28 | + return subscriber |
| 29 | + |
| 30 | + def delete(self): |
| 31 | + for s in self.subscribers: |
| 32 | + s.delete() |
| 33 | + self.publisher.delete_topic(self.topic_path) |
| 34 | + |
16 | 35 | class Subscriber(): |
17 | | - def subscribe(self, project, topic_name, callback): |
| 36 | + def __init__(self, project, subscription_path = ''): |
| 37 | + self.project = project |
| 38 | + self.subscription_path = subscription_path |
| 39 | + self.subscriber = pubsub_v1.SubscriberClient() |
| 40 | + |
| 41 | + def create(self, topic_name): |
18 | 42 | self.publisher = pubsub_v1.PublisherClient() |
19 | | - topic_path = self.publisher.topic_path(project, topic_name) |
| 43 | + topic_path = self.publisher.topic_path(self.project, topic_name) |
| 44 | + return self.create_for_publisher(self, topic_path) |
20 | 45 |
|
21 | | - self.subscriber = pubsub_v1.SubscriberClient() |
22 | | - subscription_name = 'devnet_sub_' + str(random.randint(0, 9999999999)) + '_' + str(calendar.timegm(time.gmtime())) |
23 | | - self.subscription_path = self.subscriber.subscription_path(project, subscription_name) |
24 | | - subscription = self.subscriber.create_subscription(self.subscription_path, topic_path) |
| 46 | + def create_for_publisher(self, topic_path): |
| 47 | + self.subscription_name = 'devnet_sub_' + str(random.randint(0, 9999999999)) + '_' + str(calendar.timegm(time.gmtime())) |
| 48 | + self.subscription_path = self.subscriber.subscription_path(self.project, self.subscription_name) |
| 49 | + self.subscriber.create_subscription(self.subscription_path, topic_path) |
| 50 | + return self.subscription_name |
| 51 | + |
| 52 | + def subscribe(self, callback): |
25 | 53 | self.subscriber.subscribe(self.subscription_path, callback=callback) |
26 | 54 |
|
27 | 55 | def delete(self): |
|
0 commit comments