Projet

Général

Profil

0003-move-API-signing-functions-in-their-own-module-10444.patch

Benjamin Dauvergne, 12 avril 2016 11:06

Télécharger (12,1 ko)

Voir les différences:

Subject: [PATCH 3/7] move API signing functions in their own module (#10444)

Having them in the api module leads to circular imports.
Also get_secret() has been rewritten as get_secret_and_orig().
 wcs/api.py       | 108 +-----------------------------------------
 wcs/api_utils.py | 139 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 wcs/wf/roles.py  |  17 +------
 3 files changed, 142 insertions(+), 122 deletions(-)
 create mode 100644 wcs/api_utils.py
wcs/api.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import base64
18
import hmac
19
import hashlib
20 17
import json
21
import datetime
22 18
import time
23
import urllib
24 19
import urllib2
25 20
import urlparse
26
import random
27 21
import sys
28 22

  
29 23
from quixote import get_request, get_publisher, get_response
......
37 31
from wcs.roles import Role, logged_users_role
38 32
from wcs.forms.common import FormStatusPage
39 33
import wcs.qommon.storage as st
34
from wcs.api_utils import is_url_signed, get_user_from_api_query_string, sign_url
40 35

  
41

  
42
def is_url_signed():
43
    query_string = get_request().get_query()
44
    if not query_string:
45
        return False
46
    signature = get_request().form.get('signature')
47
    if not isinstance(signature, basestring):
48
        return False
49
    # verify signature
50
    orig = get_request().form.get('orig')
51
    if not isinstance(orig, basestring):
52
        raise AccessForbiddenError('missing/multiple orig field')
53
    key = get_publisher().get_site_option(orig, 'api-secrets')
54
    if not key:
55
        raise AccessForbiddenError('invalid orig')
56
    algo = get_request().form.get('algo')
57
    if not isinstance(algo, basestring):
58
        raise AccessForbiddenError('missing/multiple algo field')
59
    try:
60
        algo = getattr(hashlib, algo)
61
    except AttributeError:
62
        raise AccessForbiddenError('invalid algo')
63
    if signature != base64.standard_b64encode(hmac.new(key,
64
                            query_string[:query_string.find('&signature=')],
65
                            algo).digest()):
66
        raise AccessForbiddenError('invalid signature')
67
    timestamp = get_request().form.get('timestamp')
68
    if not isinstance(timestamp, basestring):
69
        raise AccessForbiddenError('missing/multiple timestamp field')
70
    try:
71
        delta = (datetime.datetime.utcnow().replace(tzinfo=None) -
72
                 datetime.datetime.strptime(timestamp,
73
                         '%Y-%m-%dT%H:%M:%SZ'))
74
    except ValueError:
75
        raise AccessForbiddenError('invalid timestamp field')
76
    MAX_DELTA = 30
77
    if abs(delta) > datetime.timedelta(seconds=MAX_DELTA):
78
        raise AccessForbiddenError('timestamp delta is more '
79
                'than %s seconds: %s seconds' % (MAX_DELTA, delta))
80
    return True
81

  
82
def get_user_from_api_query_string():
83
    if not is_url_signed():
84
        return None
85
    # Signature is good. Now looking for the user, by email/NameID.
86
    user = None
87
    if get_request().form.get('email'):
88
        email = get_request().form.get('email')
89
        if not isinstance(email, basestring):
90
            raise AccessForbiddenError('multiple email field')
91
        users = list(get_publisher().user_class.get_users_with_email(email))
92
        if users:
93
            user = users[0]
94
        else:
95
            raise AccessForbiddenError('unknown email')
96
    elif get_request().form.get('NameID'):
97
        ni = get_request().form.get('NameID')
98
        if not isinstance(ni, basestring):
99
            raise AccessForbiddenError('multiple NameID field')
100
        users = list(get_publisher().user_class.get_users_with_name_identifier(ni))
101
        if users:
102
            user = users[0]
103
        else:
104
            raise UnknownNameIdAccessForbiddenError('unknown NameID')
105
    elif 'email' in get_request().form or 'NameID' in get_request().form:
106
        # email or NameID were given as empty to the query string, this maps
107
        # the anonymous user case.
108
        return False
109

  
110
    return user
111

  
112
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
113
    parsed = urlparse.urlparse(url)
114
    new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
115
    return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
116

  
117
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
118
    if timestamp is None:
119
        timestamp = datetime.datetime.utcnow()
120
    timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
121
    if nonce is None:
122
        nonce = hex(random.getrandbits(128))[2:-1]
123
    new_query = query
124
    if new_query:
125
        new_query += '&'
126
    new_query += urllib.urlencode((
127
        ('algo', algo),
128
        ('timestamp', timestamp),
129
        ('nonce', nonce)))
130
    signature = base64.b64encode(sign_string(new_query, key, algo=algo))
131
    new_query += '&signature=' + urllib.quote(signature)
132
    return new_query
133

  
134
def sign_string(s, key, algo='sha256', timedelta=30):
135
    digestmod = getattr(hashlib, algo)
136
    hash = hmac.HMAC(key, digestmod=digestmod, msg=s)
137
    return hash.digest()
138

  
139

  
140
# import backoffice.root.FormPage after get_user_from_api_query_string
141
# to avoid circular dependencies
142 36
from backoffice.management import FormPage as BackofficeFormPage
143 37

  
144 38

  
wcs/api_utils.py
1
# w.c.s. - web application for online forms
2
# Copyright (C) 2005-2013  Entr'ouvert
3
#
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU General Public License for more details.
13
#
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
import hmac
19
import hashlib
20
import datetime
21
import urllib
22
import urlparse
23
import random
24

  
25
from quixote import get_request, get_publisher
26
from qommon.errors import (AccessForbiddenError, UnknownNameIdAccessForbiddenError)
27

  
28

  
29
def is_url_signed():
    """Tell whether the current request carries a valid API signature.

    The signature is the base64 encoded HMAC, keyed with the ``api-secrets``
    site option registered for the ``orig`` parameter, of the part of the
    raw query string that precedes ``&signature=``.  The ``timestamp``
    parameter must not be more than 30 seconds away from the current UTC
    time.

    Returns:
        False when the request has no query string or no single
        ``signature`` parameter (the request is simply not signed), True
        when both the signature and the timestamp are valid.

    Raises:
        AccessForbiddenError: a signature is present but ``orig``, ``algo``
            or ``timestamp`` is missing, repeated or invalid, or the
            signature does not match.
    """
    request = get_request()
    query_string = request.get_query()
    if not query_string:
        return False
    signature = request.form.get('signature')
    if not isinstance(signature, basestring):
        return False
    # verify signature
    orig = request.form.get('orig')
    if not isinstance(orig, basestring):
        raise AccessForbiddenError('missing/multiple orig field')
    key = get_publisher().get_site_option(orig, 'api-secrets')
    if not key:
        raise AccessForbiddenError('invalid orig')
    algo = request.form.get('algo')
    if not isinstance(algo, basestring):
        raise AccessForbiddenError('missing/multiple algo field')
    # "algo" comes from the caller: only accept names of real digest
    # constructors.  A bare getattr() on hashlib also resolves attributes
    # such as "new" or "__name__", which make hmac fail with an unhandled
    # exception instead of a clean refusal.
    if algo not in ('md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512'):
        raise AccessForbiddenError('invalid algo')
    digestmod = getattr(hashlib, algo)
    # NOTE(review): only what precedes "&signature=" is authenticated;
    # parameters appended after the signature are not covered by it.
    signed_part = query_string[:query_string.find('&signature=')]
    expected = base64.standard_b64encode(
        hmac.new(key, signed_part, digestmod).digest())
    if isinstance(signature, unicode):
        # compare_digest() wants both operands to be of the same string type
        signature = signature.encode('utf-8')
    # constant-time comparison, so that response timing does not reveal how
    # many leading characters of a forged signature were right
    if not hmac.compare_digest(signature, expected):
        raise AccessForbiddenError('invalid signature')
    timestamp = request.form.get('timestamp')
    if not isinstance(timestamp, basestring):
        raise AccessForbiddenError('missing/multiple timestamp field')
    try:
        delta = (datetime.datetime.utcnow() -
                 datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ'))
    except ValueError:
        raise AccessForbiddenError('invalid timestamp field')
    MAX_DELTA = 30
    # abs(): timestamps slightly in the future (clock skew) are tolerated too
    if abs(delta) > datetime.timedelta(seconds=MAX_DELTA):
        raise AccessForbiddenError('timestamp delta is more than %s seconds: %s seconds'
                                   % (MAX_DELTA, delta))
    return True
66

  
67

  
68
def get_user_from_api_query_string():
    """Resolve the user designated by a signed API request.

    Returns:
        None when the request is not signed, or is signed but carries
        neither an ``email`` nor a ``NameID`` parameter; False when one of
        those parameters is present but empty (anonymous user case);
        otherwise the first user matching the given email, or failing
        that the given NameID.

    Raises:
        AccessForbiddenError: ``email`` or ``NameID`` is given several
            times, or no user has the given email (is_url_signed() may
            also raise it).
        UnknownNameIdAccessForbiddenError: no user has the given NameID.
    """
    if not is_url_signed():
        return None

    # Signature is good. Now looking for the user, by email/NameID.
    form = get_request().form

    email = form.get('email')
    if email:
        if not isinstance(email, basestring):
            raise AccessForbiddenError('multiple email field')
        matches = list(get_publisher().user_class.get_users_with_email(email))
        if not matches:
            raise AccessForbiddenError('unknown email')
        return matches[0]

    name_id = form.get('NameID')
    if name_id:
        if not isinstance(name_id, basestring):
            raise AccessForbiddenError('multiple NameID field')
        matches = list(
            get_publisher().user_class.get_users_with_name_identifier(name_id))
        if not matches:
            raise UnknownNameIdAccessForbiddenError('unknown NameID')
        return matches[0]

    if 'email' in form or 'NameID' in form:
        # email or NameID were given as empty to the query string, this maps
        # the anonymous user case.
        return False

    return None
97

  
98

  
99
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
    """Return url with its query string replaced by a signed one.

    The query part of url is handed to sign_query() together with key,
    algo, timestamp and nonce; every other component of the URL is kept
    as is.
    """
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    signed_query = sign_query(query, key, algo, timestamp, nonce)
    return urlparse.urlunparse(
        (scheme, netloc, path, params, signed_query, fragment))
103

  
104

  
105
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
    """Append algo, timestamp, nonce and signature to a query string.

    Args:
        query: the query string to sign, possibly empty.
        key: the secret used as HMAC key.
        algo: name of the hashlib digest, also sent as the ``algo``
            parameter.
        timestamp: a datetime; the current UTC time when None.
        nonce: a string; a random 128 bits value in hexadecimal when None.

    Returns:
        The query string followed by ``algo``, ``timestamp`` and ``nonce``
        and finally ``signature``, the URL-quoted base64 HMAC of everything
        placed before it.
    """
    if timestamp is None:
        timestamp = datetime.datetime.utcnow()
    timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
    if nonce is None:
        # '%x' yields the bare hexadecimal digits on every Python version;
        # slicing the output of hex() relied on the trailing 'L' of Python 2
        # longs and drops a real digit when it is absent.  The value is
        # taken from the OS entropy source so that it cannot be predicted.
        nonce = '%x' % random.SystemRandom().getrandbits(128)
    new_query = query
    if new_query:
        new_query += '&'
    new_query += urllib.urlencode((
        ('algo', algo),
        ('timestamp', timestamp),
        ('nonce', nonce)))
    # the signature covers everything built so far and must come last
    signature = base64.b64encode(sign_string(new_query, key, algo=algo))
    new_query += '&signature=' + urllib.quote(signature)
    return new_query
121

  
122

  
123
def sign_string(s, key, algo='sha256', timedelta=30):
    """Return the raw (binary) HMAC digest of s keyed with key.

    algo is the name of the hashlib constructor used as digest.  The
    timedelta parameter is accepted but not used by this function.
    """
    return hmac.new(key, s, getattr(hashlib, algo)).digest()
127

  
128

  
129
class MissingSecret(Exception):
    """Raised by get_secret_and_orig() when no secret is configured for the
    host of the URL to sign."""
131

  
132

  
133
def get_secret_and_orig(url):
    """Return the (secret, orig) pair used to sign a call to url.

    secret is the ``wscall-secrets`` site option registered for the host of
    url, looked up with any ``user:password@`` prefix and ``:port`` suffix
    removed.  orig is the server name of the current request, without its
    port.

    Raises:
        MissingSecret: no secret is configured for the host of url.
    """
    # what identifies us: host name of the current request, port dropped
    server = get_request().get_server()
    orig = server.split(':')[0]

    # what identifies the callee: netloc without credentials nor port
    netloc = urlparse.urlparse(url).netloc
    host_and_port = netloc.rsplit('@', 1)[-1]
    target_orig = host_and_port.rsplit(':', 1)[0]

    secret = get_publisher().get_site_option(target_orig, 'wscall-secrets')
    if secret:
        return secret, orig
    raise MissingSecret()
wcs/wf/roles.py
25 25
from qommon.ident.idp import is_idp_managing_user_attributes
26 26
from qommon.misc import http_post_request, http_delete_request
27 27
from qommon.publisher import get_cfg, get_logger
28
from wcs.api import sign_url
29

  
30

  
31
class MissingSecret(Exception):
32
    pass
33

  
34

  
35
def get_secret(url):
36
    domain = urlparse.urlparse(url).netloc.split(':')[0]
37
    secret = get_publisher().get_site_option(domain, 'wscall-secrets')
38
    if not secret:
39
        raise MissingSecret()
40
    return secret
28
from wcs.api_utils import sign_url, get_secret_and_orig
41 29

  
42 30

  
43 31
def roles_ws_url(role_uuid, user_uuid):
......
46 34
    base_url = entity_id.split('idp/saml2/metadata')[0]
47 35
    url = urlparse.urljoin(base_url, '/api/roles/%s/members/%s/' % (urllib.quote(role_uuid),
48 36
                                                                     urllib.quote(user_uuid)))
49
    secret = get_secret(url)
50
    orig = get_request().get_server().split(':')[0]
37
    secret, orig = get_secret_and_orig(url)
51 38
    url += '?orig=%s' % orig
52 39
    return sign_url(url, secret)
53 40

  
54
-