#!/usr/bin/env python
#
# Copyright 2011-2015 Splunk, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"): you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import
from tests import testlib
import logging

from contextlib import contextmanager

import splunklib.client as client
from splunklib.six.moves import range

# Attribute names looked up on ``self.service`` (via ``getattr``) by the tests
# below.  ``setUp`` may append 'modular_input_kinds' to this shared list.
collections = [
    'apps',
    'event_types',
    'indexes',
    'inputs',
    'jobs',
    'loggers',
    'messages',
    'roles',
    'users'
]

# Keys that ``itemmeta().access`` / ``itemmeta().fields`` must at least contain.
expected_access_keys = set(['sharing', 'app', 'owner'])
expected_fields_keys = set(['required', 'optional', 'wildcard'])


class CollectionTestCase(testlib.SDKTestCase):
    """Checks applied to each service collection named in ``collections``."""

    def setUp(self):
        """Register version-dependent collections and remove stale saved searches.

        Appends 'modular_input_kinds' to the module-level ``collections`` list
        (once) when the connected Splunk major version is 5 or later, then
        cancels the jobs of, and deletes, every saved search whose name starts
        with 'delete-me'.
        """
        super(CollectionTestCase, self).setUp()

        # The version check is kept separate from the membership check so the
        # "Skipping" message is logged only when the server really is too old,
        # not on every setUp after the name has already been registered.
        if self.service.splunk_version[0] >= 5:
            if 'modular_input_kinds' not in collections:
                # Not supported before Splunk 5.0
                collections.append('modular_input_kinds')
        else:
            logging.info(
                "Skipping modular_input_kinds; not supported by Splunk %s",
                '.'.join(str(x) for x in self.service.splunk_version))

        for saved_search in self.service.saved_searches:
            if saved_search.name.startswith('delete-me'):
                try:
                    for job in saved_search.history():
                        job.cancel()
                    self.service.saved_searches.delete(saved_search.name)
                except KeyError:
                    # Best-effort cleanup: a KeyError here is deliberately
                    # ignored (presumably the entity is already gone -- confirm).
                    pass

    def test_metadata(self):
        """Check ``itemmeta()`` on every collection that is expected to support it.

        ``jobs`` and ``loggers`` must raise ``client.NotSupportedError`` and
        ``inputs`` must raise ``TypeError`` when called without arguments; the
        remaining collections (except 'modular_input_kinds', which is skipped)
        must expose at least the expected access and fields keys.
        """
        self.assertRaises(client.NotSupportedError, self.service.jobs.itemmeta)
        self.assertRaises(client.NotSupportedError, self.service.loggers.itemmeta)
        self.assertRaises(TypeError, self.service.inputs.itemmeta)
        for c in collections:
            if c in ['jobs', 'loggers', 'inputs', 'modular_input_kinds']:
                continue
            coll = getattr(self.service, c)
            metadata = coll.itemmeta()
            found_access_keys = set(metadata.access.keys())
            found_fields_keys = set(metadata.fields.keys())
            # ``>=`` on sets is a superset test: extra keys are acceptable.
            self.assertTrue(
                found_access_keys >= expected_access_keys,
                msg='metadata.access is missing keys on '
                    '%s (found: %s, expected: %s)' %
                    (coll, found_access_keys, expected_access_keys))
            self.assertTrue(
                found_fields_keys >= expected_fields_keys,
                msg='metadata.fields is missing keys on '
                    '%s (found: %s, expected: %s)' %
                    (coll, found_fields_keys, expected_fields_keys))

    def test_list(self):
        """Compare a counted, auto-sorted listing with the head of a plain listing.

        For each collection, the names returned by
        ``list(count=10, sort_mode="auto")`` must equal the first ten names
        returned by ``list()``.  Collections with no entities are skipped.
        """
        for coll_name in collections:
            coll = getattr(self.service, coll_name)
            expected = [ent.name for ent in coll.list(count=10, sort_mode="auto")]
            if len(expected) == 0:
                logging.debug("No entities in collection %s; skipping test.", coll_name)
                # Actually skip, as the log message states.
                continue
            found = [ent.name for ent in coll.list()][:10]
            self.assertEqual(expected, found,
                             msg='on %s (expected: %s, found: %s)' %
                                 (coll_name, expected, found))

    def test_list_with_count(self):
        N = 5
        for coll_name in collections:
            coll = getattr(self.service, coll_name)
            expected = [ent.name for ent in coll.list(count=N+5)][:N]
            N = len(expected) # in case there are