Projet

Général

Profil

0001-workflow-make-it-possible-to-sign-webservice-calls-6.patch

Frédéric Péters, 25 février 2015 14:27

Télécharger (5,21 ko)

Voir les différences:

Subject: [PATCH 1/2] workflow: make it possible to sign webservice calls
 (#6446)

The sign_* functions have been imported from cmsplugin_blurp.
 tests/test_api.py | 17 +++++++++++++++++
 wcs/api.py        | 29 +++++++++++++++++++++++++++++
 wcs/wf/wscall.py  | 16 ++++++++++++++--
 3 files changed, 60 insertions(+), 2 deletions(-)
tests/test_api.py
11 11
from wcs.formdef import FormDef
12 12
from wcs.categories import Category
13 13
from wcs import fields
14
from wcs.api import sign_url
14 15

  
15 16
from utilities import get_app, create_temporary_pub
16 17

  
......
118 119
    output = get_app(pub).get('/user?%s&signature=%s' % (query, signature))
119 120
    assert output.json['user_display_name'] == u'Jean Darmette'
120 121

  
122
def test_sign_url():
    """Check that URLs produced by sign_url() are verified by the API.

    A URL signed with the key '1234' must be accepted and return the user
    details; the same URL signed with '12345' must be rejected with a 403.
    """
    # NOTE(review): '1234' is presumably the key configured for the 'coucou'
    # orig in the test publisher -- confirm against the fixture setup.
    host = 'http://example.net'
    target = host + '/user?format=json&orig=coucou&email=%s' % urllib.quote(user.email)

    # Valid key: strip the scheme and host so the test app receives a path.
    accepted = sign_url(target, '1234')
    output = get_app(pub).get(accepted[len(host):])
    assert output.json['user_display_name'] == u'Jean Darmette'

    # Wrong key: the request has to be refused.
    rejected = sign_url(target, '12345')
    output = get_app(pub).get(rejected[len(host):], status=403)
137

  
121 138
def test_formdef_list():
122 139
    FormDef.wipe()
123 140
    formdef = FormDef()
wcs/api.py
18 18
import hmac
19 19
import hashlib
20 20
import datetime
21
import urllib
21 22
import urllib2
23
import urlparse
24
import random
22 25

  
23 26
from quixote import get_request, get_publisher, get_response
24 27
from quixote.directory import Directory
......
105 108

  
106 109
    return user
107 110

  
111
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
    """Return *url* with its query string replaced by a signed version.

    The URL is split into its components, the query component is passed to
    sign_query() together with key, algo, timestamp and nonce, and the URL
    is reassembled with all other components left unchanged.
    """
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    signed_query = sign_query(query, key, algo, timestamp, nonce)
    return urlparse.urlunparse(
            (scheme, netloc, path, params, signed_query, fragment))
115

  
116
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
    """Return *query* extended with signature parameters.

    The parameters algo, timestamp and nonce are appended to the query
    string, then a signature parameter holding the URL-quoted, base64
    encoded HMAC of the whole resulting string (see sign_string()).

    query     -- existing query string, possibly empty
    key       -- secret used to compute the HMAC
    algo      -- name of the hashlib digest to use
    timestamp -- datetime to embed; defaults to the current UTC time
    nonce     -- string to embed; defaults to a random 128-bit hex value
    """
    if timestamp is None:
        timestamp = datetime.datetime.utcnow()
    timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
    if nonce is None:
        # '%x' gives the bare hexadecimal digits whatever the integer type,
        # where slicing hex() output relied on the Python 2 long 'L' suffix
        # and would otherwise drop a digit.  SystemRandom draws from the OS
        # entropy source so that nonces are not predictable.
        nonce = '%x' % random.SystemRandom().getrandbits(128)
    new_query = query
    if new_query:
        new_query += '&'
    new_query += urllib.urlencode((
        ('algo', algo),
        ('timestamp', timestamp),
        ('nonce', nonce)))
    # The signature covers the original query and the three parameters
    # above, and is appended last.
    signature = base64.b64encode(sign_string(new_query, key, algo=algo))
    new_query += '&signature=' + urllib.quote(signature)
    return new_query
132

  
133
def sign_string(s, key, algo='sha256', timedelta=30):
    """Return the raw HMAC digest of *s* computed with *key*.

    s         -- message to sign
    key       -- secret key
    algo      -- name of the hashlib constructor used as digest
    timedelta -- not used by this function; kept so that existing callers
                 passing it keep working

    Text (unicode) values given for s or key are encoded as UTF-8 first, as
    the HMAC is computed over bytes.  Raises AttributeError when hashlib has
    no attribute named *algo*.
    """
    text_type = type(u'')
    if isinstance(key, text_type):
        key = key.encode('utf-8')
    if isinstance(s, text_type):
        s = s.encode('utf-8')
    digestmod = getattr(hashlib, algo)
    return hmac.HMAC(key, digestmod=digestmod, msg=s).digest()
108 137

  
109 138
class ApiDirectory(Directory):
110 139
    _q_exports = [('reverse-geocoding', 'reverse_geocoding')]
wcs/wf/wscall.py
18 18

  
19 19
from qommon.form import *
20 20
from qommon.misc import http_get_page, http_post_request, get_variadic_url
21
from wcs.workflows import WorkflowStatusItem, register_item_class
21
from wcs.workflows import WorkflowStatusItem, register_item_class, template_on_formdata
22
from wcs.api import sign_url
22 23

  
23 24
TIMEOUT = 15
24 25

  
......
30 31
    url = None
31 32
    varname = None
32 33
    post = True
34
    request_signature_key = None
33 35

  
34 36
    def get_parameters(self):
35
        return ('url', 'post', 'varname')
37
        return ('url', 'post', 'varname', 'request_signature_key')
36 38

  
37 39
    def add_parameters_widgets(self, form, parameters, prefix='', formdef=None):
38 40
        if 'url' in parameters:
......
46 48
        if 'varname' in parameters:
47 49
            form.add(VarnameWidget, '%svarname' % prefix,
48 50
                     title=_('Variable Name'), value=self.varname)
51
        # Only add the widget when this parameter was requested, like the
        # other parameters handled in this method; a bare string literal is
        # always true and would add the widget unconditionally.
        if 'request_signature_key' in parameters:
            form.add(StringWidget, '%srequest_signature_key' % prefix,
                    title=_('Request Signature Key'),
                    value=self.request_signature_key)
49 55

  
50 56
    def perform(self, formdata):
51 57
        if not self.url:
......
55 61
        if '[' in url:
56 62
            variables = get_publisher().substitutions.get_context_variables()
57 63
            url = get_variadic_url(url, variables)
64

  
65
        if self.request_signature_key:
66
            signature_key = template_on_formdata(formdata, self.request_signature_key)
67
            if signature_key:
68
                url = sign_url(url, signature_key)
69

  
58 70
        headers = {'Content-type': 'application/json',
59 71
                'Accept': 'application/json'}
60 72
        if self.post:
61
-