@@ -64,25 +64,26 @@ def get_client_policies(self, client_id):
64
64
"cid" : self .get_client_id (client_id )}
65
65
data_raw = self .raw_get (url .format (** params_path ))
66
66
return raise_error_from_response (data_raw , KeycloakGetError )
67
-
67
+
68
68
def create_policy (self , client_id , policyname , rolename ):
69
69
try :
70
70
rid = self .get_role_id (rolename )
71
71
payload = {"type" : "role" , "logic" : "POSITIVE" ,
72
- "decisionStrategy" : "UNANIMOUS " , "name" : policyname , "roles" : [{"id" : rid }]}
72
+ "decisionStrategy" : "AFFIRMATIVE " , "name" : policyname , "roles" : [{"id" : rid }]}
73
73
cid = self .get_client_id (client_id )
74
74
url = "admin/realms/{realm-name}/clients/{cid}/authz/resource-server/policy/role"
75
75
params_path = {"realm-name" : self .realm_name , "cid" : cid }
76
76
data_raw = self .raw_post (url .format (** params_path ),
77
- data = json .dumps (payload ))
77
+ data = json .dumps (payload ))
78
78
return raise_error_from_response (data_raw , KeycloakGetError )
79
79
except Exception as e :
80
80
return self .errormsg (str (e ))
81
81
82
82
def delete_policy (self , client_id , policyname ):
83
83
try :
84
84
url = 'admin/realms/{realm-name}/clients/{cid}/authz/resource-server/policy/{pid}'
85
- params_path = {"realm-name" : self .realm_name , "cid" : self .get_client_id (client_id ), 'pid' : self .get_policy_id (client_id , policyname )}
85
+ params_path = {"realm-name" : self .realm_name , "cid" : self .get_client_id (
86
+ client_id ), 'pid' : self .get_policy_id (client_id , policyname )}
86
87
rawdata = self .raw_delete (url .format (** params_path ))
87
88
return raise_error_from_response (rawdata , KeycloakGetError )
88
89
except Exception as e :
@@ -105,7 +106,7 @@ def create_client_permission(self, client_id, resource):
105
106
try :
106
107
cid = self .get_client_id (client_id )
107
108
url = "admin/realms/{realm-name}/clients/{cid}/authz/resource-server/permission/resource"
108
- payload = {"type" : "resource" , "logic" : "POSITIVE" , "decisionStrategy" : "UNANIMOUS " ,
109
+ payload = {"type" : "resource" , "logic" : "POSITIVE" , "decisionStrategy" : "AFFIRMATIVE " ,
109
110
"name" : resource , "resources" : [self .get_resource_id (client_id , resource )], "policies" : []}
110
111
params_path = {"realm-name" : self .realm_name , "cid" : cid }
111
112
data_raw = self .raw_post (url .format (
@@ -120,7 +121,8 @@ def get_resources(self, client_id):
120
121
121
122
def get_resource_id (self , client_id , resourcename ):
122
123
url = "admin/realms/{realm-name}/clients/{cid}/authz/resource-server/resource/search?name={rn}"
123
- params_path = {'realm-name' :self .realm_name , 'cid' : self .get_client_id (client_id ), 'rn' : resourcename }
124
+ params_path = {'realm-name' : self .realm_name ,
125
+ 'cid' : self .get_client_id (client_id ), 'rn' : resourcename }
124
126
raw_data = self .raw_get (url .format (** params_path ))
125
127
if raw_data .status_code == 200 :
126
128
return raw_data .json ()['_id' ]
@@ -134,20 +136,29 @@ def get_permissions(self, client_id, permission=None):
134
136
params_path = {"realm-name" : self .realm_name , "cid" : cid }
135
137
else :
136
138
url = 'admin/realms/{realm-name}/clients/{cid}/authz/resource-server/policy/search?name={pname}'
137
- params_path = {'realm-name' : self .realm_name , 'cid' : cid , 'pname' :permission }
139
+ params_path = {'realm-name' : self .realm_name ,
140
+ 'cid' : cid , 'pname' : permission }
138
141
raw_data = self .raw_get (url .format (** params_path ))
139
142
return raise_error_from_response (raw_data , KeycloakGetError )
140
143
144
+ def delete_permission (self , client_id , permission ):
145
+ url = 'admin/realms/{realm-admin}/clients/{cid}/authz/resource-server/permission/{pid}'
146
+ params_path = {'realm-name' : self .realm_name , 'cid' : self .get_client_id (
147
+ client_id ), 'pid' : self .get_permissions (client_id , permission )['id' ]}
148
+ raw_data = self .raw_delete (url .format (** params_path ))
149
+ return raise_error_from_response (raw_data , KeycloakGetError )
150
+
141
151
# 角色和权限操作
142
152
def op_permission_with_role (self , client_id , permission , role , op = "assign" ):
143
153
url = 'admin/realms/{realm-name}/clients/{cid}/authz/resource-server/permission/resource/{pid}'
144
154
perm = self .get_permissions (client_id , permission )
145
155
src_roles = self .get_permission_roles (client_id , permission )
146
156
src_role_ids = [rid ['id' ] for rid in src_roles ]
147
157
npoid = self .get_policy_id (client_id , role )
148
- payload = {"id" : perm ['id' ], "name" : permission , "type" : "resource" , "logic" : "POSITIVE" , "decisionStrategy" : "UNANIMOUS" }
158
+ payload = {"id" : perm ['id' ], "name" : permission , "type" : "resource" ,
159
+ "logic" : "POSITIVE" , "decisionStrategy" : "AFFIRMATIVE" }
149
160
params_path = {'realm-name' : self .realm_name ,
150
- 'pid' : perm ['id' ], 'cid' : self .get_client_id (client_id )}
161
+ 'pid' : perm ['id' ], 'cid' : self .get_client_id (client_id )}
151
162
if op == "assign" :
152
163
if npoid in src_role_ids :
153
164
return {}
@@ -160,23 +171,33 @@ def op_permission_with_role(self, client_id, permission, role, op="assign"):
160
171
src_role_ids .remove (npoid )
161
172
payload ["resources" ] = [self .get_resource_id (client_id , permission )]
162
173
payload ["policies" ] = src_role_ids
163
- data_raw = self .raw_put (url .format (** params_path ), data = json .dumps (payload ))
174
+ data_raw = self .raw_put (url .format (
175
+ ** params_path ), data = json .dumps (payload ))
164
176
return raise_error_from_response (data_raw , KeycloakGetError )
165
177
166
- # def get_role_permissions(self, rolename):
167
- # rid = self.get_role_id(rolename)
168
- # if not rid:
169
- # return self.errormsg("role {} not found".format(rolename))
170
- # url = "admin/realms/{realm-name}/groups/{rid}/role-mappings/clients/{cid}"
171
- # self.get_admin_cid()
172
- # params_path = {"realm-name": self.realm_name,
173
- # "rid": rid, "cid": self.cid}
174
- # data_raw = self.raw_get(url.format(**params_path))
175
- # return raise_error_from_response(data_raw, KeycloakGetError, expected_code=200)
178
+ def get_role_permissions (self , client_id , rolename ):
179
+ client_settings = self .get_client_authz_settings (
180
+ self .get_client_id (client_id )).json ()
181
+ policies = client_settings .get ("policies" )
182
+ permissions = []
183
+ role = None
184
+ for rolep in policies :
185
+ if rolep ['type' ] == 'role' and not role :
186
+ if rolep ['name' ] == rolename :
187
+ role = rolep
188
+ else :
189
+ if rolep ['type' ] == 'resource' :
190
+ rolepolicies = rolep ['config' ].get (
191
+ 'applyPolicies' ) if rolep ['config' ].get ('applyPolicies' ) else []
192
+ permissions .extend (
193
+ [{'id' : rolep ['id' ], 'name' : rolep ['name' ]}] if rolename in rolepolicies else [])
194
+ role ['permissions' ] = permissions
195
+ return role
176
196
177
197
def get_permission_roles (self , client_id , permission ):
178
198
url = 'admin/realms/{realm-name}/clients/{cid}/authz/resource-server/policy/{pid}/associatedPolicies'
179
- params_path = {'realm-name' : self .realm_name , 'cid' : self .get_client_id (client_id ), 'pid' : self .get_permissions (client_id , permission )['id' ]}
199
+ params_path = {'realm-name' : self .realm_name , 'cid' : self .get_client_id (
200
+ client_id ), 'pid' : self .get_permissions (client_id , permission )['id' ]}
180
201
rawdata = self .raw_get (url .format (** params_path ))
181
202
return raise_error_from_response (rawdata , KeycloakGetError )
182
203
0 commit comments