0001-workflow-make-it-possible-to-sign-webservice-calls-6.patch
tests/test_api.py | ||
---|---|---|
11 | 11 |
from wcs.formdef import FormDef |
12 | 12 |
from wcs.categories import Category |
13 | 13 |
from wcs import fields |
14 |
from wcs.api import sign_url |
|
14 | 15 | |
15 | 16 |
from utilities import get_app, create_temporary_pub |
16 | 17 | |
... | ... | |
118 | 119 |
output = get_app(pub).get('/user?%s&signature=%s' % (query, signature)) |
119 | 120 |
assert output.json['user_display_name'] == u'Jean Darmette' |
120 | 121 | |
122 |
def test_sign_url(): |
|
123 |
signed_url = sign_url( |
|
124 |
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(user.email), |
|
125 |
'1234' |
|
126 |
) |
|
127 |
url = signed_url[len('http://example.net'):] |
|
128 |
output = get_app(pub).get(url) |
|
129 |
assert output.json['user_display_name'] == u'Jean Darmette' |
|
130 | ||
131 |
signed_url = sign_url( |
|
132 |
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(user.email), |
|
133 |
'12345' |
|
134 |
) |
|
135 |
url = signed_url[len('http://example.net'):] |
|
136 |
output = get_app(pub).get(url, status=403) |
|
137 | ||
121 | 138 |
def test_formdef_list(): |
122 | 139 |
FormDef.wipe() |
123 | 140 |
formdef = FormDef() |
wcs/api.py | ||
---|---|---|
18 | 18 |
import hmac |
19 | 19 |
import hashlib |
20 | 20 |
import datetime |
21 |
import urllib |
|
21 | 22 |
import urllib2 |
23 |
import urlparse |
|
24 |
import random |
|
22 | 25 | |
23 | 26 |
from quixote import get_request, get_publisher, get_response |
24 | 27 |
from quixote.directory import Directory |
... | ... | |
105 | 108 | |
106 | 109 |
return user |
107 | 110 | |
111 |
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None): |
|
112 |
parsed = urlparse.urlparse(url) |
|
113 |
new_query = sign_query(parsed.query, key, algo, timestamp, nonce) |
|
114 |
return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:]) |
|
115 | ||
116 |
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None): |
|
117 |
if timestamp is None: |
|
118 |
timestamp = datetime.datetime.utcnow() |
|
119 |
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') |
|
120 |
if nonce is None: |
|
121 |
nonce = hex(random.getrandbits(128))[2:-1] |
|
122 |
new_query = query |
|
123 |
if new_query: |
|
124 |
new_query += '&' |
|
125 |
new_query += urllib.urlencode(( |
|
126 |
('algo', algo), |
|
127 |
('timestamp', timestamp), |
|
128 |
('nonce', nonce))) |
|
129 |
signature = base64.b64encode(sign_string(new_query, key, algo=algo)) |
|
130 |
new_query += '&signature=' + urllib.quote(signature) |
|
131 |
return new_query |
|
132 | ||
133 |
def sign_string(s, key, algo='sha256', timedelta=30): |
|
134 |
digestmod = getattr(hashlib, algo) |
|
135 |
hash = hmac.HMAC(key, digestmod=digestmod, msg=s) |
|
136 |
return hash.digest() |
|
108 | 137 | |
109 | 138 |
class ApiDirectory(Directory): |
110 | 139 |
_q_exports = [('reverse-geocoding', 'reverse_geocoding')] |
wcs/wf/wscall.py | ||
---|---|---|
18 | 18 | |
19 | 19 |
from qommon.form import * |
20 | 20 |
from qommon.misc import http_get_page, http_post_request, get_variadic_url |
21 |
from wcs.workflows import WorkflowStatusItem, register_item_class |
|
21 |
from wcs.workflows import WorkflowStatusItem, register_item_class, template_on_formdata |
|
22 |
from wcs.api import sign_url |
|
22 | 23 | |
23 | 24 |
TIMEOUT = 15 |
24 | 25 | |
... | ... | |
30 | 31 |
url = None |
31 | 32 |
varname = None |
32 | 33 |
post = True |
34 |
request_signature_key = None |
|
33 | 35 | |
34 | 36 |
def get_parameters(self): |
35 |
return ('url', 'post', 'varname') |
|
37 |
return ('url', 'post', 'varname', 'request_signature_key')
|
|
36 | 38 | |
37 | 39 |
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): |
38 | 40 |
if 'url' in parameters: |
... | ... | |
46 | 48 |
if 'varname' in parameters: |
47 | 49 |
form.add(VarnameWidget, '%svarname' % prefix, |
48 | 50 |
title=_('Variable Name'), value=self.varname) |
51 |
if 'request_signature_key': |
|
52 |
form.add(StringWidget, '%srequest_signature_key' % prefix, |
|
53 |
title=_('Request Signature Key'), |
|
54 |
value=self.request_signature_key) |
|
49 | 55 | |
50 | 56 |
def perform(self, formdata): |
51 | 57 |
if not self.url: |
... | ... | |
55 | 61 |
if '[' in url: |
56 | 62 |
variables = get_publisher().substitutions.get_context_variables() |
57 | 63 |
url = get_variadic_url(url, variables) |
64 | ||
65 |
if self.request_signature_key: |
|
66 |
signature_key = template_on_formdata(formdata, self.request_signature_key) |
|
67 |
if signature_key: |
|
68 |
url = sign_url(url, signature_key) |
|
69 | ||
58 | 70 |
headers = {'Content-type': 'application/json', |
59 | 71 |
'Accept': 'application/json'} |
60 | 72 |
if self.post: |
61 |
- |