forked from splunk/splunk-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_index.py
More file actions
executable file
·183 lines (153 loc) · 7.28 KB
/
test_index.py
File metadata and controls
executable file
·183 lines (153 loc) · 7.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/usr/bin/env python
#
# Copyright © 2011-2024 Splunk, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"): you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import time
import pytest
from tests import testlib
from splunklib import client
class IndexTest(testlib.SDKTestCase):
def setUp(self):
super().setUp()
self.index_name = testlib.tmpname()
self.index = self.service.indexes.create(self.index_name)
self.assertEventuallyTrue(lambda: self.index.refresh()['disabled'] == '0')
def tearDown(self):
super().tearDown()
# We can't delete an index with the REST API before Splunk
# 5.0. In 4.x, we just have to leave them lying around until
# someone cares to go clean them up. Unique naming prevents
# clashes, though.
if self.service.splunk_version >= (5,):
if self.index_name in self.service.indexes:
time.sleep(5)
self.service.indexes.delete(self.index_name)
self.assertEventuallyTrue(lambda: self.index_name not in self.service.indexes)
else:
logging.warning("test_index.py:TestDeleteIndex: Skipped: cannot "
"delete indexes via the REST API in Splunk 4.x")
def totalEventCount(self):
self.index.refresh()
return int(self.index['totalEventCount'])
def test_delete(self):
if self.service.splunk_version >= (5,):
self.assertTrue(self.index_name in self.service.indexes)
time.sleep(5)
self.service.indexes.delete(self.index_name)
self.assertEventuallyTrue(lambda: self.index_name not in self.service.indexes)
def test_integrity(self):
self.check_entity(self.index)
def test_default(self):
default = self.service.indexes.get_default()
self.assertTrue(isinstance(default, str))
def test_disable_enable(self):
self.index.disable()
self.index.refresh()
self.assertEqual(self.index['disabled'], '1')
self.index.enable()
self.index.refresh()
self.assertEqual(self.index['disabled'], '0')
# def test_submit_and_clean(self):
# self.index.refresh()
# original_count = int(self.index['totalEventCount'])
# self.index.submit("Hello again!", sourcetype="Boris", host="meep")
# self.assertEventuallyTrue(lambda: self.totalEventCount() == original_count+1, timeout=50)
# # Cleaning an enabled index on 4.x takes forever, so we disable it.
# # However, cleaning it on 5 requires it to be enabled.
# if self.service.splunk_version < (5,):
# self.index.disable()
# self.restartSplunk()
# self.index.clean(timeout=500)
# self.assertEqual(self.index['totalEventCount'], '0')
def test_prefresh(self):
self.assertEqual(self.index['disabled'], '0') # Index is prefreshed
def test_submit(self):
event_count = int(self.index['totalEventCount'])
self.assertEqual(self.index['sync'], '0')
self.assertEqual(self.index['disabled'], '0')
self.index.submit("Hello again!", sourcetype="Boris", host="meep")
self.assertEventuallyTrue(lambda: self.totalEventCount() == event_count + 1, timeout=50)
def test_submit_namespaced(self):
s = client.connect(**{
"username": self.service.username,
"password": self.service.password,
"owner": "nobody",
"app": "search"
})
i = s.indexes[self.index_name]
event_count = int(i['totalEventCount'])
self.assertEqual(i['sync'], '0')
self.assertEqual(i['disabled'], '0')
i.submit("Hello again namespaced!", sourcetype="Boris", host="meep")
self.assertEventuallyTrue(lambda: self.totalEventCount() == event_count + 1, timeout=50)
def test_submit_via_attach(self):
event_count = int(self.index['totalEventCount'])
cn = self.index.attach()
cn.send(b"Hello Boris!\r\n")
cn.close()
self.assertEventuallyTrue(lambda: self.totalEventCount() == event_count + 1, timeout=60)
def test_submit_via_attach_using_token_header(self):
# Remove the prefix from the token
s = client.connect(**{'token': self.service.token.replace("Splunk ", "")})
i = s.indexes[self.index_name]
event_count = int(i['totalEventCount'])
if s.has_cookies():
del s.http._cookies
cn = i.attach()
cn.send(b"Hello Boris 5!\r\n")
cn.close()
self.assertEventuallyTrue(lambda: self.totalEventCount() == event_count + 1, timeout=60)
def test_submit_via_attached_socket(self):
event_count = int(self.index['totalEventCount'])
f = self.index.attached_socket
with f() as sock:
sock.send(b'Hello world!\r\n')
self.assertEventuallyTrue(lambda: self.totalEventCount() == event_count + 1, timeout=60)
def test_submit_via_attach_with_cookie_header(self):
# Skip this test if running below Splunk 6.2, cookie-auth didn't exist before
splver = self.service.splunk_version
if splver[:2] < (6, 2):
self.skipTest("Skipping cookie-auth tests, running in %d.%d.%d, this feature was added in 6.2+" % splver)
event_count = int(self.service.indexes[self.index_name]['totalEventCount'])
cookie = "%s=%s" % (list(self.service.http._cookies.items())[0])
service = client.Service(**{"cookie": cookie})
service.login()
cn = service.indexes[self.index_name].attach()
cn.send(b"Hello Boris!\r\n")
cn.close()
self.assertEventuallyTrue(lambda: self.totalEventCount() == event_count + 1, timeout=60)
def test_submit_via_attach_with_multiple_cookie_headers(self):
# Skip this test if running below Splunk 6.2, cookie-auth didn't exist before
splver = self.service.splunk_version
if splver[:2] < (6, 2):
self.skipTest("Skipping cookie-auth tests, running in %d.%d.%d, this feature was added in 6.2+" % splver)
event_count = int(self.service.indexes[self.index_name]['totalEventCount'])
service = client.Service(**{"cookie": 'a bad cookie'})
service.http._cookies.update(self.service.http._cookies)
service.login()
cn = service.indexes[self.index_name].attach()
cn.send(b"Hello Boris!\r\n")
cn.close()
self.assertEventuallyTrue(lambda: self.totalEventCount() == event_count + 1, timeout=60)
@pytest.mark.app
def test_upload(self):
self.install_app_from_collection("file_to_upload")
event_count = int(self.index['totalEventCount'])
path = self.pathInApp("file_to_upload", ["log.txt"])
self.index.upload(path)
self.assertEventuallyTrue(lambda: self.totalEventCount() == event_count + 4, timeout=60)
if __name__ == "__main__":
import unittest
unittest.main()