From cde5ba386e22fae9fa3d1f83792809428c88ca84 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Thu, 31 Mar 2016 11:59:58 +0200 Subject: [PATCH 3/4] move API signing functions in their own module (#10444) Having them in the api module leads to circular imports. Also get_secret() has been rewritten as get_secret_and_orig(). --- wcs/api.py | 108 +----------------------------------------- wcs/api_utils.py | 139 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ wcs/wf/roles.py | 17 +------ 3 files changed, 142 insertions(+), 122 deletions(-) create mode 100644 wcs/api_utils.py diff --git a/wcs/api.py b/wcs/api.py index faf82ef..237d874 100644 --- a/wcs/api.py +++ b/wcs/api.py @@ -14,16 +14,10 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, see . -import base64 -import hmac -import hashlib import json -import datetime import time -import urllib import urllib2 import urlparse -import random import sys from quixote import get_request, get_publisher, get_response @@ -37,108 +31,8 @@ from wcs.formdef import FormDef from wcs.roles import Role, logged_users_role from wcs.forms.common import FormStatusPage import wcs.qommon.storage as st +from wcs.api_utils import is_url_signed, get_user_from_api_query_string, sign_url - -def is_url_signed(): - query_string = get_request().get_query() - if not query_string: - return False - signature = get_request().form.get('signature') - if not isinstance(signature, basestring): - return False - # verify signature - orig = get_request().form.get('orig') - if not isinstance(orig, basestring): - raise AccessForbiddenError('missing/multiple orig field') - key = get_publisher().get_site_option(orig, 'api-secrets') - if not key: - raise AccessForbiddenError('invalid orig') - algo = get_request().form.get('algo') - if not isinstance(algo, basestring): - raise AccessForbiddenError('missing/multiple algo field') - try: - algo = getattr(hashlib, algo) - except AttributeError: - raise AccessForbiddenError('invalid algo') - if signature != base64.standard_b64encode(hmac.new(key, - query_string[:query_string.find('&signature=')], - algo).digest()): - raise AccessForbiddenError('invalid signature') - timestamp = get_request().form.get('timestamp') - if not isinstance(timestamp, basestring): - raise AccessForbiddenError('missing/multiple timestamp field') - try: - delta = (datetime.datetime.utcnow().replace(tzinfo=None) - - datetime.datetime.strptime(timestamp, - '%Y-%m-%dT%H:%M:%SZ')) - except ValueError: - raise AccessForbiddenError('invalid timestamp field') - MAX_DELTA = 30 - if abs(delta) > datetime.timedelta(seconds=MAX_DELTA): - raise AccessForbiddenError('timestamp delta is more ' - 'than %s seconds: %s seconds' % (MAX_DELTA, delta)) - return True - -def get_user_from_api_query_string(): - if not is_url_signed(): - return None - # Signature is good. Now looking for the user, by email/NameID. - user = None - if get_request().form.get('email'): - email = get_request().form.get('email') - if not isinstance(email, basestring): - raise AccessForbiddenError('multiple email field') - users = list(get_publisher().user_class.get_users_with_email(email)) - if users: - user = users[0] - else: - raise AccessForbiddenError('unknown email') - elif get_request().form.get('NameID'): - ni = get_request().form.get('NameID') - if not isinstance(ni, basestring): - raise AccessForbiddenError('multiple NameID field') - users = list(get_publisher().user_class.get_users_with_name_identifier(ni)) - if users: - user = users[0] - else: - raise UnknownNameIdAccessForbiddenError('unknown NameID') - elif 'email' in get_request().form or 'NameID' in get_request().form: - # email or NameID were given as empty to the query string, this maps - # the anonymous user case. - return False - - return user - -def sign_url(url, key, algo='sha256', timestamp=None, nonce=None): - parsed = urlparse.urlparse(url) - new_query = sign_query(parsed.query, key, algo, timestamp, nonce) - return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:]) - -def sign_query(query, key, algo='sha256', timestamp=None, nonce=None): - if timestamp is None: - timestamp = datetime.datetime.utcnow() - timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') - if nonce is None: - nonce = hex(random.getrandbits(128))[2:-1] - new_query = query - if new_query: - new_query += '&' - new_query += urllib.urlencode(( - ('algo', algo), - ('timestamp', timestamp), - ('nonce', nonce))) - signature = base64.b64encode(sign_string(new_query, key, algo=algo)) - new_query += '&signature=' + urllib.quote(signature) - return new_query - -def sign_string(s, key, algo='sha256', timedelta=30): - digestmod = getattr(hashlib, algo) - hash = hmac.HMAC(key, digestmod=digestmod, msg=s) - return hash.digest() - - -# import backoffice.root.FormPage after get_user_from_api_query_string -# to avoid circular dependencies from backoffice.management import FormPage as BackofficeFormPage diff --git a/wcs/api_utils.py b/wcs/api_utils.py new file mode 100644 index 0000000..3d81581 --- /dev/null +++ b/wcs/api_utils.py @@ -0,0 +1,139 @@ +# w.c.s. - web application for online forms +# Copyright (C) 2005-2013 Entr'ouvert +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, see . + +import base64 +import hmac +import hashlib +import datetime +import urllib +import urlparse +import random + +from quixote import get_request, get_publisher +from qommon.errors import (AccessForbiddenError, UnknownNameIdAccessForbiddenError) + + +def is_url_signed(): + query_string = get_request().get_query() + if not query_string: + return False + signature = get_request().form.get('signature') + if not isinstance(signature, basestring): + return False + # verify signature + orig = get_request().form.get('orig') + if not isinstance(orig, basestring): + raise AccessForbiddenError('missing/multiple orig field') + key = get_publisher().get_site_option(orig, 'api-secrets') + if not key: + raise AccessForbiddenError('invalid orig') + algo = get_request().form.get('algo') + if not isinstance(algo, basestring): + raise AccessForbiddenError('missing/multiple algo field') + try: + algo = getattr(hashlib, algo) + except AttributeError: + raise AccessForbiddenError('invalid algo') + if signature != base64.standard_b64encode( + hmac.new(key, query_string[:query_string.find('&signature=')], algo).digest()): + raise AccessForbiddenError('invalid signature') + timestamp = get_request().form.get('timestamp') + if not isinstance(timestamp, basestring): + raise AccessForbiddenError('missing/multiple timestamp field') + try: + delta = (datetime.datetime.utcnow().replace(tzinfo=None) - + datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')) + except ValueError: + raise AccessForbiddenError('invalid timestamp field') + MAX_DELTA = 30 + if abs(delta) > datetime.timedelta(seconds=MAX_DELTA): + raise AccessForbiddenError('timestamp delta is more than %s seconds: %s seconds' + % (MAX_DELTA, delta)) + return True + + +def get_user_from_api_query_string(): + if not is_url_signed(): + return None + # Signature is good. Now looking for the user, by email/NameID. + user = None + if get_request().form.get('email'): + email = get_request().form.get('email') + if not isinstance(email, basestring): + raise AccessForbiddenError('multiple email field') + users = list(get_publisher().user_class.get_users_with_email(email)) + if users: + user = users[0] + else: + raise AccessForbiddenError('unknown email') + elif get_request().form.get('NameID'): + ni = get_request().form.get('NameID') + if not isinstance(ni, basestring): + raise AccessForbiddenError('multiple NameID field') + users = list(get_publisher().user_class.get_users_with_name_identifier(ni)) + if users: + user = users[0] + else: + raise UnknownNameIdAccessForbiddenError('unknown NameID') + elif 'email' in get_request().form or 'NameID' in get_request().form: + # email or NameID were given as empty to the query string, this maps + # the anonymous user case. + return False + + return user + + +def sign_url(url, key, algo='sha256', timestamp=None, nonce=None): + parsed = urlparse.urlparse(url) + new_query = sign_query(parsed.query, key, algo, timestamp, nonce) + return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:]) + + +def sign_query(query, key, algo='sha256', timestamp=None, nonce=None): + if timestamp is None: + timestamp = datetime.datetime.utcnow() + timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') + if nonce is None: + nonce = hex(random.getrandbits(128))[2:-1] + new_query = query + if new_query: + new_query += '&' + new_query += urllib.urlencode(( + ('algo', algo), + ('timestamp', timestamp), + ('nonce', nonce))) + signature = base64.b64encode(sign_string(new_query, key, algo=algo)) + new_query += '&signature=' + urllib.quote(signature) + return new_query + + +def sign_string(s, key, algo='sha256', timedelta=30): + digestmod = getattr(hashlib, algo) + hash = hmac.HMAC(key, digestmod=digestmod, msg=s) + return hash.digest() + + +class MissingSecret(Exception): + pass + + +def get_secret_and_orig(url): + orig = get_request().get_server().split(':')[0] + target_orig = urlparse.urlparse(url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0] + secret = get_publisher().get_site_option(target_orig, 'wscall-secrets') + if not secret: + raise MissingSecret() + return secret, orig diff --git a/wcs/wf/roles.py b/wcs/wf/roles.py index 097ebf9..fb5cdb1 100644 --- a/wcs/wf/roles.py +++ b/wcs/wf/roles.py @@ -25,19 +25,7 @@ from wcs.roles import get_user_roles, Role from qommon.ident.idp import is_idp_managing_user_attributes from qommon.misc import http_post_request, http_delete_request from qommon.publisher import get_cfg, get_logger -from wcs.api import sign_url - - -class MissingSecret(Exception): - pass - - -def get_secret(url): - domain = urlparse.urlparse(url).netloc.split(':')[0] - secret = get_publisher().get_site_option(domain, 'wscall-secrets') - if not secret: - raise MissingSecret() - return secret +from wcs.api_utils import sign_url, get_secret_and_orig def roles_ws_url(role_uuid, user_uuid): @@ -46,8 +34,7 @@ def roles_ws_url(role_uuid, user_uuid): base_url = entity_id.split('idp/saml2/metadata')[0] url = urlparse.urljoin(base_url, '/api/roles/%s/members/%s/' % (urllib.quote(role_uuid), urllib.quote(user_uuid))) - secret = get_secret(url) - orig = get_request().get_server().split(':')[0] + secret, orig = get_secret_and_orig(url) url += '?orig=%s' % orig return sign_url(url, secret) -- 2.1.4