Project

General

Profile

Download (6.27 KB) Statistics
| Branch: | Tag: | Revision:
from oic.exception import MissingAttribute
from oic import oic
from oic.oauth2 import rndstr, ErrorResponse
from oic.oic import ProviderConfigurationResponse, AuthorizationResponse
from oic.oic import RegistrationResponse
from oic.oic import AuthorizationRequest
from oic.utils.authn.client import CLIENT_AUTHN_METHOD

import logging

logger = logging.getLogger(__name__)

import conf

class OIDCError(Exception):
pass


class Client(oic.Client):
def __init__(self, client_id=None, ca_certs=None,
client_prefs=None, client_authn_method=None, keyjar=None,
verify_ssl=True, behaviour=None):
oic.Client.__init__(self, client_id, ca_certs, client_prefs,
client_authn_method, keyjar, verify_ssl)
if behaviour:
self.behaviour = behaviour

def create_authn_request(self, session, acr_value=None):
session["state"] = rndstr()
session["nonce"] = rndstr()
request_args = {
"response_type": self.behaviour["response_type"],
"scope": self.behaviour["scope"],
"state": session["state"],
# "nonce": session["nonce"],
"redirect_uri": self.registration_response["redirect_uris"][0]
}

if acr_value is not None:
request_args["acr_values"] = acr_value

cis = self.construct_AuthorizationRequest(request_args=request_args)
logger.debug("request: %s" % cis)

url, body, ht_args, cis = self.uri_and_body(AuthorizationRequest, cis,
method="GET",
request_args=request_args)

logger.debug("body: %s" % body)
logger.info("URL: %s" % url)
logger.debug("ht_args: %s" % ht_args)

return str(url), ht_args

def callback(self, response):
"""
This is the method that should be called when an AuthN response has been
received from the OP.

:param response: The URL returned by the OP
:return:
"""
authresp = self.parse_response(AuthorizationResponse, response,
sformat="dict", keyjar=self.keyjar)

if isinstance(authresp, ErrorResponse):
return OIDCError("Access denied")

try:
self.id_token[authresp["state"]] = authresp["id_token"]
except KeyError:
pass

if self.behaviour["response_type"] == "code":
# get the access token
try:
args = {
"grant_type": "authorization_code",
"code": authresp["code"],
"redirect_uri": self.registration_response[
"redirect_uris"][0],
"client_id": self.client_id,
"client_secret": self.client_secret
}

atresp = self.do_access_token_request(
scope="openid", state=authresp["state"], request_args=args,
authn_method=self.registration_response["token_endpoint_auth_method"])
except Exception as err:
logger.error("%s" % err)
raise

if isinstance(atresp, ErrorResponse):
raise OIDCError("Invalid response %s." % atresp["error"])

inforesp = self.do_user_info_request(state=authresp["state"],
behavior='use_authorization_header')

if isinstance(inforesp, ErrorResponse):
raise OIDCError("Invalid response %s." % inforesp["error"])

userinfo = inforesp.to_dict()

logger.debug("UserInfo: %s" % inforesp)

return userinfo

def create_client(**kwargs):
"""
kwargs = config.CLIENT.iteritems
"""
_key_set = set(kwargs.keys())
args = {}
for param in ["verify_ssl"]:
try:
args[param] = kwargs[param]
except KeyError:
pass
else:
_key_set.discard(param)

client = Client(client_authn_method=CLIENT_AUTHN_METHOD,
behaviour=kwargs["behaviour"],
verify_ssl=conf.VERIFY_SSL, **args)

# The behaviour parameter is not significant for the election process
_key_set.discard("behaviour")
for param in ["allow"]:
try:
setattr(client, param, kwargs[param])
except KeyError:
pass
else:
_key_set.discard(param)

if _key_set == set(["client_info"]): # Everything dynamic
# There has to be a userid
if not userid:
raise MissingAttribute("Missing userid specification")

# Find the service that provides information about the OP
issuer = client.wf.discovery_query(userid)
# Gather OP information
_ = client.provider_config(issuer)
# register the client
_ = client.register(client.provider_info["registration_endpoint"],
**kwargs["client_info"])
elif _key_set == set(["client_info", "srv_discovery_url"]):
# Ship the webfinger part
# Gather OP information
_ = client.provider_config(kwargs["srv_discovery_url"])
# register the client
_ = client.register(client.provider_info["registration_endpoint"],
**kwargs["client_info"])
elif _key_set == set(["provider_info", "client_info"]):
client.handle_provider_config(
ProviderConfigurationResponse(**kwargs["provider_info"]),
kwargs["provider_info"]["issuer"])
_ = client.register(client.provider_info["registration_endpoint"],
**kwargs["client_info"])
elif _key_set == set(["provider_info", "client_registration"]):
client.handle_provider_config(
ProviderConfigurationResponse(**kwargs["provider_info"]),
kwargs["provider_info"]["issuer"])
client.store_registration_info(RegistrationResponse(
**kwargs["client_registration"]))
elif _key_set == set(["srv_discovery_url", "client_registration"]):
_ = client.provider_config(kwargs["srv_discovery_url"])
client.store_registration_info(RegistrationResponse(
**kwargs["client_registration"]))
else:
raise Exception("Configuration error ?")

return client
(3-3/4)