diff --git a/crowd.py b/crowd.py index 33b450b..7891f63 100644 --- a/crowd.py +++ b/crowd.py @@ -11,6 +11,40 @@ import requests +class CrowdAuthFailure(Exception): + """A failure occurred while performing an authentication operation""" + pass + + +class CrowdAuthDenied(Exception): + """Crowd server refused to perform the operation""" + pass + + +class CrowdUserExists(Exception): + pass + + +class CrowdNoSuchUser(Exception): + pass + + +class CrowdGroupExists(Exception): + pass + + +class CrowdNoSuchGroup(Exception): + pass + + +class CrowdError(Exception): + """Generic exception when unexpected response encountered""" + def __init__(self, message=None): + if not message: + message = "unexpected response from Crowd server" + Exception.__init__(self, message) + + class CrowdServer(object): """Crowd server authentication object. @@ -93,6 +127,9 @@ def auth_ping(self): Returns: bool: True if the application authentication succeeded. + + Raises: + CrowdError: If auth ping could not be completed. """ url = self.rest_url + "/non-existent/location" @@ -100,11 +137,13 @@ def auth_ping(self): if response.status_code == 401: return False - elif response.status_code == 404: + + if response.status_code == 404: + # A 'not found' response indicates we passed app auth return True - else: - # An error encountered - problem with the Crowd server? - return False + + # An error encountered - problem with the Crowd server? + raise CrowdError("unidentified problem") def auth_user(self, username, password): """Authenticate a user account against the Crowd server. @@ -122,19 +161,25 @@ def auth_user(self, username, password): authentication was successful. See the Crowd documentation for the authoritative list of attributes. - None: If authentication failed. + None: If received negative authentication response + + Raises: + CrowdAuthFailure: + If authentication attempt failed (other than negative response) """ response = self._post(self.rest_url + "/authentication", data=json.dumps({"value": password}), params={"username": username}) - # If authentication failed for any reason return None - if not response.ok: - return None + if response.status_code == 200: + return response.json() - # ...otherwise return a dictionary of user attributes - return response.json() + if response.status_code == 400: + j = response.json() + raise CrowdAuthFailure(j['message']) + + raise CrowdError def get_session(self, username, password, remote="127.0.0.1"): """Create a session for a user. @@ -158,7 +203,8 @@ def get_session(self, username, password, remote="127.0.0.1"): authentication was successful. See the Crowd documentation for the authoritative list of attributes. - None: If authentication failed. + Raises: + CrowdAuthFailure: If authentication failed. """ params = { @@ -175,12 +221,14 @@ def get_session(self, username, password, remote="127.0.0.1"): data=json.dumps(params), params={"expand": "user"}) - # If authentication failed for any reason return None - if not response.ok: - return None + if response.status_code == 201: + return response.json() - # Otherwise return the user object - return response.json() + if response.status_code == 400: + j = response.json() + raise CrowdAuthFailure(j['message']) + + raise CrowdError def validate_session(self, token, remote="127.0.0.1"): """Validate a session token. @@ -200,7 +248,8 @@ def validate_session(self, token, remote="127.0.0.1"): authentication was successful. See the Crowd documentation for the authoritative list of attributes. - None: If authentication failed. + Raises: + CrowdAuthFailure: If authentication failed. """ params = { @@ -210,12 +259,12 @@ def validate_session(self, token, remote="127.0.0.1"): } url = self.rest_url + "/session/%s" % token - response = self._post(url, data=json.dumps(params), params={"expand": "user"}) + response = self._post(url, data=json.dumps(params), + params={"expand": "user"}) - # For consistency between methods use None rather than False - # If token validation failed for any reason return None + # If token validation failed for any reason raise exception if not response.ok: - return None + raise CrowdAuthFailure # Otherwise return the user object return response.json() @@ -230,19 +279,17 @@ def terminate_session(self, token): Returns: True: If session terminated - None: If session termination failed + Raises: + CrowdError: If authentication failed. """ url = self.rest_url + "/session/%s" % token response = self._delete(url) - # For consistency between methods use None rather than False - # If token validation failed for any reason return None - if not response.ok: - return None + if response.status_code == 204: + return True - # Otherwise return True - return True + raise CrowdError def add_user(self, username, **kwargs): """Add a user to the directory @@ -260,15 +307,11 @@ def add_user(self, username, **kwargs): Returns: True: Succeeded False: If unsuccessful + + Raises: + CrowdError: If authentication failed. """ - # Check that mandatory elements have been provided - if 'password' not in kwargs: - raise ValueError("missing password") - if 'email' not in kwargs: - raise ValueError("missing email") - - components = ['username', 'password', 'first_name', - 'last_name', 'display_name', 'active'] + # Populate data with default and mandatory values. # A KeyError means a mandatory value was not provided, # so raise a ValueError indicating bad args. @@ -279,11 +322,11 @@ def add_user(self, username, **kwargs): "last-name": username, "display-name": username, "email": kwargs["email"], - "password": { "value": kwargs["password"] }, + "password": {"value": kwargs["password"]}, "active": True } - except KeyError: - return ValueError + except KeyError as e: + raise ValueError("missing %s" % e.message) # Remove special case 'password' del(kwargs["password"]) @@ -298,11 +341,51 @@ def add_user(self, username, **kwargs): response = self._post(self.rest_url + "/user", data=json.dumps(data)) + # Crowd should return 201, 400 or 403 + if response.status_code == 201: return True - return False + if response.status_code == 400: + # User already exists / no password given (but we checked that) + raise CrowdUserExists + + if response.status_code == 403: + raise CrowdAuthDenied("application is not allowed to create " + "a new user") + raise CrowdError + + def remove_user(self, username): + """Remove a user from the directory + + Args: + username: The account username + + Returns: + True: Succeeded + + Raises: + CrowdNoSuchUser: If user did not exist + CrowdAuthDenied: If application not allowed to delete the user + """ + + response = self._delete(self.rest_url + "/user", + params={"username": username}) + + # Crowd should return 204, 403 or 404 + + if response.status_code == 204: + return True + + if response.status_code == 403: + raise CrowdAuthDenied("application is not allowed to delete user") + + if response.status_code == 404: + # User did not exist + raise CrowdNoSuchUser + + raise CrowdError def get_user(self, username): """Retrieve information about a user @@ -310,61 +393,123 @@ def get_user(self, username): Returns: dict: User information - None: If no user or failure occurred + None: If no such user + + Raises: + CrowdError: If unexpected response from Crowd server """ response = self._get(self.rest_url + "/user", params={"username": username, "expand": "attributes"}) - if not response.ok: + if response.status_code == 200: + return response.json() + + if response.status_code == 404: return None - return response.json() + raise CrowdError + + def add_group(self, groupname, **kwargs): + """Creates a group + + Returns: + True: The group was created + + Raises: + CrowdGroupExists: The group already exists + CrowdAuthFail + CrowdError: If unexpected response from Crowd server + """ + + data = { + "name": groupname, + "description": groupname, + "active": True, + "type": "GROUP" + } + # Put values from kwargs into data + for k, v in kwargs.items(): + if k not in data: + raise ValueError("invalid argument %s" % k) + data[k] = v + + response = self._post(self.rest_url + "/group", + data=json.dumps(data)) + + if response.status_code == 201: + return True + + if response.status_code == 400: + raise CrowdGroupExists + + if response.status_code == 403: + raise CrowdAuthFailure + + raise CrowdError("status code %d" % response.status_code) + def get_groups(self, username): - """Retrieves a list of group names that have as a direct member. + """Retrieves a list of group names that have as a + direct member. Returns: list: A list of strings of group names. + + None: If user not found + + Raises: + CrowdError: If unexpected response from Crowd server """ response = self._get(self.rest_url + "/user/group/direct", params={"username": username}) - if not response.ok: + if response.status_code == 200: + return [g['name'] for g in response.json()['groups']] + + if response.status_code == 404: return None - return [g['name'] for g in response.json()['groups']] + raise CrowdError def get_nested_groups(self, username): - """Retrieve a list of all group names that have as a direct or indirect member. + """Retrieve a list of all group names that have as + a direct or indirect member. Args: username: The account username. - Returns: list: A list of strings of group names. + + None: If user not found + + Raises: + CrowdError: If unexpected response from Crowd server """ response = self._get(self.rest_url + "/user/group/nested", params={"username": username}) - if not response.ok: + if response.status_code == 200: + return [g['name'] for g in response.json()['groups']] + + if response.status_code == 404: return None - return [g['name'] for g in response.json()['groups']] + raise CrowdError def get_nested_group_users(self, groupname): - """Retrieves a list of all users that directly or indirectly belong to the given groupname. + """Retrieves a list of all users that directly or indirectly + belong to the given groupname. Args: groupname: The group name. - Returns: list: A list of strings of user names. @@ -380,6 +525,74 @@ def get_nested_group_users(self, groupname): return [u['name'] for u in response.json()['users']] + def add_user_to_group(self, username, groupname): + """Make user a direct member of a group + + Args: + username: The user name. + groupname: The group name. + + Returns: + True: If successful + + Raises: + CrowdNoSuchUser: The user does not exist + CrowdNoSuchGroup: The group does not exist + CrowdUserExists: The user is already a member + CrowdError: Unexpected response + """ + response = self._post(self.rest_url + "/group/user/direct", + data=json.dumps({"name": username}), + params={"groupname": groupname}) + + if response.status_code == 201: + return True + + if response.status_code == 400: + raise CrowdNoSuchUser + + if response.status_code == 404: + raise CrowdNoSuchGroup + + if response.status_code == 409: + raise CrowdUserExists + + raise CrowdError("received server response %d" % response.status_code) + + def remove_user_from_group(self, username, groupname): + """Remove user as a direct member of a group + + Args: + username: The user name. + groupname: The group name. + + Returns: + True: If successful + + Raises: + CrowdNotFound: The user or group does not exist + CrowdUserExists: The user is already a member + CrowdError: Unexpected response + """ + response = self._delete(self.rest_url + "/group/user/direct", + params={"groupname": groupname, + "username": username}) + + if response.status_code == 204: + return True + + if response.status_code == 404: + # user or group does not exist + j = response.json() + if j['message'].lower().startswith('group'): + raise CrowdNoSuchGroup + elif j['message'].lower().startswith('user'): + raise CrowdNoSuchUser + else: + raise CrowdError("unknown server response") + + raise CrowdError + def user_exists(self, username): """Determines if the user exists. diff --git a/tests/crowdserverstub.py b/tests/crowdserverstub.py index 031a2c6..a923230 100644 --- a/tests/crowdserverstub.py +++ b/tests/crowdserverstub.py @@ -298,7 +298,7 @@ def _get_session(self): # Either user may authenticate, used an invalid password, # or user does not exist. if user_authenticated: - response_code = 200 + response_code = 201 token = create_session(username, remote) response = { "token": token, diff --git a/tests/unittests.py b/tests/unittests.py index 9261244..b25a799 100644 --- a/tests/unittests.py +++ b/tests/unittests.py @@ -29,9 +29,10 @@ print("Port {0}".format(PORT)) APP_USER = 'testapp' APP_PASS = 'testpass' -USER = 'user1' +USER = 'pythoncrowdtestuser' PASS = 'pass1' -GROUP = 'group1' +EMAIL = 'me@test.example' +GROUP = 'pythoncrowdtestgroup' class testCrowdAuth(unittest.TestCase): @@ -39,39 +40,72 @@ class testCrowdAuth(unittest.TestCase): @classmethod def setUpClass(cls): - cls.base_url = 'http://localhost:%d' % PORT - cls.crowd = crowd.CrowdServer(cls.base_url, APP_USER, APP_PASS) - - cls.server_thread = threading.Thread( - target=crowdserverstub.run_server, args=(PORT,)) - cls.server_thread.start() + import os + if 'CROWDSERVER' in os.environ: + cls.base_url = os.environ['CROWDSERVER'] + cls.server_thread = None + else: + cls.base_url = 'http://localhost:%d' % PORT + cls.server_thread = threading.Thread( + target=crowdserverstub.run_server, args=(PORT,)) + cls.server_thread.start() + crowdserverstub.add_app(APP_USER, APP_PASS) + # There is a race to start the HTTP server before + # the unit tests begin hitting it. Sleep briefly + time.sleep(0.2) - crowdserverstub.add_app(APP_USER, APP_PASS) - crowdserverstub.add_user(USER, PASS) + cls.crowd = crowd.CrowdServer(cls.base_url, APP_USER, APP_PASS) - # There is a race to start the HTTP server before - # the unit tests begin hitting it. Sleep briefly - time.sleep(0.2) + # Create user account for most tests + try: + cls.crowd.add_user(USER, password=PASS, email=EMAIL) + except crowd.CrowdUserExists: + pass + cls.num_users_created = 0 + try: + cls.crowd.add_group(GROUP) + except crowd.CrowdGroupExists: + pass @classmethod def tearDownClass(cls): - requests.get(cls.base_url + '/terminate') - cls.server_thread.join() + if cls.server_thread: + requests.get(cls.base_url + '/terminate') + cls.server_thread.join() + else: + # Remove users + try: + cls.crowd.remove_user(USER) + except: + pass + for i in xrange(0, cls.num_users_created): + try: + cls.crowd.remove_user(USER + str(i)) + except: + pass + # Remove groups + try: + cls.crowd.remove_group(GROUP) + except: + pass def testStubUserExists(self): """Check that server stub recognises user""" - result = crowdserverstub.user_exists(USER) - self.assertTrue(result) + if self.server_thread: + result = crowdserverstub.user_exists(USER) + self.assertTrue(result) def testStubUserDoesNotExist(self): """Check that server stub does not know invalid user""" - result = crowdserverstub.user_exists('fakeuser') - self.assertFalse(result) + if self.server_thread: + result = crowdserverstub.user_exists('fakeuser') + self.assertFalse(result) def testStubCheckUserAuth(self): """Check that server stub auths our user/pass combination""" - result = crowdserverstub.check_user_auth(USER, PASS) - self.assertTrue(result) + if self.server_thread: + result = crowdserverstub.check_user_auth(USER, PASS) + self.assertTrue(result) def testCrowdObjectSSLVerifyTrue(self): """Check can create Crowd object with ssl_verify=True""" @@ -101,13 +135,13 @@ def testAuthUserValid(self): def testAuthUserInvalidUser(self): """User may not authenticate with invalid username""" - result = self.crowd.auth_user('invaliduser', 'xxxxx') - self.assertIs(result, None) + with self.assertRaises(crowd.CrowdAuthFailure): + result = self.crowd.auth_user('invaliduser', 'xxxxx') def testAuthUserInvalidPass(self): """User may not authenticate with invalid password""" - result = self.crowd.auth_user(USER, 'xxxxx') - self.assertIs(result, None) + with self.assertRaises(crowd.CrowdAuthFailure): + result = self.crowd.auth_user(USER, 'xxxxx') def testCreateSessionValidUser(self): """User may create a session with valid credentials""" @@ -116,13 +150,13 @@ def testCreateSessionValidUser(self): def testCreateSessionInvalidUser(self): """User may not create a session with invalid username""" - result = self.crowd.get_session('invaliduser', 'xxxxx') - self.assertIs(result, None) + with self.assertRaises(crowd.CrowdAuthFailure): + result = self.crowd.get_session('invaliduser', 'xxxxx') def testCreateSessionInvalidPass(self): """User may not create a session with invalid password""" - result = self.crowd.get_session(USER, 'xxxxx') - self.assertIs(result, None) + with self.assertRaises(crowd.CrowdAuthFailure): + result = self.crowd.get_session(USER, 'xxxxx') def testValidateSessionValidUser(self): """Validate a valid session token""" @@ -133,22 +167,29 @@ def testValidateSessionValidUser(self): def testValidateSessionInvalidToken(self): """Detect invalid session token""" - token = '0' * 24 - result = self.crowd.validate_session(token) - self.assertIs(result, None) + with self.assertRaises(crowd.CrowdAuthFailure): + token = '0' * 24 + result = self.crowd.validate_session(token) def testValidateSessionValidUserUTF8(self): """Validate that the library handles UTF-8 in fields properly""" - session = self.crowd.get_session(USER, PASS) + username = USER + "unicode" + email = u'ÜñÍçÔÐê' + try: + self.crowd.add_user(username, password=PASS, email=email) + except crowd.CrowdUserExists: + pass + session = self.crowd.get_session(username, PASS) token = session['token'] result = self.crowd.validate_session(token) - self.assertEquals(result['user']['email'], u'%s@does.not.ëxist' % USER) + self.crowd.remove_user(username) + self.assertEquals(result['user']['email'], email) def testCreateSessionIdentical(self): """Sessions from same remote are identical""" session1 = self.crowd.get_session(USER, PASS, '192.168.99.99') session2 = self.crowd.get_session(USER, PASS, '192.168.99.99') - self.assertEqual(session1, session2) + self.assertEqual(session1['token'], session2['token']) def testCreateSessionMultiple(self): """User may create multiple sessions from different remote""" @@ -166,31 +207,31 @@ def testTerminateSessionValidToken(self): def testTerminateSessionInvalidToken(self): token = '0' * 24 result = self.crowd.terminate_session(token) - self.assertIs(result, None) + self.assertTrue(result) def testGetGroupsNotEmpty(self): - crowdserverstub.add_user_to_group(USER, GROUP) + self.crowd.add_user_to_group(USER, GROUP) result = self.crowd.get_groups(USER) self.assertEqual(set(result), set([GROUP])) - crowdserverstub.remove_user_from_group(USER, GROUP) + self.crowd.remove_user_from_group(USER, GROUP) def testGetNestedGroupsNotEmpty(self): - crowdserverstub.add_user_to_group(USER, GROUP) + self.crowd.add_user_to_group(USER, GROUP) result = self.crowd.get_nested_groups(USER) + self.crowd.remove_user_from_group(USER, GROUP) self.assertEqual(set(result), set([GROUP])) - crowdserverstub.remove_user_from_group(USER, GROUP) def testRemoveUserFromGroup(self): - crowdserverstub.add_user_to_group(USER, GROUP) - crowdserverstub.remove_user_from_group(USER, GROUP) + self.crowd.add_user_to_group(USER, GROUP) + self.crowd.remove_user_from_group(USER, GROUP) result = self.crowd.get_groups(USER) self.assertEqual(set(result), set([])) def testGetNestedGroupUsersNotEmpty(self): - crowdserverstub.add_user_to_group(USER, GROUP) + self.crowd.add_user_to_group(USER, GROUP) result = self.crowd.get_nested_group_users(GROUP) + self.crowd.remove_user_from_group(USER, GROUP) self.assertEqual(set(result), set([USER])) - crowdserverstub.remove_user_from_group(USER, GROUP) def testUserExists(self): result = self.crowd.user_exists(USER) @@ -202,47 +243,37 @@ def testUserAttributesExist(self): self.assertTrue('attributes' in result) def testUserAttributesReturned(self): - crowdserverstub.add_user('attruser', 'mypass', {'something': True}) - result = self.crowd.get_user('attruser') + result = self.crowd.get_user(USER) self.assertIsNotNone(result) self.assertTrue('attributes' in result) - self.assertTrue('something' in result['attributes']) + self.assertTrue('attributes' in result['attributes']) # Yo dawg def testUserCreationSuccess(self): - result = self.crowd.add_user('newuser', - email='me@test.example', - password='hello') + username = USER + "tmp" + result = self.crowd.add_user(username, password=PASS, email=EMAIL) + self.crowd.remove_user(username) self.assertTrue(result) def testUserCreationDuplicate(self): - result = self.crowd.add_user('newuser1', - email='me@test.example', - password='hello') - self.assertTrue(result) - result = self.crowd.add_user('newuser1', - email='me@test.example', - password='hello') - self.assertFalse(result) + with self.assertRaises(crowd.CrowdUserExists): + # USER already created during test setup. + # This is attempting to add the account again. + result = self.crowd.add_user(USER, password=PASS, email=EMAIL) def testUserCreationMissingPassword(self): - def f(): - result = self.crowd.add_user('newuser2', - email='me@test.example') - self.assertRaisesRegexp(ValueError, "missing password", f) + with self.assertRaisesRegexp(ValueError, "missing password"): + result = self.crowd.add_user(USER, email=EMAIL) def testUserCreationMissingEmail(self): - def f(): - result = self.crowd.add_user('newuser', - password='something') - self.assertRaisesRegexp(ValueError, "missing email", f) + with self.assertRaisesRegexp(ValueError, "missing email"): + result = self.crowd.add_user(USER, password=PASS) def testUserCreationInvalidParam(self): - def f(): + with self.assertRaisesRegexp(ValueError, "invalid argument .*"): result = self.crowd.add_user('newuser', - email='me@test.example', - password='hello', + email=EMAIL, + password=PASS, invalid_param='bad argument') - self.assertRaisesRegexp(ValueError, "invalid argument .*", f) if __name__ == "__main__": unittest.main()