forked from splunk/splunk-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjob.py
More file actions
executable file
·275 lines (227 loc) · 8.67 KB
/
job.py
File metadata and controls
executable file
·275 lines (227 loc) · 8.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
#!/usr/bin/env python
#
# Copyright 2011-2015 Splunk, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"): you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""A command line utility for interacting with Splunk search jobs."""
# All job commands operate on search 'specifiers' (spec). A search specifier
# is either a search-id (sid) or the zero-based index of the search job in the
# list of jobs, e.g. @0 would specify the first job in the list, @1 the
# second, and so on.
from pprint import pprint
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from splunklib.client import connect
try:
from utils import error, parse, cmdline
except ImportError:
raise Exception("Add the SDK repository to your PYTHONPATH to run the examples "
"(e.g., export PYTHONPATH=~/splunk-sdk-python.")
# Help text appended to the usage message (passed as ``epilog`` to ``parse``
# in ``main``). Job indexes are zero-based, matching what the ``list`` command
# prints and how ``Program.sid`` resolves ``@N`` specifiers.
HELP_EPILOG = """
Commands:
cancel <search>+
create <query> [options]
events <search>+
finalize <search>+
list [<search>]*
pause <search>+
preview <search>+
results <search>+
searchlog <search>+
summary <search>+
perf <search>+
timeline <search>+
touch <search>+
unpause <search>+
A search can be specified either by using its 'search id' ('sid'), or by
using the index in the listing of searches. Indexes start at 0: for example,
@0 refers to the first search job in the list and @5 to the sixth.
Examples:
# Cancel a search
job.py cancel @0
# Create a search
job.py create 'search * | stats count' --search_mode=blocking
# List all searches
job.py list
# List properties of the specified searches
job.py list @3 scheduler__nobody__search_SW5kZXhpbmcgd29ya2xvYWQ_at_1311888600_b18031c8d8f4b4e9
# Get all results for the fourth search (indexes start at 0)
job.py results @3
"""
# Option names accepted by each command. `cmdline` turns every name into a
# `--<name>` command-line flag, and the parsed values are forwarded as keyword
# arguments to the corresponding job call in `Program`.

# Options for the `create` command (forwarded to `service.jobs.create`).
FLAGS_CREATE = [
"search", "earliest_time", "latest_time", "now", "time_format",
"exec_mode", "search_mode", "rt_blocking", "rt_queue_size",
"rt_maxblocksecs", "rt_indexfilter", "id", "status_buckets",
"max_count", "max_time", "timeout", "auto_finalize_ec", "enable_lookups",
"reload_macros", "reduce_freq", "spawn_process", "required_field_list",
"rf", "auto_cancel", "auto_pause",
]
# Options for the `events` command (forwarded to `job.events`).
FLAGS_EVENTS = [
"offset", "count", "earliest_time", "latest_time", "search",
"time_format", "output_time_format", "field_list", "f", "max_lines",
"truncation_mode", "output_mode", "segmentation"
]
# Options shared by the `results` and `preview` commands
# (forwarded to `job.results` / `job.preview`).
FLAGS_RESULTS = [
"offset", "count", "search", "field_list", "f", "output_mode"
]
# Options for the `timeline` command (forwarded to `job.timeline`).
FLAGS_TIMELINE = [
"time_format", "output_time_format"
]
# Options for the `searchlog` command (forwarded to `job.searchlog`).
FLAGS_SEARCHLOG = [
"attachment"
]
# Options for the `summary` command (forwarded to `job.summary`).
FLAGS_SUMMARY = [
"earliest_time", "latest_time", "time_format", "output_time_format",
"field_list", "f", "search", "top_count", "min_freq"
]
def cmdline(argv, flags):
    """Parse ``argv``, accepting one ``--<name>`` option per entry of ``flags``.

    A rule table keyed by flag name is built and handed to ``parse``; the
    value returned by ``parse`` is passed back to the caller unchanged.
    """
    rules = {name: {'flags': ["--%s" % name]} for name in flags}
    return parse(argv, rules)
def output(stream):
    """Write the contents of the given stream to stdout.

    The stream is drained in chunks of at most 1024 units until ``read``
    returns an empty value. Byte chunks are written to the binary layer of
    stdout when one exists, so the function works on Python 3 (where the
    text-mode ``sys.stdout`` rejects ``bytes``) as well as on Python 2.

    :param stream: any object with a ``read(size)`` method returning ``bytes``
        or text.
    """
    # Python 3 text streams expose their binary layer as ``.buffer``;
    # Python 2 file objects have no such attribute and take byte strings
    # directly through ``write``.
    binary_out = getattr(sys.stdout, "buffer", None)
    if binary_out is not None:
        # Push out text that is still buffered so that bytes written
        # straight to the binary layer cannot overtake it.
        sys.stdout.flush()
    while True:
        content = stream.read(1024)
        if not content:
            break
        if binary_out is not None and isinstance(content, bytes):
            binary_out.write(content)
        else:
            sys.stdout.write(content)
class Program(object):
    """Dispatch job commands against a connected Splunk service.

    ``run`` selects a command handler by name. Every handler receives the
    argument vector that follows the command name. Most handlers resolve
    their arguments as search specifiers (a sid, or ``@N`` for the N-th job,
    counting from 0) and apply one operation to each resolved job.

    NOTE(review): ``error`` comes from the examples' ``utils`` module and is
    assumed to report the message and terminate the process -- confirm.
    """

    def __init__(self, service):
        """Store the service whose ``jobs`` collection the commands use."""
        self.service = service

    def cancel(self, argv):
        """Cancel the specified search jobs."""
        self.foreach(argv, lambda job: job.cancel())

    def create(self, argv):
        """Create a search job and print its search-id (sid)."""
        opts = cmdline(argv, FLAGS_CREATE)
        if len(opts.args) != 1:
            error("Command requires a search expression", 2)
        query = opts.args[0]
        job = self.service.jobs.create(query, **opts.kwargs)
        # Single-argument print() behaves identically on Python 2 and 3.
        print(job.sid)

    def events(self, argv):
        """Retrieve events for the specified search jobs."""
        opts = cmdline(argv, FLAGS_EVENTS)
        self.foreach(opts.args, lambda job:
            output(job.events(**opts.kwargs)))

    def finalize(self, argv):
        """Finalize the specified search jobs."""
        self.foreach(argv, lambda job: job.finalize())

    def foreach(self, argv, func):
        """Apply the function to each job specified in the argument vector.

        Reports an error when no specifier is given or when a specifier
        does not resolve to a job.
        """
        if len(argv) == 0:
            error("Command requires a search specifier.", 2)
        for item in argv:
            job = self.lookup(item)
            if job is None:
                error("Search job '%s' does not exist" % item, 2)
                # Only reached if error() returns; never call func(None).
                continue
            func(job)

    def list(self, argv):
        """List all current search jobs if no jobs specified, otherwise
        list the properties of the specified jobs."""

        def read(job):
            for key in sorted(job.content.keys()):
                # Ignore some fields that make the output hard to read and
                # that are available via other commands.
                if key in ["performance"]:
                    continue
                print("%s: %s" % (key, job.content[key]))

        if len(argv) == 0:
            # The printed index is the one accepted by '@N' specifiers.
            for index, job in enumerate(self.service.jobs):
                print("@%d : %s" % (index, job.sid))
            return
        self.foreach(argv, read)

    def preview(self, argv):
        """Retrieve the preview for the specified search jobs."""
        opts = cmdline(argv, FLAGS_RESULTS)
        self.foreach(opts.args, lambda job:
            output(job.preview(**opts.kwargs)))

    def results(self, argv):
        """Retrieve the results for the specified search jobs."""
        opts = cmdline(argv, FLAGS_RESULTS)
        self.foreach(opts.args, lambda job:
            output(job.results(**opts.kwargs)))

    def sid(self, spec):
        """Convert the given search specifier into a search-id (sid).

        ``@N`` is resolved against ``service.jobs.list()`` (zero-based;
        negative N counts from the end). Anything that is not a valid,
        in-range ``@N`` index is returned unchanged and treated as a sid.
        """
        if spec.startswith('@'):
            try:
                index = int(spec[1:])
            except ValueError:
                # Not a number after '@': fall back to treating it as a sid
                # instead of crashing with a traceback.
                return spec
            jobs = self.service.jobs.list()
            if -len(jobs) <= index < len(jobs):
                return jobs[index].sid
        return spec  # Assume it was already a valid sid

    def lookup(self, spec):
        """Lookup search job by search specifier.

        Returns the job, or ``None`` when the jobs collection raises
        ``KeyError`` for the resolved sid, which is what ``foreach`` tests
        for before reporting a missing job.
        """
        try:
            return self.service.jobs[self.sid(spec)]
        except KeyError:
            return None

    def pause(self, argv):
        """Pause the specified search jobs."""
        self.foreach(argv, lambda job: job.pause())

    def perf(self, argv):
        """Retrieve performance info for the specified search jobs."""
        self.foreach(argv, lambda job: pprint(job['performance']))

    def run(self, argv):
        """Dispatch the given command.

        ``argv[0]`` is the command name; the remaining items are passed to
        the matching handler.
        """
        command = argv[0]
        handlers = {
            'cancel': self.cancel,
            'create': self.create,
            'events': self.events,
            'finalize': self.finalize,
            'list': self.list,
            'pause': self.pause,
            'preview': self.preview,
            'results': self.results,
            'searchlog': self.searchlog,
            'summary': self.summary,
            'perf': self.perf,
            'timeline': self.timeline,
            'touch': self.touch,
            'unpause': self.unpause,
        }
        handler = handlers.get(command, None)
        if handler is None:
            error("Unrecognized command: %s" % command, 2)
            # Only reached if error() returns; there is nothing to call.
            return
        handler(argv[1:])

    def searchlog(self, argv):
        """Retrieve the searchlog for the specified search jobs."""
        opts = cmdline(argv, FLAGS_SEARCHLOG)
        self.foreach(opts.args, lambda job:
            output(job.searchlog(**opts.kwargs)))

    def summary(self, argv):
        """Retrieve the summary for the specified search jobs."""
        opts = cmdline(argv, FLAGS_SUMMARY)
        self.foreach(opts.args, lambda job:
            output(job.summary(**opts.kwargs)))

    def timeline(self, argv):
        """Retrieve the timeline for the specified search jobs."""
        opts = cmdline(argv, FLAGS_TIMELINE)
        self.foreach(opts.args, lambda job:
            output(job.timeline(**opts.kwargs)))

    def touch(self, argv):
        """Touch the specified search jobs."""
        self.foreach(argv, lambda job: job.touch())

    def unpause(self, argv):
        """Unpause the specified search jobs."""
        self.foreach(argv, lambda job: job.unpause())
def main():
    """Split the command line into connection options and a job command,
    connect to the service and run the command."""
    usage = "usage: %prog [options] <command> [<args>]"
    argv = sys.argv[1:]

    # The first token that does not start with '-' is taken as the command;
    # everything before it is handed to the option parser. With no such
    # token, all arguments are options and the command defaults to "list".
    options, command = argv, ["list"]
    for position, token in enumerate(argv):
        if not token.startswith('-'):
            options, command = argv[:position], argv[position:]
            break

    opts = parse(options, {}, ".splunkrc", usage=usage, epilog=HELP_EPILOG)
    program = Program(connect(**opts.kwargs))
    program.run(command)


if __name__ == "__main__":
    main()