Skip to content

Commit

Permalink
Test for user admin roles
Browse files Browse the repository at this point in the history
Change-Id: I8a94a510da05cee5d232761e9102e9dd0bc78e5b
Reviewed-on: https://review.couchbase.org/c/TAF/+/222814
Tested-by: Build Bot <[email protected]>
Reviewed-by: shaazin19 <[email protected]>
  • Loading branch information
mohsin-couchbase committed Feb 6, 2025
1 parent 05b21e0 commit 144b94a
Show file tree
Hide file tree
Showing 3 changed files with 716 additions and 3 deletions.
8 changes: 8 additions & 0 deletions conf/security/py-user_admin_role.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
security.user_admin_role.UserAdminRole:
test_local_user_management,nodes_init=1
test_group_management,nodes_init=1
test_external_user_management,nodes_init=1
test_privilege_escalation,nodes_init=1
test_user_backup_restore,nodes_init=1
test_security_admin_permissions,nodes_init=1
test_security_admin_general_security_apis,nodes_init=1
84 changes: 81 additions & 3 deletions lib/membase/api/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3514,10 +3514,16 @@ def retrive_all_user_role(self):

'Get list of current users and rols assigned to them'

def retrieve_user_roles(self):
def retrieve_user_roles(self, username=None, password=None):
if not username:
username = self.username
if not password:
password = self.password
headers = self._create_headers(username, password)

url = "settings/rbac/users"
api = self.baseUrl + url
status, content, header = self._http_request(api, 'GET')
status, content, header = self._http_request(api, 'GET', headers=headers)
if not status:
raise Exception(content)
return json.loads(content)
Expand Down Expand Up @@ -3571,7 +3577,7 @@ def add_external_user(self, username, roles):
def check_user_permission(self, user_id, password, permission_set):
url = "pools/default/checkPermissions/"
api = self.baseUrl + url
authorization = base64.encodestring('%s:%s' % (user_id, password))
authorization = base64.b64encode('%s:%s' % (user_id, password))
header = {'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': 'Basic %s' % authorization,
'Accept': '*/*'}
Expand All @@ -3587,6 +3593,7 @@ def check_user_permission(self, user_id, password, permission_set):
if roles=<empty> user will be created with no roles'''

def add_set_builtin_user(self, user_id, payload):

url = "settings/rbac/users/local/" + user_id
api = self.baseUrl + url
status, content, header = self._http_request(api, 'PUT', payload)
Expand All @@ -3599,22 +3606,71 @@ def add_set_builtin_user(self, user_id, payload):
'''

def delete_builtin_user(self, user_id):

url = "settings/rbac/users/local/" + user_id
api = self.baseUrl + url
status, content, header = self._http_request(api, 'DELETE')
if not status:
self.log.error("%s - %s" % (user_id, content))
raise Exception(content)
return json.loads(content)

'''
Delete external user
'''
def delete_external_user(self, user_id):

url = "settings/rbac/users/external/" + user_id
api = self.baseUrl + url
status, content, header = self._http_request(api, 'DELETE')
if not status:
self.log.error("%s - %s" % (user_id, content))
raise Exception(content)
return json.loads(content)

def get_builtin_user(self, user_id):
""" Gets the user's rbac settings """

url = "settings/rbac/users/local/" + user_id
api = self.baseUrl + url
status, content, _ = self._http_request(api, 'GET')
if not status:
raise Exception(content)
return json.loads(content)

'''
Get external user
'''
def get_external_user(self, user_id):

url = "settings/rbac/users/local/" + user_id
api = self.baseUrl + url
status, content, _ = self._http_request(api, 'GET')
if not status:
raise Exception(content)
return json.loads(content)

def backup_users(self):
url = "settings/rbac/backup"
api = self.baseUrl + url
status, content, headers = self._http_request(api, 'GET')
if not status:
raise Exception(content)
return json.loads(content)

def restore_users(self, backup_data):
url = "settings/rbac/backup"
api = self.baseUrl + url
json_data = json.dumps(backup_data)
payload = {
"backup": json_data
}
payload = urllib.urlencode(payload)
status, content, header = self._http_request(api, 'PUT', payload)
if not status:
raise Exception(content)
return json.loads(content)

'''
Update user password
'''
Expand All @@ -3634,14 +3690,36 @@ def add_set_bulitin_group(self, group_name, payload):
status, content, header = self._http_request(api, 'PUT', payload)
if not status:
self.log.error("%s - %s" % (group_name, content))
raise Exception(content)
return json.loads(content)

def get_builtin_group(self, group_name):

url = "settings/rbac/groups/" + group_name
api = self.baseUrl + url
status, content, header = self._http_request(api, 'GET')
if not status:
self.log.error("%s - %s" % (group_name, content))
raise Exception(content)
return json.loads(content)

def list_groups(self):

url = "settings/rbac/groups"
api = self.baseUrl + url
status, content, header = self._http_request(api, 'GET')
if not status:
raise Exception(content)
return json.loads(content)

def delete_builtin_group(self, group_name):

url = "settings/rbac/groups/" + group_name
api = self.baseUrl + url
status, content, header = self._http_request(api, 'DELETE')
if not status:
self.log.error("%s - %s" % (group_name, content))
raise Exception(content)
return json.loads(content)

def change_password_policy(self, min_length, enforce_uppercase="false",
Expand Down
Loading

0 comments on commit 144b94a

Please sign in to comment.