Source code for fedcloudclient.checkin

"""
Implementation of "fedcloud token" commands for interactions with EGI Check-in and
access tokens
"""
import re
import sys
import time
from datetime import datetime

import click
import jwt
import liboidcagent as agent
import requests

from fedcloudclient.decorators import (
    oidc_params,
)

# Minimal lifetime of the access token is 30s and max 24h
_MIN_ACCESS_TOKEN_TIME = 30

VO_PATTERN = "urn:mace:egi.eu:group:(.+?):(.+:)*role=member#aai.egi.eu"





[docs]def decode_token(oidc_access_token): """ Decoding access token to a dict :param oidc_access_token: :return: dict with token info """ try: payload = jwt.decode(oidc_access_token, options={"verify_signature": False}) except jwt.exceptions.InvalidTokenError: print_error("Error: Invalid access token.", False) return None return payload
[docs]def oidc_discover(oidc_url): """ Discover OIDC endpoints :param oidc_url: CheckIn URL :return: JSON object of OIDC configuration """ request = requests.get(oidc_url + "/.well-known/openid-configuration") request.raise_for_status() return request.json()
[docs]def get_token_from_oidc_agent(oidc_agent_account, quiet=False): """ Get access token from oidc-agent :param quiet: :param oidc_agent_account: account name in oidc-agent :return: access token, or None on error """ if oidc_agent_account: try: access_token = agent.get_access_token( oidc_agent_account, min_valid_period=_MIN_ACCESS_TOKEN_TIME, application_hint="fedcloudclient", ) return access_token except agent.OidcAgentError as exception: print_error( "Error getting access token from oidc-agent\n" f"Error message: {exception}", quiet, ) return None
[docs]def get_token_from_mytoken_server(mytoken, mytoken_server, quiet=False): """ Get access token from mytoken server :param quiet: :param mytoken: :param mytoken_server: :return: access token, or None on error """ if mytoken: try: data = { "grant_type": "mytoken", "mytoken": mytoken, } req = requests.post( mytoken_server + "/api/v0/token/access", json=data, ) req.raise_for_status() return req.json().get("access_token") except requests.exceptions.HTTPError as exception: print_error( "Error getting access token from mytoken\n" f"Error message: {exception}", quiet, ) return None
[docs]def check_token(oidc_token, verbose=False): """ Check validity of access token :param verbose: :param oidc_token: the token to check :return: access token, or None on error """ payload = decode_token(oidc_token) if payload is None: return None exp_timestamp = int(payload["exp"]) current_timestamp = int(time.time()) exp_time_in_sec = exp_timestamp - current_timestamp if exp_time_in_sec < _MIN_ACCESS_TOKEN_TIME: print_error("Error: Expired access token.", False) return None if verbose: exp_time_str = datetime.utcfromtimestamp(exp_timestamp).strftime( "%Y-%m-%d %H:%M:%S" ) print(f"Token is valid until {exp_time_str} UTC") if exp_time_in_sec < 24 * 3600: print(f"Token expires in {exp_time_in_sec} seconds") else: exp_time_in_days = exp_time_in_sec // (24 * 3600) print(f"Token expires in {exp_time_in_days} days") return oidc_token
[docs]def get_checkin_id( oidc_token, ): """ Get EGI Check-in ID from access token :param oidc_token: the token :return: Check-in ID """ payload = decode_token(oidc_token) if payload is None: return None return payload["sub"]
[docs]def get_access_token( oidc_access_token, oidc_agent_account, mytoken, mytoken_server, ): """ Get access token Generates new access token from oidc-agent or mytoken Check expiration time of access token Raise error if no valid token exists :param oidc_access_token: :param oidc_agent_account: :param mytoken: :param mytoken_server: :return: access token """ access_token = None # access token via parameter has the highest priority if oidc_access_token: access_token = check_token(oidc_access_token) # then try to get access token from mytoken server if mytoken and access_token is None: access_token = get_token_from_mytoken_server( mytoken, mytoken_server, quiet=False ) # then, try to get access token from oidc-agent if oidc_agent_account and access_token is None: access_token = get_token_from_oidc_agent(oidc_agent_account, quiet=False) if access_token is None: # Nothing available raise SystemExit( "Error: An access token is needed for the operation. You can specify " "access token directly via --oidc-access-token option or use oidc-agent " "via --oidc-agent-account or mytoken via --mytoken" ) return access_token
[docs]def token_list_vos(oidc_access_token): """ List VO memberships in EGI Check-in :param oidc_access_token: :return: list of VO names """ oidc_url = decode_token(oidc_access_token)["iss"] oidc_ep = oidc_discover(oidc_url) request = requests.get( oidc_ep["userinfo_endpoint"], headers={"Authorization": f"Bearer {oidc_access_token}"}, ) request.raise_for_status() vos = set() pattern = re.compile(VO_PATTERN) for claim in request.json().get("eduperson_entitlement", []): vo = pattern.match(claim) if vo: vos.add(vo.groups()[0]) return sorted(vos)
@click.group() def token(): """ Get details of access token """ @token.command() @oidc_params def check(access_token): """ Check validity of access token """ check_token(access_token, verbose=True) @token.command() @oidc_params def list_vos(access_token): """ List VO membership(s) of access token """ vos = token_list_vos(access_token) print("\n".join(vos)) @token.command() @oidc_params def issue(access_token): """ print access token (from mytoken or oidc-agent) """ print(access_token)