diff --git a/splunklib/binding.py b/splunklib/binding.py index 3ae6ef967..4c95aa79e 100644 --- a/splunklib/binding.py +++ b/splunklib/binding.py @@ -33,6 +33,7 @@ import sys import Cookie +from base64 import b64encode from datetime import datetime from functools import wraps from StringIO import StringIO @@ -471,6 +472,7 @@ def __init__(self, handler=None, **kwargs): self.namespace = namespace(**kwargs) self.username = kwargs.get("username", "") self.password = kwargs.get("password", "") + self.basic = kwargs.get("basic", False) self.autologin = kwargs.get("autologin", False) # Store any cookies in the self.http._cookies dict @@ -507,6 +509,9 @@ def _auth_headers(self): """ if self.has_cookies(): return [("Cookie", _make_cookie_header(self.get_cookies().items()))] + elif self.basic and (self.username and self.password): + token = 'Basic %s' % b64encode("%s:%s" % (self.username, self.password)) + return [("Authorization", token)] elif self.token is _NoAuthenticationToken: return [] else: @@ -838,6 +843,11 @@ def login(self): # logged in. return + if self.basic and (self.username and self.password): + # Basic auth mode requested, so this method is a nop as long + # as credentials were passed in. + return + # Only try to get a token and updated cookie if username & password are specified try: response = self.http.post( diff --git a/tests/test_binding.py b/tests/test_binding.py index 2a3f079bc..ef0cacc94 100755 --- a/tests/test_binding.py +++ b/tests/test_binding.py @@ -686,6 +686,32 @@ def test_namespace(self): def test_namespace_fails(self): self.assertRaises(ValueError, binding.namespace, sharing="gobble") +class TestBasicAuthentication(unittest.TestCase): + def setUp(self): + self.opts = testlib.parse([], {}, ".splunkrc") + opts = self.opts.kwargs.copy() + opts["basic"] = True + opts["username"] = self.opts.kwargs["username"] + opts["password"] = self.opts.kwargs["password"] + + self.context = binding.connect(**opts) + import splunklib.client as client + service = client.Service(**opts) + + if getattr(unittest.TestCase, 'assertIsNotNone', None) is None: + def assertIsNotNone(self, obj, msg=None): + if obj is None: + raise self.failureException, (msg or '%r is not None' % obj) + + def test_basic_in_auth_headers(self): + self.assertIsNotNone(self.context._auth_headers) + self.assertNotEqual(self.context._auth_headers, []) + self.assertEqual(len(self.context._auth_headers), 1) + self.assertEqual(len(self.context._auth_headers), 1) + self.assertEqual(self.context._auth_headers[0][0], "Authorization") + self.assertEqual(self.context._auth_headers[0][1][:6], "Basic ") + self.assertEqual(self.context.get("/services").status, 200) + class TestTokenAuthentication(BindingTestCase): def test_preexisting_token(self): token = self.context.token