|
1 |
# hobo - portal to configure and deploy applications
|
|
2 |
# Copyright (C) 2015-2022 Entr'ouvert
|
|
3 |
#
|
|
4 |
# This program is free software: you can redistribute it and/or modify it
|
|
5 |
# under the terms of the GNU Affero General Public License as published
|
|
6 |
# by the Free Software Foundation, either version 3 of the License, or
|
|
7 |
# (at your option) any later version.
|
|
8 |
#
|
|
9 |
# This program is distributed in the hope that it will be useful,
|
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
12 |
# GNU Affero General Public License for more details.
|
|
13 |
#
|
|
14 |
# You should have received a copy of the GNU Affero General Public License
|
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
16 |
|
|
17 |
import base64
|
|
18 |
import datetime
|
|
19 |
import hashlib
|
|
20 |
import hmac
|
|
21 |
import logging
|
|
22 |
import random
|
|
23 |
import urllib
|
|
24 |
|
|
25 |
from django.conf import settings
|
|
26 |
from django.utils.encoding import smart_bytes
|
|
27 |
from django.utils.http import quote, urlencode
|
|
28 |
from requests import Session as RequestsSession
|
|
29 |
from requests.auth import AuthBase
|
|
30 |
|
|
31 |
logger = logging.getLogger(__name__)
|
|
32 |
|
|
33 |
|
|
34 |
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
    """Return *url* with signature parameters appended to its query string.

    Only the query component is rewritten: it is handed to ``sign_query``
    together with ``key``, ``algo``, ``timestamp`` and ``nonce``.  Every
    other component of the URL is put back unchanged.
    """
    scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url)
    signed_query = sign_query(query, key, algo, timestamp, nonce)
    return urllib.parse.urlunparse((scheme, netloc, path, params, signed_query, fragment))
|
|
38 |
|
|
39 |
|
|
40 |
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
    """Append ``algo``, ``timestamp``, ``nonce`` and ``signature`` to *query*.

    Args:
        query: existing query string, possibly empty.  It is kept verbatim
            at the beginning of the result.
        key: secret passed to ``sign_string`` to compute the signature.
        algo: hash algorithm name; emitted as the ``algo`` parameter and
            passed to ``sign_string``.
        timestamp: datetime used for the ``timestamp`` parameter.  Defaults
            to the current UTC time.  A naive datetime is taken to be UTC
            already; an aware datetime is converted to UTC before being
            formatted.
        nonce: value of the ``nonce`` parameter.  When ``None`` a random
            128-bit hexadecimal value is generated; an empty string means
            no ``nonce`` parameter is added at all.

    Returns:
        The extended query string.  The signature is computed over
        everything that precedes the ``signature`` parameter, then
        base64-encoded and URL-quoted.
    """
    utc = datetime.timezone.utc
    if timestamp is None:
        # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
        # in UTC formats to exactly the same string below.
        timestamp = datetime.datetime.now(utc)
    elif isinstance(timestamp, datetime.datetime) and timestamp.utcoffset() is not None:
        # The format below hard-codes the "Z" suffix, so an aware value in
        # another timezone must be shifted to UTC first.
        timestamp = timestamp.astimezone(utc)
    timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')

    if nonce is None:
        # NOTE(review): the random module is not a CSPRNG; switch to the
        # secrets module if nonce unpredictability is required.
        nonce = hex(random.getrandbits(128))[2:]

    new_query = query
    if new_query:
        new_query += '&'
    new_query += urlencode((('algo', algo), ('timestamp', timestamp)))
    if nonce:  # we don't add nonce if it's an empty string
        new_query += '&nonce=' + quote(nonce)

    # The signature covers the whole query built so far.
    signature = base64.b64encode(sign_string(new_query, key, algo=algo))
    new_query += '&signature=' + quote(signature)
    return new_query
|
|
55 |
|
|
56 |
|
|
57 |
def sign_string(s, key, algo='sha256'):
    """Return the raw HMAC digest of *s* keyed with *key*.

    ``algo`` is looked up as an attribute of ``hashlib`` and used as the
    digest constructor.  A ``str`` key is encoded as UTF-8; both the key
    and the message then go through ``smart_bytes`` before being handed
    to ``hmac``.
    """
    digest_constructor = getattr(hashlib, algo)
    if isinstance(key, str):
        key = key.encode('utf-8')
    mac = hmac.new(smart_bytes(key), smart_bytes(s), digest_constructor)
    return mac.digest()
|
|
63 |
|
|
64 |
|
|
65 |
class PublikSignature(AuthBase):
    """Auth callable that signs the URL of a request with a shared secret."""

    def __init__(self, secret):
        """Store *secret*, the key later handed to ``sign_url``."""
        self.secret = secret

    def __call__(self, request):
        """Replace ``request.url`` with its signed form and return *request*."""
        signed_url = sign_url(request.url, self.secret)
        request.url = signed_url
        return request
|
|
72 |
|
|
73 |
|
|
74 |
def get_known_service_for_url(url):
    """Return the known service whose URL shares its netloc with *url*.

    ``settings.KNOWN_SERVICES`` is walked as a two-level mapping: each
    outer value is itself a mapping whose values are service dicts.  The
    first service whose ``'url'`` entry has the same network location as
    *url* is returned.  ``None`` is returned when the setting does not
    exist or when nothing matches.
    """
    wanted_netloc = urllib.parse.urlparse(url).netloc
    if not hasattr(settings, 'KNOWN_SERVICES'):
        return None
    for group in settings.KNOWN_SERVICES.values():
        for candidate in group.values():
            candidate_netloc = urllib.parse.urlparse(candidate.get('url')).netloc
            if candidate_netloc == wanted_netloc:
                return candidate
    return None
|
|
83 |
|
|
84 |
|
|
85 |
class Requests(RequestsSession):
    """Session that signs requests sent to services found in KNOWN_SERVICES."""

    def request(self, method, url, **kwargs):
        """Send a request, signing it when *url* targets a known service.

        When ``get_known_service_for_url`` finds a service for *url*:

        * ``kwargs['auth']`` is set to a ``PublikSignature`` built from the
          service's ``'secret'`` (replacing any ``auth`` passed by the
          caller);
        * the URL is rebuilt from the scheme, netloc, params and fragment
          of the service's ``'url'`` and the path of *url*;
        * the query string becomes ``orig=<service 'orig'>`` followed by
          the query string of *url*, if it has one.

        Otherwise a warning is logged and the request is sent unchanged.

        Returns whatever the parent session's ``request`` returns.
        """
        remote_service = get_known_service_for_url(url)
        if not remote_service:
            logger.warning('remote service for url %s is unknown, not performing any signature', url)
            return super().request(method, url, **kwargs)

        kwargs['auth'] = PublikSignature(remote_service.get('secret'))

        # Only the path and the query string are taken from the caller's
        # URL; scheme, netloc, params and fragment come from the service
        # base URL.
        requested = urllib.parse.urlparse(url)
        base = urllib.parse.urlparse(remote_service.get('url'))

        query = urlencode({'orig': remote_service.get('orig')})
        if requested.query:
            # Keep the caller's own query parameters: they used to be
            # overwritten by the "orig" parameter and silently lost.
            query += '&' + requested.query

        url = urllib.parse.urlunparse(
            (base.scheme, base.netloc, requested.path, base.params, query, base.fragment)
        )
        return super().request(method, url, **kwargs)
|
0 |
|
-
|