This repository was archived by the owner on Apr 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 216
Expand file tree
/
Copy pathconftest.py
More file actions
139 lines (112 loc) · 3.78 KB
/
conftest.py
File metadata and controls
139 lines (112 loc) · 3.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import collections
import subprocess
import platform
import sys
import tempfile
import socket
import shlex
import uuid
import time
import json
import os
import requests
import pytest
import py
# Paths of test modules that pytest must not collect on this interpreter.
collect_ignore = []

# test_aio.py is skipped on every interpreter older than Python 3.
if sys.version_info[0] < 3:
    collect_ignore.append(
        os.path.join(os.path.dirname(__file__), 'test_aio.py'))

# test_twisted.py is skipped on Python 2.0 - 2.6; every other interpreter
# asks pytest to load the pytest_twisted plugin instead.
if (2, 0) <= tuple(sys.version_info[:2]) < (2, 7):
    collect_ignore.append(
        os.path.join(os.path.dirname(__file__), 'test_twisted.py'))
else:
    pytest_plugins = "pytest_twisted"
def get_free_ports(num, host=None):
    """
    Ask the operating system for ``num`` currently unused TCP ports.

    num: how many ports to reserve
    host: address to bind the probe sockets to; any falsy value means
        '127.0.0.1'

    returns: a list of ``num`` port numbers. All probe sockets are held
        open until every port has been picked, so the ports within one
        call are distinct. They are closed again before returning, so
        nothing prevents another process from grabbing a port afterwards.
    """
    if not host:
        host = '127.0.0.1'

    sockets = []
    try:
        for _ in range(num):
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # track the socket before binding, so a failing bind() cannot
            # leak it (or any of the sockets opened before it)
            sockets.append(s)
            # port 0 lets the OS pick a free port
            s.bind((host, 0))
        return [s.getsockname()[1] for s in sockets]
    finally:
        for s in sockets:
            s.close()
def start_consul_instance(acl_master_token=None):
    """
    Start a throwaway consul server agent and wait until it is usable.

    A fresh temporary directory is created, a ``config.json`` holding the
    port assignments is written into it, and the working directory of the
    current process is switched to it, because the agent is launched with
    the relative arguments ``-config-dir=.`` and ``-data-dir=./data``.

    acl_master_token: if falsy, no ACL settings are written and ACLs stay
        disabled for this server; otherwise ACLs are enabled for
        datacenter ``dc1`` and the master token is set to the supplied
        token.

    returns: a tuple of the instance's process object and the http port the
        instance is listening on

    raises: RuntimeError if the agent process exits before a leader has
        been reported
    """
    # five real ports from the OS; dns is set to -1 (consul's "disabled")
    port_names = ['http', 'rpc', 'serf_lan', 'serf_wan', 'server', 'dns']
    ports = dict(zip(port_names, get_free_ports(5) + [-1]))

    config = {'ports': ports}
    if acl_master_token:
        config['acl_datacenter'] = 'dc1'
        config['acl_master_token'] = acl_master_token

    tmpdir = py.path.local(tempfile.mkdtemp())
    tmpdir.join('config.json').write(json.dumps(config))
    tmpdir.chdir()

    # the consul binaries are expected to sit next to this file
    postfix = 'osx' if platform.system() == 'Darwin' else 'linux64'
    consul_bin = os.path.join(os.path.dirname(__file__), 'consul.' + postfix)

    # built as an argv list rather than a string that is split afterwards,
    # so a checkout path containing spaces or quotes still works
    command = [
        consul_bin, 'agent', '-server', '-bootstrap',
        '-bind=127.0.0.1',
        '-config-dir=.', '-data-dir=./data',
    ]

    # NOTE(review): both pipes are handed to the caller unread; an agent
    # that logs more than the OS pipe buffer holds would block on write.
    # Left as-is because callers receive the process object.
    p = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # wait for consul instance to bootstrap
    base_uri = 'http://127.0.0.1:%s/v1/' % ports['http']

    while True:
        time.sleep(0.1)
        # a dead agent will never answer: fail instead of polling forever
        if p.poll() is not None:
            _, err = p.communicate()
            raise RuntimeError(
                'consul exited with code %s during startup: %s' % (
                    p.returncode, err.decode('utf-8', 'replace').strip()))
        try:
            response = requests.get(base_uri + 'status/leader')
        except requests.ConnectionError:
            # http endpoint is not accepting connections yet
            continue
        # an empty JSON string means no leader has been elected so far
        if response.text.strip() != '""':
            break

    # register a dummy service and poll until the health endpoint reports
    # it, then remove it again
    requests.put(base_uri + 'agent/service/register', data='{"name": "foo"}')

    while True:
        response = requests.get(base_uri + 'health/service/foo')
        if response.text.strip() != '[]':
            break
        time.sleep(0.1)

    requests.get(base_uri + 'agent/service/deregister/foo')
    # phew
    # NOTE(review): extra settling time; the reason is not recorded here
    time.sleep(2)
    return p, ports['http']
@pytest.yield_fixture(scope="session")
def consul_instance():
    """
    Session-scoped consul server started without an ACL master token.

    yields: the http port the instance is listening on
    """
    p, port = start_consul_instance()
    yield port
    p.terminate()
    # reap the child and drain/close its stdout and stderr pipes; without
    # this the terminated agent is never waited for
    p.communicate()
@pytest.yield_fixture
def consul_port(consul_instance):
    """
    Per-test handle on the shared consul instance.

    yields: the http port of the session-wide instance. Once the test is
        done, the whole key/value store is deleted recursively.
    """
    port = consul_instance
    yield port

    # remove all data from the instance, to have a clean start
    kv_url = ('http://127.0.0.1:%s/v1/' % port) + 'kv/?recurse=1'
    requests.delete(kv_url)
@pytest.yield_fixture(scope="session")
def acl_consul_instance():
    """
    Session-scoped consul server started with a random ACL master token.

    yields: a tuple of the http port the instance is listening on and the
        master token it was started with
    """
    acl_master_token = uuid.uuid4().hex
    p, port = start_consul_instance(acl_master_token=acl_master_token)
    yield port, acl_master_token
    p.terminate()
    # reap the child and drain/close its stdout and stderr pipes; without
    # this the terminated agent is never waited for
    p.communicate()
@pytest.yield_fixture
def acl_consul(acl_consul_instance):
    """
    Per-test handle on the shared ACL-enabled consul instance.

    yields: an ``ACLConsul`` namedtuple with the fields ``port`` and
        ``token``. Once the test is done, the whole key/value store is
        deleted recursively.
    """
    port, token = acl_consul_instance
    ACLConsul = collections.namedtuple('ACLConsul', ['port', 'token'])
    handle = ACLConsul(port, token)
    yield handle

    # remove all data from the instance, to have a clean start
    kv_url = ('http://127.0.0.1:%s/v1/' % port) + 'kv/?recurse=1'
    requests.delete(kv_url)