Skip to content

Commit 9f86ab2

Browse files
author
Frederick Ross
committed
Made ResultsParser handle sequences of XML fragments.
Added an export endpoint method to Jobs.
1 parent 13bc08d commit 9f86ab2

5 files changed

Lines changed: 204 additions & 26 deletions

File tree

splunklib/binding.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
# License for the specific language governing permissions and limitations
1313
# under the License.
1414

15-
"""This module contains a low-level *binding* interface to the `Splunk REST API
15+
"""A low level binding to Splunk's REST API.
16+
17+
This module contains a low-level *binding* interface to the `Splunk REST API
1618
<http://docs.splunk.com/Documentation/Splunk/latest/RESTAPI/RESTcontents>`_.
1719
1820
This module is designed to enable client-side interaction with the Splunk
@@ -218,17 +220,17 @@ def namespace(**kwargs):
218220
if sharing in ["system"]:
219221
return record({
220222
'sharing': sharing,
221-
'owner': "nobody",
223+
'owner': "nobody",
222224
'app': "system" })
223225
if sharing in ["global", "app"]:
224226
return record({
225227
'sharing': sharing,
226-
'owner': "nobody",
228+
'owner': "nobody",
227229
'app': kwargs.get('app', None)})
228230
if sharing in ["user", None]:
229231
return record({
230232
'sharing': sharing,
231-
'owner': kwargs.get('owner', None),
233+
'owner': kwargs.get('owner', None),
232234
'app': kwargs.get('app', None)})
233235
raise ValueError("Invalid value for argument: 'sharing'")
234236

splunklib/client.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1420,6 +1420,46 @@ def create(self, query, **kwargs):
14201420
sid = _load_sid(response)
14211421
return Job(self.service, PATH_JOBS + sid)
14221422

1423+
def export(self, query, **params):
1424+
"""Run a search and immediately start streaming preview events.
1425+
1426+
Returns an InputStream over the events. The InputStream
1427+
streams XML fragments from the server. The SDK provides
1428+
``results.ResultsReader`` to lazily parse this stream into
1429+
usable Python objects. For example::
1430+
1431+
import splunklib.client as client
1432+
import splunklib.results as results
1433+
s = client.connect(...)
1434+
r = results.ResultsReader(s.jobs.export("search * | head 5"))
1435+
assert r.is_preview == False # The job is finished when we get here
1436+
for kind, event in r:
1437+
assert kind == 'RESULT'
1438+
# events are returned as dicts with strings as values.
1439+
print event
1440+
1441+
``export`` makes a single roundtrip to the server (as opposed
1442+
to two for create followed by preview), plus at most two more
1443+
if autologin is turned on.
1444+
1445+
:raises SyntaxError: on invalid queries.
1446+
1447+
:param query: Splunk search language query to run
1448+
:type query: ``str``
1449+
:param params: Additional arguments to export (see the `REST API docs <http://docs/Documentation/Splunk/4.3.2/RESTAPI/RESTsearch#search.2Fjobs.2Fexport>`_).
1450+
:returns: InputStream over raw XML returned from the server.
1451+
"""
1452+
if "exec_mode" in params:
1453+
raise TypeError("Cannot specify an exec_mode to export.")
1454+
try:
1455+
return self.post(path_segment="export", search=query, **params).body
1456+
except HTTPError as he:
1457+
if he.status == 400 and 'Search operation' in str(he):
1458+
raise SyntaxError(str(he))
1459+
else:
1460+
raise
1461+
1462+
14231463
def oneshot(self, query, **params):
14241464
"""Run a search and directly return an InputStream IO handle over the results.
14251465

splunklib/data.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,15 @@
1212
# License for the specific language governing permissions and limitations
1313
# under the License.
1414

15-
"""This module provides an Atom Feed response loader.
15+
"""Code to read response from splunkd in Atom Feed format.
1616
17-
A simple :func:`load` utility reads Atom Feed XML data (the format returned by
18-
the Splunk REST API), and converts it to a native Python dictionary or list.
17+
The Splunk REST API largely returns Atom (except in a few places where
18+
there are historical inconsistencies that will be ironed out in future
19+
versions).
20+
21+
This module provides one function, :func:`load`, which reads a string
22+
containing the XML of an Atom Feed and returns a dictionary or list
23+
containing the corresponding data.
1924
"""
2025

2126
from xml.etree.ElementTree import XML
@@ -56,14 +61,16 @@ def localname(xname):
5661
return xname if rcurly == -1 else xname[rcurly+1:]
5762

5863
def load(text, match=None):
59-
"""Loads XML text into a native Python structure (*dict* or *list*). If you
60-
provide an optional **match** string (a tag name or path), only the matching
61-
sub-elements are loaded.
62-
63-
:param `text`: The XML text to load.
64-
:type `text`: string
65-
:param `match`: A tag name or path to match (optional).
66-
:type `match`: string
64+
"""Extract Python data structures from Atom XML in the string *text*.
65+
66+
Loads XML text into a native Python structure (`dict` or `list`).
67+
If you provide an optional *match* string (a tag name or path),
68+
only the matching sub-elements are loaded.
69+
70+
:param text: The XML text to load.
71+
:type text: string
72+
:param match: A tag name or path to match (optional).
73+
:type match: string
6774
"""
6875
if text is None: return None
6976
text = text.strip()

splunklib/results.py

Lines changed: 106 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,101 @@
3636
import xml.etree.ElementTree as et
3737

3838
from collections import OrderedDict
39+
try:
40+
from cStringIO import StringIO
41+
except:
42+
from StringIO import StringIO
3943

4044
__all__ = [
4145
"ResultsReader",
4246
"Message"
4347
]
4448

4549
class Message(object):
50+
"""Messages returned splunkd's XML.
51+
52+
**Example**::
53+
54+
m = Message("DEBUG", "There's something in that variable...")
55+
"""
4656
def __init__(self, type_, message):
4757
self.type = type_
4858
self.message = message
4959
def __repr__(self):
50-
print "%s: %s" % (type, message)
60+
return "%s: %s" % (self.type, self.message)
61+
62+
class ConcatenatedStream(object):
63+
"""Lazily concatenate zero or more streams into a stream.
64+
65+
As you read from the concatenated stream, you get characters from
66+
each stream passed to ``ConcatenatedStream``, in order.
67+
68+
**Example**:
69+
70+
from StringIO import StringIO
71+
s = ConcatenatedStream(StringIO("abc"), StringIO("def"))
72+
assert s.read() == "abcdef"
73+
"""
74+
def __init__(self, *streams):
75+
self.streams = list(streams)
76+
77+
def read(self, n=None):
78+
"""Read at most *n* characters from this stream.
79+
80+
If *n* is ``None``, return all available characters.
81+
"""
82+
response = ""
83+
while len(self.streams) > 0 and (n is None or n > 0):
84+
txt = self.streams[0].read(n)
85+
response += txt
86+
if n is not None:
87+
n -= len(txt)
88+
if n > 0 or n is None:
89+
del self.streams[0]
90+
return response
91+
92+
class XMLDTDFilter(object):
93+
"""Lazily remove all XML DTDs from a stream.
94+
95+
All substrings matching the regular expression <?[^>]*> are
96+
removed in their entirety from the stream. No regular expressions
97+
are used, however, so everything still streams properly.
98+
99+
**Example**::
100+
101+
from StringIO import StringIO
102+
s = XMLDTDFilter("<?xml abcd><element><?xml ...></element>")
103+
assert s.read() == "<element></element>"
104+
"""
105+
def __init__(self, stream):
106+
self.stream = stream
107+
108+
def read(self, n=None):
109+
"""Read at most *n* characters from this stream.
110+
111+
If *n* is ``None``, return all available characters.
112+
"""
113+
response = ""
114+
while n is None or n > 0:
115+
c = self.stream.read(1)
116+
if c == "":
117+
break
118+
elif c == "<":
119+
c += self.stream.read(1)
120+
if c == "<?":
121+
while True:
122+
q = self.stream.read(1)
123+
if q == ">":
124+
break
125+
else:
126+
response += c
127+
if n is not None:
128+
n -= len(c)
129+
else:
130+
response += c
131+
if n is not None:
132+
n -= 1
133+
return response
51134

52135
class ResultsReader(object):
53136
"""Lazily yield dicts from a streaming XML results stream.
@@ -84,15 +167,26 @@ class ResultsReader(object):
84167
# function creating that generator. Thus it's all wrapped up for
85168
# the sake of one field.
86169
def __init__(self, stream):
87-
self._gen = self.parse_results(stream)
88-
# splunkd 4.3 returns an empty response body instead of a
89-
# results element with no result elements inside. There is
90-
# no good way to handle it other than failing out and
91-
# trying to get to a sane state.
92-
try:
93-
self.is_preview = self._gen.next()
94-
except StopIteration:
95-
self.is_preview = None
170+
# The search/jobs/exports endpoint, when run with
171+
# earliest_time=rt and latest_time=rt streams a sequence of
172+
# XML documents, each containing a result, as opposed to one
173+
# results element containing lots of results. Python's XML
174+
# parsers are broken, and instead of reading one full document
175+
# and returning the stream that follows untouched, they
176+
# destroy the stream and throw an error. To get around this,
177+
# we remove all the DTD definitions inline, then wrap the
178+
# fragments in a fiction <doc> element to make the parser happy.
179+
stream = XMLDTDFilter(stream)
180+
stream = ConcatenatedStream(StringIO("<doc>"), stream, StringIO("</doc>"))
181+
self._gen = self.parse_results(stream)
182+
# splunkd 4.3 returns an empty response body instead of a
183+
# results element with no result elements inside. There is
184+
# no good way to handle it other than failing out and
185+
# trying to get to a sane state.
186+
try:
187+
self.is_preview = self._gen.next()
188+
except StopIteration:
189+
self.is_preview = None
96190

97191
def __iter__(self):
98192
return self
@@ -146,7 +240,7 @@ def parse_results(self, stream):
146240

147241
elif elem.tag == 'msg':
148242
if event == 'start':
149-
msg_type = elem.attribs['type']
243+
msg_type = elem.attrib['type']
150244
elif event == 'end':
151245
yield Message(msg_type, elem.text.encode('utf8'))
152246
elem.clear()
@@ -160,3 +254,4 @@ def parse_results(self, stream):
160254

161255

162256

257+

tests/test_job.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,17 @@ def test_crud(self):
9090
self.assertTrue(isinstance(result.next(), dict))
9191
self.assertTrue(len(list(result)) <= 3)
9292

93+
result = results.ResultsReader(jobs.export("search index=_internal earliest=-1m | head 3"))
94+
self.assertEqual(result.is_preview, False)
95+
d = result.next()
96+
print d
97+
self.assertTrue(isinstance(d, dict) or isinstance(d, results.Message))
98+
self.assertTrue(len(list(d for d in result if isinstance(d, dict))) <= 3)
99+
93100
self.assertRaises(SyntaxError, jobs.oneshot, "asdaf;lkj2r23=")
94101

102+
self.assertRaises(SyntaxError, jobs.export, "asdaf;lkj2r23=")
103+
95104
# Make sure we can create a job
96105
job = jobs.create("search index=sdk-tests earliest=-1m | head 1")
97106
self.assertTrue(jobs.contains(job.sid))
@@ -176,5 +185,30 @@ def test_results(self):
176185
self.assertTrue(isinstance(result, dict))
177186
self.assertEqual(int(result["count"]), 1)
178187

188+
def test_results_reader(self):
189+
# Run jobs.export("search index=_internal | stats count",
190+
# earliest_time="rt", latest_time="rt") and you get a
191+
# streaming sequence of XML fragments containing results.
192+
with open('streaming_results.xml') as input:
193+
reader = results.ResultsReader(input)
194+
print reader.next()
195+
self.assertTrue(isinstance(reader.next(), dict))
196+
197+
def test_xmldtd_filter(self):
198+
from StringIO import StringIO
199+
s = results.XMLDTDFilter(StringIO("<?xml asdf awe awdf=""><boris>Other stuf</boris><?xml dafawe \n asdfaw > ab"))
200+
self.assertEqual(s.read(3), "<bo")
201+
self.assertEqual(s.read(), "ris>Other stuf</boris> ab")
202+
203+
204+
def test_concatenated_stream(self):
205+
from StringIO import StringIO
206+
s = results.ConcatenatedStream(StringIO("This is a test "),
207+
StringIO("of the emergency broadcast system."))
208+
self.assertEqual(s.read(3), "Thi")
209+
self.assertEqual(s.read(20), 's is a test of the e')
210+
self.assertEqual(s.read(), 'mergency broadcast system.')
211+
212+
179213
if __name__ == "__main__":
180214
testlib.main()

0 commit comments

Comments
 (0)