0001-api-check-signature-s-nonce-10923.patch
tests/test_api.py | ||
---|---|---|
10 | 10 |
import datetime |
11 | 11 |
import time |
12 | 12 |
import json |
13 |
import sys |
|
13 | 14 | |
14 | 15 |
from quixote import cleanup, get_publisher |
15 | 16 |
from wcs.qommon.errors import AccessForbiddenError |
... | ... | |
260 | 261 |
with pytest.raises(AccessForbiddenError): |
261 | 262 |
test_is_url_signed(utcnow=datetime.datetime(1970, 1, 1, 0, 0, 31)) |
262 | 263 | |
264 |
def test_is_url_signed_check_nonce(pub, local_user):
    """Check that a signed URL is rejected when its nonce is replayed.

    Also checks that ``clean_nonces`` copes with a missing ``nonces``
    directory and removes nonce files once they have expired.
    """
    pub.site_options.add_section('api-secrets')
    pub.site_options.set('api-secrets', 'xxx', 'xxx')

    # clean_nonces must not fail when the nonces directory does not exist
    nonce_dir = os.path.join(pub.app_dir, 'nonces')
    if os.path.exists(nonce_dir):
        shutil.rmtree(nonce_dir)
    pub.clean_nonces()

    signed_url = sign_url(
        '?format=json&orig=xxx&email=%s' % urllib.quote(local_user.email),
        'xxx', duration=1)
    req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net',
                             'QUERY_STRING': signed_url[1:]})
    req.process_inputs()
    pub.set_app_dir(req)
    pub._set_request(req)

    # first use of the nonce is accepted
    assert is_url_signed()

    # is_url_signed() returns early when the request is already flagged as
    # signed; reset the flag so the signature (and nonce) is checked again
    req.signed = False
    with pytest.raises(AccessForbiddenError) as excinfo:
        is_url_signed()
    # inspect the captured exception after the block: statements placed after
    # the raising call inside the block would never run
    assert excinfo.value.public_msg == 'nonce already used'

    # the nonce has been recorded on disk
    assert os.listdir(os.path.join(pub.app_dir, 'nonces'))
    # the nonce file is stamped with its expiry time (signature timestamp plus
    # duration), so wait until that moment is in the past before cleaning
    time.sleep(3.)
    pub.clean_nonces(delta=0)
    assert not os.listdir(os.path.join(pub.app_dir, 'nonces'))
|
290 | ||
263 | 291 |
def test_get_user_compat_endpoint(pub, local_user): |
264 | 292 |
signed_url = sign_url( |
265 | 293 |
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), |
... | ... | |
415 | 443 |
assert resp.json['err'] == 1 |
416 | 444 |
assert resp.json['err_desc'] == 'unsigned API call' |
417 | 445 | |
418 |
signed_url = sign_url('http://example.net/api/formdefs/test/submit' + |
|
419 |
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234') |
|
420 |
url = signed_url[len('http://example.net'):] |
|
421 |
resp = get_app(pub).post_json(url, {'data': {}}) |
|
446 |
def url(): |
|
447 |
signed_url = sign_url('http://example.net/api/formdefs/test/submit' + |
|
448 |
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234') |
|
449 |
return signed_url[len('http://example.net'):] |
|
450 |
resp = get_app(pub).post_json(url(), {'data': {}}) |
|
422 | 451 |
assert resp.json['err'] == 0 |
423 | 452 |
assert data_class.get(resp.json['data']['id']).status == 'wf-new' |
424 | 453 |
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id) |
... | ... | |
426 | 455 | |
427 | 456 |
formdef.disabled = True |
428 | 457 |
formdef.store() |
429 |
resp = get_app(pub).post_json(url, {'data': {}}, status=403) |
|
458 |
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
|
|
430 | 459 |
assert resp.json['err'] == 1 |
431 | 460 |
assert resp.json['err_desc'] == 'disabled form' |
432 | 461 | |
433 | 462 |
formdef.disabled = False |
434 | 463 |
formdef.store() |
435 |
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}, status=403) |
|
464 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
|
436 | 465 |
formdef.backoffice_submission_roles = ['xx'] |
437 | 466 |
formdef.store() |
438 |
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}, status=403) |
|
467 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
|
439 | 468 |
formdef.backoffice_submission_roles = [role.id] |
440 | 469 |
formdef.store() |
441 |
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}) |
|
470 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}})
|
|
442 | 471 |
assert data_class.get(resp.json['data']['id']).status == 'wf-new' |
443 | 472 |
assert data_class.get(resp.json['data']['id']).backoffice_submission is True |
444 | 473 |
assert data_class.get(resp.json['data']['id']).user_id is None |
... | ... | |
446 | 475 | |
447 | 476 |
formdef.enable_tracking_codes = True |
448 | 477 |
formdef.store() |
449 |
resp = get_app(pub).post_json(url, {'data': {}}) |
|
478 |
resp = get_app(pub).post_json(url(), {'data': {}})
|
|
450 | 479 |
assert data_class.get(resp.json['data']['id']).tracking_code |
451 | 480 | |
452 |
resp = get_app(pub).post_json(url, {'meta': {'draft': True}, 'data': {}}) |
|
481 |
resp = get_app(pub).post_json(url(), {'meta': {'draft': True}, 'data': {}})
|
|
453 | 482 |
assert data_class.get(resp.json['data']['id']).status == 'draft' |
454 | 483 | |
455 |
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}, |
|
484 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {},
|
|
456 | 485 |
'context': {'channel': 'mail', 'comments': 'blah'} }) |
457 | 486 |
assert data_class.get(resp.json['data']['id']).status == 'wf-new' |
458 | 487 |
assert data_class.get(resp.json['data']['id']).backoffice_submission is True |
wcs/api_utils.py | ||
---|---|---|
21 | 21 |
import urllib |
22 | 22 |
import urlparse |
23 | 23 |
import random |
24 |
import os |
|
25 |
import errno |
|
26 |
import calendar |
|
24 | 27 | |
25 | 28 |
from quixote import get_request, get_publisher |
26 | 29 |
from qommon.errors import (AccessForbiddenError, UnknownNameIdAccessForbiddenError) |
30 |
import qommon.misc |
|
27 | 31 | |
28 | 32 |
DEFAULT_DURATION = 30 |
29 | 33 | |
30 | 34 | |
31 | 35 |
def is_url_signed(utcnow=None): |
36 |
if getattr(get_request(), 'signed', False): |
|
37 |
return True |
|
32 | 38 |
query_string = get_request().get_query() |
33 | 39 |
if not query_string: |
34 | 40 |
return False |
... | ... | |
67 | 73 |
if abs(delta) > datetime.timedelta(seconds=duration): |
68 | 74 |
raise AccessForbiddenError('timestamp delta is more than %s seconds: %s' |
69 | 75 |
% (duration, delta)) |
76 |
# check nonce |
|
77 |
nonce = get_request().form.get('nonce') |
|
78 |
if not nonce is None: |
|
79 |
# compute end date of the nonce as an unix timestamp |
|
80 |
until = timestamp + datetime.timedelta(seconds=duration) |
|
81 |
until = calendar.timegm(until.timetuple()) |
|
82 |
# normalize nonce |
|
83 |
nonce = nonce[:128] |
|
84 |
nonce = qommon.misc.simplify(nonce).replace('/', '-') |
|
85 |
nonce_dir = os.path.join(get_publisher().app_dir, 'nonces') |
|
86 |
nonce_path = os.path.join(nonce_dir, nonce) |
|
87 |
try: |
|
88 |
try: |
|
89 |
if not os.path.exists(nonce_dir): |
|
90 |
os.makedirs(nonce_dir) |
|
91 |
except OSError, e: |
|
92 |
if e.errno != errno.EEXIST: |
|
93 |
raise |
|
94 |
os.open(nonce_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY) |
|
95 |
# touch file in the future for cleaning_nonces |
|
96 |
os.utime(nonce_path, (until, until)) |
|
97 |
except OSError, e: |
|
98 |
if e.errno != errno.EEXIST: |
|
99 |
raise |
|
100 |
raise AccessForbiddenError('nonce already used') |
|
101 |
get_request().signed = True |
|
70 | 102 |
return True |
71 | 103 | |
72 | 104 |
wcs/qommon/publisher.py | ||
---|---|---|
679 | 679 |
cls.cronjobs = [] |
680 | 680 |
cls.cronjobs.append(cronjob) |
681 | 681 | |
682 |
def clean_nonces(self, delta=60):
    """Delete expired nonce files from the ``nonces`` directory.

    A nonce file is removed when its modification time lies more than
    ``delta`` seconds in the past.  Nothing is done when the directory does
    not exist, or when the cleaning lock is already held elsewhere.
    """
    nonce_dir = os.path.join(get_publisher().app_dir, 'nonces')
    now = time.time()
    if not os.path.exists(nonce_dir):
        return
    cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_nonces.lock')
    fd = os.open(cleaning_lock_file, os.O_RDONLY | os.O_CREAT, 0o666)
    try:
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # lock is currently held, that is fine; the enclosing finally
            # still closes our descriptor so it is not leaked.
            return
        for nonce in os.listdir(nonce_dir):
            nonce_path = os.path.join(nonce_dir, nonce)
            # we need delta so that os.utime in is_url_signed has time to
            # update the file timestamp; index 8 of the stat tuple is st_mtime
            if delta < now - os.stat(nonce_path)[8]:
                os.unlink(nonce_path)
    finally:
        # closing the descriptor also releases the flock taken on it
        os.close(fd)
|
702 | ||
682 | 703 |
def clean_sessions(self): |
683 | 704 |
cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_sessions.lock') |
684 | 705 |
fd = os.open(cleaning_lock_file, os.O_RDONLY | os.O_CREAT, 0666) |
... | ... | |
745 | 766 |
@classmethod |
746 | 767 |
def register_cronjobs(cls): |
747 | 768 |
cls.register_cronjob(CronJob(cls.clean_sessions, minutes=range(0, 60, 5))) |
769 |
cls.register_cronjob(CronJob(cls.clean_nonces, minutes=range(0, 60, 5))) |
|
748 | 770 |
cls.register_cronjob(CronJob(cls.clean_afterjobs, minutes=[random.randint(0, 59)])) |
749 | 771 |
cls.register_cronjob(CronJob(cls.clean_tempfiles, minutes=[random.randint(0, 59)])) |
750 | 772 | |
751 |
- |