Projet

Général

Profil

0006-utils-shamelessly-steal-hobo-s-request-wrapper-69223.patch

Paul Marillonnet, 13 octobre 2022 14:06

Télécharger (4,82 ko)

Voir les différences:

Subject: [PATCH 06/10] utils: shamelessly steal hobo's request wrapper
 (#69223)

    This allows authentic to send Publik-signed requests to other
    known services.
    For more information, please see
    https://doc-publik.entrouvert.com/dev/wcs/api-webservices/authentification/#authentification-par-signature-de-lurl
 src/authentic2/utils/requests_wrapper.py | 106 +++++++++++++++++++++++
 1 file changed, 106 insertions(+)
 create mode 100644 src/authentic2/utils/requests_wrapper.py
src/authentic2/utils/requests_wrapper.py
1
# hobo - portal to configure and deploy applications
2
# Copyright (C) 2015-2022 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
import datetime
19
import hashlib
20
import hmac
21
import logging
22
import random
23
import urllib
24

  
25
from django.conf import settings
26
from django.utils.encoding import smart_bytes
27
from django.utils.http import quote, urlencode
28
from requests import Session as RequestsSession
29
from requests.auth import AuthBase
30

  
31
logger = logging.getLogger(__name__)
32

  
33

  
34
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
    """Return *url* with its query string replaced by a signed version.

    The URL is split into components, the query component is handed to
    ``sign_query`` together with *key*, *algo*, *timestamp* and *nonce*,
    and the URL is reassembled with every other component untouched.
    """
    parts = urllib.parse.urlparse(url)
    signed_query = sign_query(parts.query, key, algo, timestamp, nonce)
    # Swap only the query component; scheme, netloc, path, params and
    # fragment are carried over as parsed.
    return urllib.parse.urlunparse(parts._replace(query=signed_query))
38

  
39

  
40
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
    """Append signature parameters to a query string and return the result.

    The parameters ``algo``, ``timestamp`` and (unless empty) ``nonce`` are
    appended to *query*; the resulting string is signed with ``sign_string``
    and the base64-encoded signature is appended as ``signature``.

    :param query: existing query string (may be empty).
    :param key: secret passed to ``sign_string``.
    :param algo: name of the ``hashlib`` digest to use.
    :param timestamp: ``datetime`` to embed; defaults to the current UTC
        time. Naive values are formatted as given; aware values are
        converted to UTC first.
    :param nonce: nonce to embed; a random 128-bit hexadecimal value is
        generated when ``None``, and no nonce is added when it is an empty
        string.
    :returns: the query string extended with the signature parameters.
    """
    if timestamp is None:
        # datetime.utcnow() is deprecated since Python 3.12; the formatted
        # output below is identical.
        timestamp = datetime.datetime.now(datetime.timezone.utc)
    elif timestamp.utcoffset() is not None:
        # The trailing "Z" below claims UTC, so an aware datetime expressed
        # in another zone must be converted before being formatted.
        timestamp = timestamp.astimezone(datetime.timezone.utc)
    timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
    if nonce is None:
        # Draw the nonce from the OS entropy source rather than the default
        # (predictable) Mersenne Twister generator.
        nonce = hex(random.SystemRandom().getrandbits(128))[2:]
    new_query = query
    if new_query:
        new_query += '&'
    new_query += urlencode((('algo', algo), ('timestamp', timestamp)))
    if nonce:  # we don't add nonce if it's an empty string
        new_query += '&nonce=' + quote(nonce)
    # The signature covers everything appended so far, in this exact order.
    signature = base64.b64encode(sign_string(new_query, key, algo=algo))
    new_query += '&signature=' + quote(signature)
    return new_query
55

  
56

  
57
def sign_string(s, key, algo='sha256'):
    """Return the raw HMAC digest of *s* computed with *key*.

    *algo* is looked up by name on ``hashlib`` and used as the HMAC digest
    constructor. A ``str`` key is encoded as UTF-8; both the key and the
    message then go through ``smart_bytes`` before being fed to HMAC.
    """
    digest_factory = getattr(hashlib, algo)
    secret = key.encode('utf-8') if isinstance(key, str) else key
    mac = hmac.new(smart_bytes(secret), msg=smart_bytes(s), digestmod=digest_factory)
    return mac.digest()
63

  
64

  
65
class PublikSignature(AuthBase):
    """Auth callable that signs the URL of a request with a shared secret."""

    def __init__(self, secret):
        # Secret handed to sign_url() for every request this object processes.
        self.secret = secret

    def __call__(self, request):
        """Replace ``request.url`` with its signed form and return *request*."""
        signed_url = sign_url(request.url, self.secret)
        request.url = signed_url
        return request
72

  
73

  
74
def get_known_service_for_url(url):
    """Return the known service whose URL has the same netloc as *url*.

    ``settings.KNOWN_SERVICES`` is read as a two-level mapping; each
    innermost value is a service mapping whose ``'url'`` entry is compared,
    by network location, with *url*. The first match is returned; ``None``
    is returned when the setting is absent or nothing matches.
    """
    wanted_netloc = urllib.parse.urlparse(url).netloc
    known_services = getattr(settings, 'KNOWN_SERVICES', {})
    for service_group in known_services.values():
        for candidate in service_group.values():
            candidate_netloc = urllib.parse.urlparse(candidate.get('url')).netloc
            if candidate_netloc == wanted_netloc:
                return candidate
    return None
83

  
84

  
85
class Requests(RequestsSession):
    """Requests session that signs calls made to known services."""

    def request(self, method, url, **kwargs):
        """Send a request, signing it when *url* targets a known service.

        When ``get_known_service_for_url`` finds a service for *url*:

        * a ``PublikSignature`` built from the service's ``'secret'`` is
          installed as the ``auth`` keyword argument (overriding any value
          supplied by the caller);
        * the URL is rebuilt from the scheme, netloc, params and fragment of
          the service's configured ``'url'`` and the path of *url*;
        * the query string becomes ``orig=<service 'orig'>`` followed by the
          query string already present in *url*, if any.

        Otherwise a warning is logged and the request is sent unchanged.
        """
        remote_service = get_known_service_for_url(url)
        if remote_service:
            kwargs['auth'] = PublikSignature(remote_service.get('secret'))

            # Only the path and the query string of the requested URL are
            # kept; everything else comes from the service's base URL.
            requested = urllib.parse.urlparse(url)
            path = requested.path
            caller_query = requested.query

            remote_service_base_url = remote_service.get('url')
            scheme, netloc, _path, params, _query, fragment = urllib.parse.urlparse(
                remote_service_base_url
            )

            query = urlencode({'orig': remote_service.get('orig')})
            if caller_query:
                # Keep the caller's own query parameters instead of silently
                # discarding them.
                query += '&' + caller_query
            url = urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))

        else:
            logger.warning('remote service for url %s is unknown, not performing any signature', url)
        return super().request(method, url, **kwargs)
0
-