10 |
10 |
import datetime
|
11 |
11 |
import time
|
12 |
12 |
import json
|
|
13 |
import sys
|
13 |
14 |
|
14 |
15 |
from quixote import cleanup, get_publisher
|
15 |
16 |
from wcs.qommon.http_request import HTTPRequest
|
... | ... | |
23 |
24 |
from wcs.workflows import Workflow, EditableWorkflowStatusItem, WorkflowBackofficeFieldsFormDef
|
24 |
25 |
from wcs.wf.jump import JumpWorkflowStatusItem
|
25 |
26 |
from wcs import fields, qommon
|
26 |
|
from wcs.api_utils import sign_url, get_secret_and_orig
|
|
27 |
from wcs.api_utils import sign_url, get_secret_and_orig, is_url_signed, DEFAULT_DURATION
|
|
28 |
from wcs.qommon.errors import AccessForbiddenError
|
27 |
29 |
|
28 |
30 |
from utilities import get_app, create_temporary_pub, clean_temporary_pub
|
29 |
31 |
|
... | ... | |
248 |
250 |
assert [x['name'] for x in output.json['user_roles']] == ['Foo bar']
|
249 |
251 |
assert [x['slug'] for x in output.json['user_roles']] == ['foo-bar']
|
250 |
252 |
|
|
253 |
|
|
254 |
def test_is_url_signed_check_nonce(pub, local_user):
    """Check nonce handling of signed URLs.

    The same signed query string is validated twice: the first call to
    ``is_url_signed()`` must succeed, the second must raise
    ``AccessForbiddenError`` with the public message 'nonce already used'.
    The test then checks that ``pub.clean_nonces()`` keeps the stored nonce
    when called with ``delta=0`` at the current time, and removes it once
    ``now`` is pushed ``2 * DEFAULT_DURATION`` seconds ahead.
    """
    ORIG = 'xxx'
    KEY = 'xxx'

    pub.site_options.add_section('api-secrets')
    pub.site_options.set('api-secrets', ORIG, KEY)
    # test clean_nonces do not bark when nonces directory is empty
    if os.path.exists(os.path.join(pub.app_dir, 'nonces')):
        shutil.rmtree(os.path.join(pub.app_dir, 'nonces'))
    pub.clean_nonces()
    signed_url = sign_url(
        '?format=json&orig=%s&email=%s' % (ORIG, urllib.quote(local_user.email)),
        KEY)
    # signed_url starts with '?'; QUERY_STRING must not include it
    req = HTTPRequest(None, {'SCRIPT_NAME': '/',
                             'SERVER_NAME': 'example.net',
                             'QUERY_STRING': signed_url[1:]})
    req.process_inputs()
    pub.set_app_dir(req)
    pub._set_request(req)

    assert is_url_signed()
    # Bind the raised exception through pytest instead of reading the
    # deprecated, Python 2-only ``sys.exc_value`` global.
    with pytest.raises(AccessForbiddenError) as excinfo:
        # NOTE(review): presumably resets a verification result cached on the
        # request so that the signature is checked again -- confirm.
        req.signed = False
        is_url_signed()
    assert excinfo.value.public_msg == 'nonce already used'
    # test that clean nonces works
    # (path is recomputed here because set_app_dir() was called above)
    nonces_dir = os.path.join(pub.app_dir, 'nonces')
    assert os.listdir(nonces_dir)
    pub.clean_nonces(delta=0)
    assert os.listdir(nonces_dir)
    pub.clean_nonces(delta=0, now=time.time() + 2 * DEFAULT_DURATION)
    assert not os.listdir(nonces_dir)
|
|
284 |
|
|
285 |
|
251 |
286 |
def test_get_user_compat_endpoint(pub, local_user):
|
252 |
287 |
signed_url = sign_url(
|
253 |
288 |
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
... | ... | |
409 |
444 |
assert resp.json['err'] == 1
|
410 |
445 |
assert resp.json['err_desc'] == 'unsigned API call'
|
411 |
446 |
|
412 |
|
signed_url = sign_url('http://example.net/api/formdefs/test/submit' +
|
413 |
|
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234')
|
414 |
|
url = signed_url[len('http://example.net'):]
|
415 |
|
resp = get_app(pub).post_json(url, {'data': {}})
|
|
447 |
def url():
    """Return a newly signed submit URL with the scheme and host removed.

    Every call runs ``sign_url`` again, so each request gets its own
    signature.
    NOTE(review): presumably needed because a signed URL carries a
    single-use nonce -- confirm against ``sign_url``.
    """
    query = '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email)
    absolute = sign_url('http://example.net/api/formdefs/test/submit' + query, '1234')
    prefix_length = len('http://example.net')
    return absolute[prefix_length:]
|
|
451 |
resp = get_app(pub).post_json(url(), {'data': {}})
|
416 |
452 |
assert resp.json['err'] == 0
|
417 |
453 |
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
|
418 |
454 |
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
... | ... | |
420 |
456 |
|
421 |
457 |
formdef.disabled = True
|
422 |
458 |
formdef.store()
|
423 |
|
resp = get_app(pub).post_json(url, {'data': {}}, status=403)
|
|
459 |
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
|
424 |
460 |
assert resp.json['err'] == 1
|
425 |
461 |
assert resp.json['err_desc'] == 'disabled form'
|
426 |
462 |
|
427 |
463 |
formdef.disabled = False
|
428 |
464 |
formdef.store()
|
429 |
|
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
|
465 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
430 |
466 |
formdef.backoffice_submission_roles = ['xx']
|
431 |
467 |
formdef.store()
|
432 |
|
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
|
468 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
|
433 |
469 |
formdef.backoffice_submission_roles = [role.id]
|
434 |
470 |
formdef.store()
|
435 |
|
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {}})
|
|
471 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}})
|
436 |
472 |
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
|
437 |
473 |
assert data_class.get(resp.json['data']['id']).backoffice_submission is True
|
438 |
474 |
assert data_class.get(resp.json['data']['id']).user_id is None
|
... | ... | |
440 |
476 |
|
441 |
477 |
formdef.enable_tracking_codes = True
|
442 |
478 |
formdef.store()
|
443 |
|
resp = get_app(pub).post_json(url, {'data': {}})
|
|
479 |
resp = get_app(pub).post_json(url(), {'data': {}})
|
444 |
480 |
assert data_class.get(resp.json['data']['id']).tracking_code
|
445 |
481 |
|
446 |
|
resp = get_app(pub).post_json(url, {'meta': {'draft': True}, 'data': {}})
|
|
482 |
resp = get_app(pub).post_json(url(), {'meta': {'draft': True}, 'data': {}})
|
447 |
483 |
assert data_class.get(resp.json['data']['id']).status == 'draft'
|
448 |
484 |
|
449 |
|
resp = get_app(pub).post_json(url, {'meta': {'backoffice-submission': True}, 'data': {},
|
|
485 |
resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {},
|
450 |
486 |
'context': {'channel': 'mail', 'comments': 'blah'} })
|
451 |
487 |
assert data_class.get(resp.json['data']['id']).status == 'wf-new'
|
452 |
488 |
assert data_class.get(resp.json['data']['id']).backoffice_submission is True
|
... | ... | |
472 |
508 |
formdef.store()
|
473 |
509 |
data_class = formdef.data_class()
|
474 |
510 |
|
475 |
|
signed_url = sign_url('http://example.net/api/formdefs/test/submit' +
|
476 |
|
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234')
|
477 |
|
url = signed_url[len('http://example.net'):]
|
478 |
|
resp = get_app(pub).post_json(url, {'data': {}})
|
|
511 |
def url():
    """Build the submit endpoint URL for ``local_user``, sign it, and
    return it without the leading 'http://example.net'.

    The URL is signed again on each call rather than reused.
    """
    quoted_email = urllib.quote(local_user.email)
    unsigned = ('http://example.net/api/formdefs/test/submit' +
                '?format=json&orig=coucou&email=%s' % quoted_email)
    signed = sign_url(unsigned, '1234')
    return signed[len('http://example.net'):]
|
|
516 |
resp = get_app(pub).post_json(url(), {'data': {}})
|
479 |
517 |
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
480 |
518 |
|
481 |
519 |
assert data_class.count() == 1
|
482 |
520 |
|
483 |
|
resp = get_app(pub).post_json(url, {'data': {}}, status=403)
|
|
521 |
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
|
484 |
522 |
assert resp.json['err'] == 1
|
485 |
523 |
assert resp.json['err_desc'] == 'only one formdata by user is allowed'
|
486 |
524 |
|
... | ... | |
488 |
526 |
formdata.user_id = '1000' # change owner
|
489 |
527 |
formdata.store()
|
490 |
528 |
|
491 |
|
resp = get_app(pub).post_json(url, {'data': {}}, status=200)
|
|
529 |
resp = get_app(pub).post_json(url(), {'data': {}}, status=200)
|
492 |
530 |
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
493 |
531 |
assert data_class.count() == 2
|
494 |
532 |
|