From ad5fd157346ab58758e011607562cbdfc83a8bb0 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Fri, 16 Sep 2016 11:32:54 +0200 Subject: [PATCH] api: check signature's nonce (#10923) It's mandatory to prevent replays. Request is marked as valid after the first check so that multiple call to is_url_signed() on the same request do not fail. Tests had to be modified so they don't use the same signed URL for their web service calls. --- tests/test_api.py | 74 +++++++++++++++++++++++++++++++++++----------- wcs/api_utils.py | 51 ++++++++++++++++++++++++++------ wcs/qommon/http_request.py | 2 ++ wcs/qommon/publisher.py | 22 ++++++++++++++ 4 files changed, 122 insertions(+), 27 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 9be1db2..9a8c8ff 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -10,6 +10,7 @@ import urlparse import datetime import time import json +import sys from quixote import cleanup, get_publisher from wcs.qommon.http_request import HTTPRequest @@ -23,7 +24,8 @@ from wcs.data_sources import NamedDataSource from wcs.workflows import Workflow, EditableWorkflowStatusItem, WorkflowBackofficeFieldsFormDef from wcs.wf.jump import JumpWorkflowStatusItem from wcs import fields, qommon -from wcs.api_utils import sign_url, get_secret_and_orig +from wcs.api_utils import sign_url, get_secret_and_orig, is_url_signed, DEFAULT_DURATION +from wcs.qommon.errors import AccessForbiddenError from utilities import get_app, create_temporary_pub, clean_temporary_pub @@ -248,6 +250,39 @@ def test_get_user(pub, local_user): assert [x['name'] for x in output.json['user_roles']] == ['Foo bar'] assert [x['slug'] for x in output.json['user_roles']] == ['foo-bar'] + +def test_is_url_signed_check_nonce(pub, local_user): + ORIG = 'xxx' + KEY = 'xxx' + + pub.site_options.add_section('api-secrets') + pub.site_options.set('api-secrets', ORIG, KEY) + # test clean_nonces do not bark when nonces directory is empty + if os.path.exists(os.path.join(pub.app_dir, 'nonces')): + shutil.rmtree(os.path.join(pub.app_dir, 'nonces')) + pub.clean_nonces() + signed_url = sign_url('?format=json&orig=%s&email=%s' + % (ORIG, urllib.quote(local_user.email)), KEY) + req = HTTPRequest(None, {'SCRIPT_NAME': '/', + 'SERVER_NAME': 'example.net', + 'QUERY_STRING': signed_url[1:]}) + req.process_inputs() + pub.set_app_dir(req) + pub._set_request(req) + + assert is_url_signed() + with pytest.raises(AccessForbiddenError) as exc_info: + req.signed = False + is_url_signed() + assert exc_info.value.public_msg == 'nonce already used' + # test that clean nonces works + assert os.listdir(os.path.join(pub.app_dir, 'nonces')) + pub.clean_nonces(delta=0) + assert os.listdir(os.path.join(pub.app_dir, 'nonces')) + pub.clean_nonces(delta=0, now=time.time() + 2 * DEFAULT_DURATION) + assert not os.listdir(os.path.join(pub.app_dir, 'nonces')) + + def test_get_user_compat_endpoint(pub, local_user): signed_url = sign_url( 'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), @@ -417,10 +452,11 @@ def test_formdef_submit(pub, local_user): assert resp.json['err'] == 1 assert resp.json['err_desc'] == 'unsigned API call' - signed_url = sign_url('http://example.net/api/formdefs/test/submit' + - '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234') - url = signed_url[len('http://example.net'):] - resp = get_app(pub).post_json(url, {'data': {}}) + def url(): + signed_url = sign_url('http://example.net/api/formdefs/test/submit' + + '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234') + return signed_url[len('http://example.net'):] + resp = get_app(pub).post_json(url(), {'data': {}}) assert resp.json['err'] == 0 assert data_class.get(resp.json['data']['id']).status == 'wf-new' assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id) @@ -428,19 +464,19 @@ def test_formdef_submit(pub, local_user): formdef.disabled = True formdef.store() - resp = get_app(pub).post_json(url, {'data': {}}, status=403) + resp = get_app(pub).post_json(url(), {'data': {}}, status=403) assert resp.json['err'] == 1 assert resp.json['err_desc'] == 'disabled form' formdef.disabled = False formdef.store() - resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}, status=403) + resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403) formdef.backoffice_submission_roles = ['xx'] formdef.store() - resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}, status=403) + resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403) formdef.backoffice_submission_roles = [role.id] formdef.store() - resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}) + resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}) assert data_class.get(resp.json['data']['id']).status == 'wf-new' assert data_class.get(resp.json['data']['id']).backoffice_submission is True assert data_class.get(resp.json['data']['id']).user_id is None @@ -448,13 +484,13 @@ def test_formdef_submit(pub, local_user): formdef.enable_tracking_codes = True formdef.store() - resp = get_app(pub).post_json(url, {'data': {}}) + resp = get_app(pub).post_json(url(), {'data': {}}) assert data_class.get(resp.json['data']['id']).tracking_code - resp = get_app(pub).post_json(url, {'meta': {'draft': True}, 'data': {}}) + resp = get_app(pub).post_json(url(), {'meta': {'draft': True}, 'data': {}}) assert data_class.get(resp.json['data']['id']).status == 'draft' - resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}, + resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}, 'context': {'channel': 'mail', 'comments': 'blah'} }) assert data_class.get(resp.json['data']['id']).status == 'wf-new' assert data_class.get(resp.json['data']['id']).backoffice_submission is True @@ -480,15 +516,17 @@ def test_formdef_submit_only_one(pub, local_user): formdef.store() data_class = formdef.data_class() - signed_url = sign_url('http://example.net/api/formdefs/test/submit' + - '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234') - url = signed_url[len('http://example.net'):] - resp = get_app(pub).post_json(url, {'data': {}}) + def url(): + signed_url = sign_url('http://example.net/api/formdefs/test/submit' + + '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), + '1234') + return signed_url[len('http://example.net'):] + resp = get_app(pub).post_json(url(), {'data': {}}) assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id) assert data_class.count() == 1 - resp = get_app(pub).post_json(url, {'data': {}}, status=403) + resp = get_app(pub).post_json(url(), {'data': {}}, status=403) assert resp.json['err'] == 1 assert resp.json['err_desc'] == 'only one formdata by user is allowed' @@ -496,7 +534,7 @@ def test_formdef_submit_only_one(pub, local_user): formdata.user_id = '1000' # change owner formdata.store() - resp = get_app(pub).post_json(url, {'data': {}}, status=200) + resp = get_app(pub).post_json(url(), {'data': {}}, status=200) assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id) assert data_class.count() == 2 diff --git a/wcs/api_utils.py b/wcs/api_utils.py index 1b00505..d5b61c8 100644 --- a/wcs/api_utils.py +++ b/wcs/api_utils.py @@ -21,12 +21,20 @@ import datetime import urllib import urlparse import random +import os +import errno +import calendar from quixote import get_request, get_publisher from qommon.errors import (AccessForbiddenError, UnknownNameIdAccessForbiddenError) +import qommon.misc +DEFAULT_DURATION = 30 -def is_url_signed(): + +def is_url_signed(utcnow=None, duration=DEFAULT_DURATION): + if get_request().signed: + return True query_string = get_request().get_query() if not query_string: return False @@ -54,14 +62,39 @@ def is_url_signed(): if not isinstance(timestamp, basestring): raise AccessForbiddenError('missing/multiple timestamp field') try: - delta = (datetime.datetime.utcnow().replace(tzinfo=None) - - datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')) - except ValueError: - raise AccessForbiddenError('invalid timestamp field') - MAX_DELTA = 30 - if abs(delta) > datetime.timedelta(seconds=MAX_DELTA): - raise AccessForbiddenError('timestamp delta is more than %s seconds: %s seconds' - % (MAX_DELTA, delta)) + timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ') + except ValueError, e: + raise AccessForbiddenError('invalid timestamp field; %s' % e) + delta = (utcnow or datetime.datetime.utcnow()).replace(tzinfo=None) - timestamp + if abs(delta) > datetime.timedelta(seconds=duration): + raise AccessForbiddenError('timestamp delta is more than %s seconds: %s' + % (duration, delta)) + # check nonce + nonce = get_request().form.get('nonce') + if nonce is not None: + # compute end date of the nonce as an unix timestamp + until = timestamp + datetime.timedelta(seconds=duration) + until = calendar.timegm(until.timetuple()) + # normalize nonce + nonce = nonce[:128] + nonce = qommon.misc.simplify(nonce).replace('/', '-') + nonce_dir = os.path.join(get_publisher().app_dir, 'nonces') + nonce_path = os.path.join(nonce_dir, nonce) + try: + try: + if not os.path.exists(nonce_dir): + os.makedirs(nonce_dir) + except OSError, e: + if e.errno != errno.EEXIST: + raise + os.open(nonce_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY) + # touch file in the future for cleaning_nonces + os.utime(nonce_path, (until, until)) + except OSError, e: + if e.errno != errno.EEXIST: + raise + raise AccessForbiddenError('nonce already used') + get_request().signed = True return True diff --git a/wcs/qommon/http_request.py b/wcs/qommon/http_request.py index 2cc4591..9efda02 100644 --- a/wcs/qommon/http_request.py +++ b/wcs/qommon/http_request.py @@ -24,6 +24,8 @@ from quixote.errors import RequestError from http_response import HTTPResponse class HTTPRequest(quixote.http_request.HTTPRequest): + signed = False + def __init__(self, *args, **kwargs): quixote.http_request.HTTPRequest.__init__(self, *args, **kwargs) self.response = HTTPResponse() diff --git a/wcs/qommon/publisher.py b/wcs/qommon/publisher.py index 8a131e8..1f952d7 100644 --- a/wcs/qommon/publisher.py +++ b/wcs/qommon/publisher.py @@ -688,6 +688,27 @@ class QommonPublisher(Publisher, object): cls.cronjobs = [] cls.cronjobs.append(cronjob) + def clean_nonces(self, delta=60, now=None): + nonce_dir = os.path.join(get_publisher().app_dir, 'nonces') + now = now or time.time() + if not os.path.exists(nonce_dir): + return + cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_nonces.lock') + fd = os.open(cleaning_lock_file, os.O_RDONLY | os.O_CREAT, 0666) + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError: + # lock is currently held, that is fine. + return + try: + for nonce in os.listdir(nonce_dir): + nonce_path = os.path.join(nonce_dir, nonce) + # we need delta so that os.utime in is_url_signed has time to update file timestamp + if delta < now - os.stat(nonce_path)[8]: + os.unlink(nonce_path) + finally: + os.close(fd) + def clean_sessions(self): cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_sessions.lock') fd = os.open(cleaning_lock_file, os.O_RDONLY | os.O_CREAT, 0666) @@ -754,6 +775,7 @@ class QommonPublisher(Publisher, object): @classmethod def register_cronjobs(cls): cls.register_cronjob(CronJob(cls.clean_sessions, minutes=range(0, 60, 5))) + cls.register_cronjob(CronJob(cls.clean_nonces, minutes=range(0, 60, 5))) cls.register_cronjob(CronJob(cls.clean_afterjobs, minutes=[random.randint(0, 59)])) cls.register_cronjob(CronJob(cls.clean_tempfiles, minutes=[random.randint(0, 59)])) -- 2.1.4