10 |
10 |
import datetime
|
11 |
11 |
import time
|
12 |
12 |
import json
|
|
13 |
import sys
|
13 |
14 |
|
14 |
15 |
from quixote import cleanup, get_publisher
|
15 |
16 |
from wcs.qommon.http_request import HTTPRequest
|
... | ... | |
23 |
24 |
from wcs.workflows import Workflow, EditableWorkflowStatusItem, WorkflowBackofficeFieldsFormDef
|
24 |
25 |
from wcs.wf.jump import JumpWorkflowStatusItem
|
25 |
26 |
from wcs import fields, qommon
|
26 |
|
from wcs.api_utils import sign_url, get_secret_and_orig
|
|
27 |
from wcs.api_utils import sign_url, get_secret_and_orig, is_url_signed, DEFAULT_DURATION
|
|
28 |
from wcs.qommon.errors import AccessForbiddenError
|
27 |
29 |
|
28 |
30 |
from utilities import get_app, create_temporary_pub, clean_temporary_pub
|
29 |
31 |
|
... | ... | |
248 |
250 |
assert [x['name'] for x in output.json['user_roles']] == ['Foo bar']
|
249 |
251 |
assert [x['slug'] for x in output.json['user_roles']] == ['foo-bar']
|
250 |
252 |
|
|
253 |
|
|
254 |
def test_is_url_signed_check_nonce(pub, local_user):
    """Check single use of a signed URL and the cleanup of stored nonces.

    A signed query string is accepted by ``is_url_signed()`` the first time,
    rejected with 'nonce already used' the second time, and the entries kept
    in the ``nonces`` directory are only removed by ``pub.clean_nonces()``
    when it is called with a ``now`` far enough in the future.
    """
    orig = key = 'xxx'

    def nonces_dir():
        # recomputed on each use, as pub.set_app_dir() is called further down
        # and may change pub.app_dir
        return os.path.join(pub.app_dir, 'nonces')

    pub.site_options.add_section('api-secrets')
    pub.site_options.set('api-secrets', orig, key)

    # clean_nonces() must not fail when the nonces directory is missing
    if os.path.exists(nonces_dir()):
        shutil.rmtree(nonces_dir())
    pub.clean_nonces()

    query = '?format=json&orig=%s&email=%s' % (orig, urllib.quote(local_user.email))
    signed_url = sign_url(query, key)
    environ = {
        'SCRIPT_NAME': '/',
        'SERVER_NAME': 'example.net',
        # drop the leading '?' of the signed query string
        'QUERY_STRING': signed_url[1:],
    }
    req = HTTPRequest(None, environ)
    req.process_inputs()
    pub.set_app_dir(req)
    pub._set_request(req)

    # first check of the signature succeeds
    assert is_url_signed()

    # second check of the very same signature is refused
    with pytest.raises(AccessForbiddenError) as exc_info:
        # NOTE(review): presumably resets a flag cached on the request so the
        # signature is really checked again -- confirm against is_url_signed()
        req.signed = False
        is_url_signed()
    assert exc_info.value.public_msg == 'nonce already used'

    # a nonce has been recorded, and a cleanup done "now" keeps it
    assert os.listdir(nonces_dir())
    pub.clean_nonces(delta=0)
    assert os.listdir(nonces_dir())

    # a cleanup done as if 2 * DEFAULT_DURATION had elapsed removes it
    far_future = time.time() + 2 * DEFAULT_DURATION
    pub.clean_nonces(delta=0, now=far_future)
    assert not os.listdir(nonces_dir())
|
|
284 |
|
|
285 |
|
251 |
286 |
def test_get_user_compat_endpoint(pub, local_user):
|
252 |
287 |
signed_url = sign_url(
|
253 |
288 |
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
... | ... | |
417 |
452 |
assert resp.json['err'] == 1
|
418 |
453 |
assert resp.json['err_desc'] == 'unsigned API call'
|
419 |
454 |
|
420 |
|
signed_url = sign_url('http://example.net/api/formdefs/test/submit' +
|
421 |
|
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234')
|
422 |
|
url = signed_url[len('http://example.net'):]
|
423 |
|
resp = get_app(pub).post_json(url, {'data': {}})
|
|
455 |
def url():
    """Return a newly signed submit URL, stripped of its scheme and host.

    sign_url() is called again on every invocation.
    NOTE(review): presumably needed because a signature can only be used
    once -- confirm against is_url_signed().
    """
    query = '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email)
    full_url = sign_url('http://example.net/api/formdefs/test/submit' + query, '1234')
    return full_url[len('http://example.net'):]
|
|
459 |
resp = get_app(pub).post_json(url(), {'data': {}})
|
424 |
460 |
assert resp.json['err'] == 0
|
425 |
461 |
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
|
426 |
462 |
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
... | ... | |
428 |
464 |
|
429 |
465 |
formdef.disabled = True
|
430 |
466 |
formdef.store()
|
431 |
|
resp = get_app(pub).post_json(url, {'data': {}}, status=403)
|
|
467 |
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
|
432 |
468 |
assert resp.json['err'] == 1
|
433 |
469 |
assert resp.json['err_desc'] == 'disabled form'
|
434 |
470 |
|
435 |
471 |
formdef.disabled = False
|
436 |
472 |
formdef.store()
|
437 |
|
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
|
473 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
438 |
474 |
formdef.backoffice_submission_roles = ['xx']
|
439 |
475 |
formdef.store()
|
440 |
|
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
|
476 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
441 |
477 |
formdef.backoffice_submission_roles = [role.id]
|
442 |
478 |
formdef.store()
|
443 |
|
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}})
|
|
479 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}})
|
444 |
480 |
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
|
445 |
481 |
assert data_class.get(resp.json['data']['id']).backoffice_submission is True
|
446 |
482 |
assert data_class.get(resp.json['data']['id']).user_id is None
|
... | ... | |
448 |
484 |
|
449 |
485 |
formdef.enable_tracking_codes = True
|
450 |
486 |
formdef.store()
|
451 |
|
resp = get_app(pub).post_json(url, {'data': {}})
|
|
487 |
resp = get_app(pub).post_json(url(), {'data': {}})
|
452 |
488 |
assert data_class.get(resp.json['data']['id']).tracking_code
|
453 |
489 |
|
454 |
|
resp = get_app(pub).post_json(url, {'meta': {'draft': True}, 'data': {}})
|
|
490 |
resp = get_app(pub).post_json(url(), {'meta': {'draft': True}, 'data': {}})
|
455 |
491 |
assert data_class.get(resp.json['data']['id']).status == 'draft'
|
456 |
492 |
|
457 |
|
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {},
|
|
493 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {},
|
458 |
494 |
'context': {'channel': 'mail', 'comments': 'blah'} })
|
459 |
495 |
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
|
460 |
496 |
assert data_class.get(resp.json['data']['id']).backoffice_submission is True
|
... | ... | |
480 |
516 |
formdef.store()
|
481 |
517 |
data_class = formdef.data_class()
|
482 |
518 |
|
483 |
|
signed_url = sign_url('http://example.net/api/formdefs/test/submit' +
|
484 |
|
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234')
|
485 |
|
url = signed_url[len('http://example.net'):]
|
486 |
|
resp = get_app(pub).post_json(url, {'data': {}})
|
|
519 |
def url():
    # Build and sign the submit URL anew on each call, then keep only the
    # part after the scheme and host.
    # NOTE(review): presumably so that every request gets its own nonce --
    # confirm against sign_url()/is_url_signed().
    email = urllib.quote(local_user.email)
    return sign_url(
        'http://example.net/api/formdefs/test/submit'
        + '?format=json&orig=coucou&email=%s' % email,
        '1234')[len('http://example.net'):]
|
|
524 |
resp = get_app(pub).post_json(url(), {'data': {}})
|
487 |
525 |
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
488 |
526 |
|
489 |
527 |
assert data_class.count() == 1
|
490 |
528 |
|
491 |
|
resp = get_app(pub).post_json(url, {'data': {}}, status=403)
|
|
529 |
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
|
492 |
530 |
assert resp.json['err'] == 1
|
493 |
531 |
assert resp.json['err_desc'] == 'only one formdata by user is allowed'
|
494 |
532 |
|
... | ... | |
496 |
534 |
formdata.user_id = '1000' # change owner
|
497 |
535 |
formdata.store()
|
498 |
536 |
|
499 |
|
resp = get_app(pub).post_json(url, {'data': {}}, status=200)
|
|
537 |
resp = get_app(pub).post_json(url(), {'data': {}}, status=200)
|
500 |
538 |
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
501 |
539 |
assert data_class.count() == 2
|
502 |
540 |
|