Skip to content

Commit 670856f

Browse files
committed
Create topics and subscriptions and don't use pre created
1 parent af4cda9 commit 670856f

4 files changed

Lines changed: 48 additions & 23 deletions

File tree

tests/base_test_agent.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,9 @@ def __init__(self):
2626

2727
def establish_links(self):
2828
self.project = config.CONFIG['project']
29-
self.down_subscriber = Subscriber()
30-
self.down_subscriber.subscribe(self.project, config.CONFIG['topic_name_downstream']
31-
, self.callback)
32-
self.up_publisher = Publisher()
33-
self.up_publisher.enable(self.project, config.CONFIG['topic_name_upstream'])
29+
self.down_subscriber = Subscriber(self.project, os.environ['SUBSCRIPTION_NAME_DOWNSTREAM'])
30+
self.down_subscriber.subscribe(self.callback)
31+
self.up_publisher = Publisher(self.project, os.environ['TOPIC_PATH_UPSTREAM'])
3432

3533
def start_node(self):
3634
logging.info('seeders:' + os.environ['SEEDERS'])

tests/base_test_ci.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,20 @@ def setUp(self):
2121
self.message = b'NULL'
2222

2323
self.project = config.CONFIG['project']
24-
self.up_subscriber = Subscriber()
25-
self.up_subscriber.subscribe(self.project, config.CONFIG['topic_name_upstream'], self.callback)
26-
self.down_publisher = Publisher()
27-
self.down_publisher.enable(self.project, config.CONFIG['topic_name_downstream'])
24+
self.up_publisher = Publisher(self.project)
25+
self.up_topic_path = self.up_publisher.create()
26+
self.up_publisher.add_subscription().subscribe(self.callback)
27+
self.down_publisher = Publisher(self.project)
28+
self.down_publisher.create()
2829

2930
self.agents = 0
3031
self.messages = []
3132

3233
def tearDown(self):
3334
for self.phase in self.phases:
3435
self.send('END')
35-
self.up_subscriber.delete()
36+
self.up_publisher.delete()
37+
self.down_publisher.delete()
3638

3739
def callback(self, message):
3840
if self.phase != message.attributes['phase']:
@@ -68,7 +70,8 @@ def send_and_wait(self, data, nodes):
6870
def start_node_agent_pair(self, seeders=config.CONFIG['no_seeders'], bootstrap = 'false', randcon = 5):
6971
docker = Docker()
7072
docker.stop('agent_' + str(self.agents))
71-
cmd = 'docker run --network=devnet --name agent_' + str(self.agents) + ' -v /root/spacemesh/devnet/tests:/opt/devnet -v /root/spacemesh/devnet/logs' + str(self.agents) + ':/opt/logs -v /root/spacemesh/devnet/cnf' + str(self.agents) + ':/opt/cnf/ -e SUBSCRIPTION_NAME_DOWNSTREAM=devnet_tests_agent_' + str(self.agents) + ' -e PHASE=' + self.phase + ' -e BOOTSTRAP=' + bootstrap +' -e NODE=' + str(self.agents) + ' -e SEEDERS=' + seeders + ' -e RANDCON=' + str(randcon) + ' spacemesh/devnet_agent:latest python3 /opt/devnet/base_test_agent.py'
73+
down_subscription = self.down_publisher.add_subscription()
74+
cmd = 'docker run --network=devnet --name agent_' + str(self.agents) + ' -v /root/spacemesh/devnet/tests:/opt/devnet -v /root/spacemesh/devnet/logs' + str(self.agents) + ':/opt/logs -v /root/spacemesh/devnet/cnf' + str(self.agents) + ':/opt/cnf/ -e SUBSCRIPTION_NAME_DOWNSTREAM=' + down_subscription + ' -e TOPIC_PATH_UPSTREAM=' + self.up_topic_path + ' -e PHASE=' + self.phase + ' -e BOOTSTRAP=' + bootstrap +' -e NODE=' + str(self.agents) + ' -e SEEDERS=' + seeders + ' -e RANDCON=' + str(randcon) + ' spacemesh/devnet_agent:latest python3 /opt/devnet/base_test_agent.py'
7275
docker.start(cmd)
7376
self.agents += 1
7477

tests/config.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
CONFIG = {
22
'project': 'spacemesh-198810',
3-
'topic_name_upstream': 'devnet_tests',
4-
'subscription_name_upstream': 'devnet_tests_ci',
5-
'topic_name_downstream': 'devnet_tests_downstream',
6-
'subscription_name_downstream': 'devnet_tests_agent',
73
'host': '127.0.0.1',
84
'host_user': 'deploy',
95
'host_password': 'deploy_password',

tests/topics.py

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,50 @@
66
from logging import Logger
77

88
class Publisher():
9-
def enable(self, project, topic_name):
9+
def __init__(self, project, topic_path = ''):
10+
self.project = project
11+
self.topic_path = topic_path
1012
self.publisher = pubsub_v1.PublisherClient()
11-
self.topic_path = self.publisher.topic_path(project, topic_name)
13+
14+
def create(self):
15+
self.subscribers = []
16+
topic_name = 'devnet_topic_' + str(random.randint(0, 9999999999)) + '_' + str(calendar.timegm(time.gmtime()))
17+
self.topic_path = self.publisher.topic_path(self.project, topic_name)
18+
publisher.create_topic(self.topic_path)
19+
return self.topic_path
1220

1321
def publish(self, **kwargs):
1422
self.publisher.publish(self.topic_path, **kwargs)
1523

24+
def add_subscription(self):
25+
subscriber = Subscriber(self.project)
26+
subscriber.create_for_publisher(self.topic_path)
27+
self.subscribers.append(subscriber)
28+
return subscriber
29+
30+
def delete(self):
31+
for s in self.subscribers:
32+
s.delete()
33+
self.publisher.delete_topic(self.topic_path)
34+
1635
class Subscriber():
17-
def subscribe(self, project, topic_name, callback):
36+
def __init__(self, project, subscription_path = ''):
37+
self.project = project
38+
self.subscription_path = subscription_path
39+
self.subscriber = pubsub_v1.SubscriberClient()
40+
41+
def create(self, topic_name):
1842
self.publisher = pubsub_v1.PublisherClient()
19-
topic_path = self.publisher.topic_path(project, topic_name)
43+
topic_path = self.publisher.topic_path(self.project, topic_name)
44+
return self.create_for_publisher(self, topic_path)
2045

21-
self.subscriber = pubsub_v1.SubscriberClient()
22-
subscription_name = 'devnet_sub_' + str(random.randint(0, 9999999999)) + '_' + str(calendar.timegm(time.gmtime()))
23-
self.subscription_path = self.subscriber.subscription_path(project, subscription_name)
24-
subscription = self.subscriber.create_subscription(self.subscription_path, topic_path)
46+
def create_for_publisher(self, topic_path):
47+
self.subscription_name = 'devnet_sub_' + str(random.randint(0, 9999999999)) + '_' + str(calendar.timegm(time.gmtime()))
48+
self.subscription_path = self.subscriber.subscription_path(self.project, self.subscription_name)
49+
self.subscriber.create_subscription(self.subscription_path, topic_path)
50+
return self.subscription_name
51+
52+
def subscribe(self, callback):
2553
self.subscriber.subscribe(self.subscription_path, callback=callback)
2654

2755
def delete(self):

0 commit comments

Comments
 (0)