forked from splunk/splunk-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresults.py
More file actions
343 lines (283 loc) · 12.5 KB
/
results.py
File metadata and controls
343 lines (283 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# Copyright © 2011-2024 Splunk, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"): you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The **splunklib.results** module provides a streaming XML reader for Splunk
search results.
Splunk search results can be returned in a variety of formats including XML,
JSON, and CSV. To make it easier to stream search results in XML format, they
are returned as a stream of XML *fragments*, not as a single XML document. This
module supports incrementally reading one result record at a time from such a
result stream. This module also provides a friendly iterator-based interface for
accessing search results while avoiding buffering the result set, which can be
very large.
To use the reader, instantiate :class:`JSONResultsReader` on a search result stream
as follows::

    reader = JSONResultsReader(result_stream)
    for item in reader:
        print(item)
    print(f"Results are a preview: {reader.is_preview}")
"""
from io import BufferedReader, BytesIO
import xml.etree.ElementTree as et
from collections import OrderedDict
from json import loads as json_loads
__all__ = [
"ResultsReader",
"Message",
"JSONResultsReader"
]
import deprecation
class Message:
    """An informational message that Splunk interleaves in a results stream.

    :param type_: The message type (e.g., ``"DEBUG"``).
    :type type_: ``string``
    :param message: The text of the message.
    :type message: ``string``

    **Example**::

        m = Message("DEBUG", "There's something in that variable...")
    """

    def __init__(self, type_, message):
        self.type = type_
        self.message = message

    def __repr__(self):
        return f"{self.type}: {self.message}"

    def __eq__(self, other):
        # Comparing against a non-Message used to raise AttributeError
        # (the other object has no .type/.message). Returning
        # NotImplemented lets Python fall back to its default, so
        # ``m == 5`` is simply False instead of an error.
        if not isinstance(other, Message):
            return NotImplemented
        return (self.type, self.message) == (other.type, other.message)

    def __hash__(self):
        return hash((self.type, self.message))
class _ConcatenatedStream:
"""Lazily concatenate zero or more streams into a stream.
As you read from the concatenated stream, you get characters from
each stream passed to ``_ConcatenatedStream``, in order.
**Example**::
from StringIO import StringIO
s = _ConcatenatedStream(StringIO("abc"), StringIO("def"))
assert s.read() == "abcdef"
"""
def __init__(self, *streams):
self.streams = list(streams)
def read(self, n=None):
"""Read at most *n* characters from this stream.
If *n* is ``None``, return all available characters.
"""
response = b""
while len(self.streams) > 0 and (n is None or n > 0):
txt = self.streams[0].read(n)
response += txt
if n is not None:
n -= len(txt)
if n is None or n > 0:
del self.streams[0]
return response
class _XMLDTDFilter:
"""Lazily remove all XML DTDs from a stream.
All substrings matching the regular expression <?[^>]*> are
removed in their entirety from the stream. No regular expressions
are used, however, so everything still streams properly.
**Example**::
from StringIO import StringIO
s = _XMLDTDFilter("<?xml abcd><element><?xml ...></element>")
assert s.read() == "<element></element>"
"""
def __init__(self, stream):
self.stream = stream
def read(self, n=None):
"""Read at most *n* characters from this stream.
If *n* is ``None``, return all available characters.
"""
response = b""
while n is None or n > 0:
c = self.stream.read(1)
if c == b"":
break
if c == b"<":
c += self.stream.read(1)
if c == b"<?":
while True:
q = self.stream.read(1)
if q == b">":
break
else:
response += c
if n is not None:
n -= len(c)
else:
response += c
if n is not None:
n -= 1
return response
@deprecation.deprecated(details="Use the JSONResultsReader function instead in conjuction with the 'output_mode' query param set to 'json'")
class ResultsReader:
    """This class returns dictionaries and Splunk messages from an XML results
    stream.

    ``ResultsReader`` is iterable, and returns a ``dict`` for results, or a
    :class:`Message` object for Splunk messages. This class has one field,
    ``is_preview``, which is ``True`` when the results are a preview from a
    running search, or ``False`` when the results are from a completed search
    (it stays ``None`` until the wrapping <results> element has been parsed).

    This class has no network activity other than what is implicit in the
    stream it operates on.

    :param `stream`: The stream to read from (any object that supports
        ``.read()``).

    **Example**::

        import results
        response = ... # the body of an HTTP response
        reader = results.ResultsReader(response)
        for result in reader:
            if isinstance(result, dict):
                print(f"Result: {result}")
            elif isinstance(result, results.Message):
                print(f"Message: {result}")
        print(f"is_preview = {reader.is_preview}")
    """

    # Be sure to update the docstrings of client.Jobs.oneshot,
    # client.Job.results_preview and client.Job.results to match any
    # changes made to ResultsReader.
    #
    # This wouldn't be a class, just the _parse_results function below,
    # except that you cannot get the current generator inside the
    # function creating that generator. Thus it's all wrapped up for
    # the sake of one field.
    def __init__(self, stream):
        # The search/jobs/exports endpoint, when run with
        # earliest_time=rt and latest_time=rt streams a sequence of
        # XML documents, each containing a result, as opposed to one
        # results element containing lots of results. Python's XML
        # parsers are broken, and instead of reading one full document
        # and returning the stream that follows untouched, they
        # destroy the stream and throw an error. To get around this,
        # we remove all the DTD definitions inline, then wrap the
        # fragments in a fiction <doc> element to make the parser happy.
        stream = _XMLDTDFilter(stream)
        stream = _ConcatenatedStream(BytesIO(b"<doc>"), stream, BytesIO(b"</doc>"))
        # None until the <results preview="..."> element has been seen.
        self.is_preview = None
        self._gen = self._parse_results(stream)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._gen)

    def _parse_results(self, stream):
        """Parse results and messages out of *stream*.

        Yields an ``OrderedDict`` per <result> element and a
        :class:`Message` per <msg> element, in stream order, setting
        ``self.is_preview`` from the enclosing <results> element.
        """
        result = None   # dict under construction for the current <result>
        values = None   # values collected for the current <field>
        try:
            for event, elem in et.iterparse(stream, events=('start', 'end')):
                if elem.tag == 'results' and event == 'start':
                    # The wrapper element is a <results preview="0|1">. We
                    # don't care about it except to tell us whether these
                    # are preview results, or the final results from the
                    # search.
                    is_preview = elem.attrib['preview'] == '1'
                    self.is_preview = is_preview
                if elem.tag == 'result':
                    if event == 'start':
                        result = OrderedDict()
                    elif event == 'end':
                        yield result
                        result = None
                        elem.clear()
                elif elem.tag == 'field' and result is not None:
                    # We need the 'result is not None' check because
                    # 'field' is also the element name in the <meta>
                    # header that gives field order, which is not what we
                    # want at all.
                    if event == 'start':
                        values = []
                    elif event == 'end':
                        field_name = elem.attrib['k']
                        # Single-valued fields unwrap to a scalar;
                        # multi-valued fields stay a list.
                        if len(values) == 1:
                            result[field_name] = values[0]
                        else:
                            result[field_name] = values
                        # Calling .clear() is necessary to let the
                        # element be garbage collected. Otherwise
                        # arbitrarily large results sets will use
                        # arbitrarily large memory instead of
                        # streaming.
                        elem.clear()
                elif elem.tag in ('text', 'v') and event == 'end':
                    # A value of the current field; itertext() joins any
                    # nested markup (e.g. highlighting) into plain text.
                    text = "".join(elem.itertext())
                    values.append(text)
                    elem.clear()
                elif elem.tag == 'msg':
                    if event == 'start':
                        # The type attribute is available at the start
                        # event; the text is complete only at the end.
                        msg_type = elem.attrib['type']
                    elif event == 'end':
                        text = elem.text if elem.text is not None else ""
                        yield Message(msg_type, text)
                        elem.clear()
        except SyntaxError as pe:
            # This is here to handle the same incorrect return from
            # splunk that is described in __init__.
            # (et.ParseError subclasses SyntaxError.)
            if 'no element found' in pe.msg:
                return
            else:
                raise
class JSONResultsReader:
    """This class returns dictionaries and Splunk messages from a JSON results
    stream.

    ``JSONResultsReader`` is iterable, and returns a ``dict`` for results, or a
    :class:`Message` object for Splunk messages. This class has one field,
    ``is_preview``, which is ``True`` when the results are a preview from a
    running search, or ``False`` when the results are from a completed search
    (it stays ``None`` until a document carrying the preview flag is read).

    This class has no network activity other than what is implicit in the
    stream it operates on.

    :param `stream`: The stream to read from (any object that supports ``.read()``).

    **Example**::

        import results
        response = ... # the body of an HTTP response
        reader = results.JSONResultsReader(response)
        for result in reader:
            if isinstance(result, dict):
                print(f"Result: {result}")
            elif isinstance(result, results.Message):
                print(f"Message: {result}")
        print(f"is_preview = {reader.is_preview}")
    """

    # Be sure to update the docstrings of client.Jobs.oneshot,
    # client.Job.results_preview and client.Job.results to match any
    # changes made to JSONResultsReader.
    #
    # This wouldn't be a class, just the _parse_results function below,
    # except that you cannot get the current generator inside the
    # function creating that generator. Thus it's all wrapped up for
    # the sake of one field.
    def __init__(self, stream):
        # The search/jobs/exports endpoint, when run with
        # earliest_time=rt, latest_time=rt and output_mode=json, streams a
        # sequence of JSON documents, one per line, each containing a
        # result, as opposed to one object containing lots of results.
        stream = BufferedReader(stream)
        self.is_preview = None
        self._gen = self._parse_results(stream)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._gen)

    def _parse_results(self, stream):
        """Parse results and messages out of *stream*.

        Iterates the stream line by line rather than calling
        ``readlines()``, which would buffer the entire response in
        memory and defeat the streaming purpose of this module.
        """
        for line in stream:
            stripped = line.strip()
            if not stripped:
                # Skip blank separator lines between JSON documents.
                continue
            parsed = json_loads(stripped)
            if "preview" in parsed:
                self.is_preview = parsed["preview"]
            for message in parsed.get("messages", ()):
                msg_type = message.get("type", "Unknown Message Type")
                yield Message(msg_type, message.get("text"))
            if "result" in parsed:
                yield parsed["result"]
            for result in parsed.get("results", ()):
                yield result