Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions fedcloudclient/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import jwt
import liboidcagent as agent
import requests
import os
import re

from fedcloudclient.conf import CONF as CONF
from fedcloudclient.exception import TokenError
from fedcloudclient.logger import log_and_raise



class Token:
"""
Abstract object for managing tokens
Expand All @@ -23,6 +26,7 @@ def get_token_type(self):
...



class OIDCToken(Token):
"""
OIDC tokens. Managing access tokens, oidc-agent account and mytoken
Expand All @@ -35,6 +39,7 @@ def __init__(self, access_token=None):
self.oidc_agent_account = None
self.mytoken = None
self.user_id = None
self._VO_PATTERN = "urn:mace:egi.eu:group:(.+?):(.+:)*role=member#aai.egi.eu"

def get_token(self):
"""
Expand Down Expand Up @@ -67,6 +72,7 @@ def get_user_id(self) -> str:
Return use ID
:return:
"""

if not self.payload:
self.decode_token()
return self.user_id
Expand All @@ -87,6 +93,8 @@ def get_token_from_oidc_agent(self, oidc_agent_account: str) -> str:
)
self.access_token = access_token
self.oidc_agent_account = oidc_agent_account


return access_token
except agent.OidcAgentError as exception:
error_msg = f"Error getting access token from oidc-agent: {exception}"
Expand Down Expand Up @@ -140,6 +148,9 @@ def multiple_token(self, access_token: str, oidc_agent_account: str, mytoken: st
"""
if mytoken:
try:

"""need to implement from mytoken and check"""

self.get_token_from_mytoken(mytoken)
return
except TokenError:
Expand All @@ -154,3 +165,42 @@ def multiple_token(self, access_token: str, oidc_agent_account: str, mytoken: st
self.access_token = access_token
return
log_and_raise("Cannot get access token", TokenError)

def oidc_discover(self) -> dict:
"""
:param oidc_url: CheckIn URL get from payload
:return: JSON object of OIDC configuration
"""
oidc_url=self.payload["iss"]
request = requests.get(oidc_url + "/.well-known/openid-configuration")
request.raise_for_status()
self.request_json=request.json()
return self.request_json

def token_list_vos(self):
"""
List VO memberships in EGI Check-in
:return: list of VO names
"""

oidc_ep = self.request_json
z_user_info=oidc_ep["userinfo_endpoint"]
z_head={"Authorization": f"Bearer {self.access_token}"}

request = requests.get(
oidc_ep["userinfo_endpoint"],
headers={"Authorization": f"Bearer {self.access_token}"},
)

request.raise_for_status()
vos = set()
pattern = re.compile(self._VO_PATTERN)
for claim in request.json().get("eduperson_entitlement", []):
vo = pattern.match(claim)
if vo:
vos.add(vo.groups()[0])
request.raise_for_status()

return sorted(vos)


96 changes: 88 additions & 8 deletions fedcloudclient/auth_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,103 @@
Testing unit for auth.py
"""
import os
from colorama import init as colorama_init
from colorama import Fore
from colorama import Style

import fedcloudclient.auth as auth
from fedcloudclient.conf import CONF as CONF

VO_PATTERN = "urn:mace:egi.eu:group:(.+?):(.+:)*role=member#aai.egi.eu"

def get_token_from_mytoken_decode_verify(mytoken: str, user_id: str):
def verify_MYTOKEN(mytoken: str) -> str:
"""
Get access token from mytoken server, decode, get user ID and verify
:return:
"""

token = auth.OIDCToken()
token.get_token_from_mytoken(mytoken)
token_id = token.get_user_id()
assert token_id == user_id
try:
access_token_mytoken=token.get_token_from_mytoken(mytoken, None)
return access_token_mytoken
except:
return print(f"No MYTOKEN")


def verify_OIDC_AGENT(user_id:str) -> str:
token = auth.OIDCToken()
try:
access_token_oidc=token.get_token_from_oidc_agent(user_id)
return access_token_oidc
except:
return print(f"No OIDC_AGENT_ACCOUNT")



def verify_ACCESS_TOKEN(access_token:str) -> str:
token = auth.OIDCToken()
try:
token.access_token=access_token
return token.access_token
except:
return print(f"Error with ACCESS_TOKEN")

def verify_user_id(access_token:str) -> str:
token = auth.OIDCToken()
token.access_token=access_token
try:
user_id=token.get_user_id()
return user_id
except:
print("No user_id!")

def verify_pyload(access_token:str) -> dict:
token = auth.OIDCToken()
token.access_token=access_token
#try:
user_id=token.get_user_id()
payload=token.payload
request_json=token.oidc_discover()
list_vos=token.token_list_vos()
return payload,request_json,list_vos
#except:
# print("No user_id!")


def printing_dict(var_dict:dict):
for idx, item in enumerate(var_dict):
print(f"{item}:\t {var_dict[item]}")


if __name__ == "__main__":
mytoken = os.environ["FEDCLOUD_MYTOKEN"]
user_id = os.environ["FEDCLOUD_ID"]
get_token_from_mytoken_decode_verify(mytoken, user_id)
print(f"Start of verifying auth.py")

access_token= os.environ.get("ACCESS_TOKEN","")
access_token_check=verify_ACCESS_TOKEN(access_token)

mytoken=os.environ.get("FEDCLOUD_MYTOKEN","")
access_token_mytok=verify_MYTOKEN(mytoken)

oidc_agent_name=os.environ.get("OIDC_AGENT_ACCOUNT","")
access_token_oidc=verify_OIDC_AGENT(oidc_agent_name)

user_id=verify_user_id(access_token_oidc)
payload,request_json,list_vos=verify_pyload(access_token_oidc)


print(f"{type(payload)}")
printing_dict(payload)
print("-------------------------------------------------")
printing_dict(request_json)
print("-------------------------------------------------")
print(list_vos)
print(f"Break")










9 changes: 6 additions & 3 deletions fedcloudclient/checkin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def print_error(message, quiet):
print(message, file=sys.stderr)


""" Included in auth.py, line 50"""
def decode_token(oidc_access_token):
"""
Decoding access token to a dict
Expand All @@ -44,6 +45,7 @@ def decode_token(oidc_access_token):
return payload



def oidc_discover(oidc_url):
"""
Discover OIDC endpoints
Expand All @@ -57,6 +59,7 @@ def oidc_discover(oidc_url):
return request.json()


""" Included in auth.py, line 74"""
def get_token_from_oidc_agent(oidc_agent_account, quiet=False):
"""
Get access token from oidc-agent
Expand All @@ -82,6 +85,7 @@ def get_token_from_oidc_agent(oidc_agent_account, quiet=False):
return None


""" Included in auth.py, line 99"""
def get_token_from_mytoken_server(mytoken, mytoken_server, quiet=False):
"""
Get access token from mytoken server
Expand Down Expand Up @@ -149,7 +153,7 @@ def check_token(oidc_token, verbose=False):

def get_checkin_id(
oidc_token,
):
):
"""
Get EGI Check-in ID from access token

Expand All @@ -167,8 +171,7 @@ def get_access_token(
oidc_access_token,
oidc_agent_account,
mytoken,
mytoken_server,
):
mytoken_server,):
"""
Get access token
Generates new access token from oidc-agent
Expand Down
4 changes: 3 additions & 1 deletion fedcloudclient/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import yaml
from tabulate import tabulate

from fedcloudclient.exception import ConfigError
#from fedcloudclient.exception import ConfigError

DEFAULT_CONFIG_LOCATION = Path.home() / ".config/fedcloud/config.yaml"
DEFAULT_SETTINGS = {
Expand Down Expand Up @@ -137,13 +137,15 @@ def create(config_file: str):
envvar="FEDCLOUD_CONFIG_FILE",
show_default=True,
)

@click.option(
"--output-format",
"-f",
required=False,
help="Output format",
type=click.Choice(["text", "YAML", "JSON"], case_sensitive=False),
)

def show(config_file: str, output_format: str):
"""Show actual client configuration """
saved_config = load_config(config_file)
Expand Down
13 changes: 12 additions & 1 deletion fedcloudclient/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def get_shell_type():

return Shell.LINUX


""" Imported to the sites """
def print_set_env_command(name, value):
"""
Print command to set environment variable,
Expand All @@ -62,3 +62,14 @@ def print_comment(comment):
print(f"# {comment!s}")
else:
print(f"rem {comment!s}")


out_1=Shell(1)

print(type(out_1))
print(Shell.LINUX)

print(print_comment({"gewgweg": False}))
print(f"Done")


Loading