Projet

Général

Profil

0001-api-add-basic-auth-support-to-forms-submit-55858.patch

Lauréline Guérin, 23 septembre 2021 10:10

Télécharger (12,1 ko)

Voir les différences:

Subject: [PATCH] api: add basic auth support to forms submit (#55858)

 tests/api/test_formdef.py | 112 +++++++++++++++++++++++++++-----------
 wcs/api.py                |  37 +++++++------
 2 files changed, 100 insertions(+), 49 deletions(-)
tests/api/test_formdef.py
5 5
import os
6 6
import time
7 7
import urllib.parse
8
from functools import partial
8 9
from unittest import mock
9 10

  
10 11
import pytest
......
12 13
from quixote import get_publisher
13 14

  
14 15
from wcs import fields, qommon
16
from wcs.api_access import ApiAccess
15 17
from wcs.api_utils import sign_url
16 18
from wcs.categories import Category
17 19
from wcs.data_sources import NamedDataSource
......
66 68
    return user
67 69

  
68 70

  
71
def _post_url(url, app, auth, access, user, **kwargs):
72
    if auth == 'http-basic':
73
        app.set_authorization(('Basic', ('test', '12345')))
74
        return app.post(url, **kwargs)
75
    else:
76
        return app.post(
77
            sign_uri(url, user=user, orig=access.access_identifier, key=access.access_key), **kwargs
78
        )
79

  
80

  
81
def _post_json_url(url, app, auth, access, user, **kwargs):
82
    if auth == 'http-basic':
83
        app.set_authorization(('Basic', ('test', '12345')))
84
        return app.post_json(url, **kwargs)
85
    else:
86
        return app.post_json(
87
            sign_uri(url, user=user, orig=access.access_identifier, key=access.access_key), **kwargs
88
        )
89

  
90

  
91
@pytest.fixture
92
def access(pub):
93
    ApiAccess.wipe()
94
    access = ApiAccess()
95
    access.name = 'test'
96
    access.access_identifier = 'test'
97
    access.access_key = '12345'
98
    access.store()
99
    return access
100

  
101

  
69 102
def test_formdef_list(pub):
70 103
    pub.role_class.wipe()
71 104
    role = pub.role_class(name='Foo bar')
......
481 514
    assert resp.json['err_class'] == 'Invalid request'
482 515

  
483 516

  
484
def test_formdef_submit(pub, local_user):
517
@pytest.mark.parametrize('auth', ['signature', 'http-basic'])
518
def test_formdef_submit(pub, local_user, access, auth):
485 519
    pub.role_class.wipe()
486 520
    role = pub.role_class(name='test')
487 521
    role.store()
......
492 526
    formdef = FormDef()
493 527
    formdef.name = 'test'
494 528
    formdef.fields = [fields.StringField(id='0', label='foobar')]
529
    formdef.backoffice_submission_roles = [role.id]
495 530
    formdef.store()
496 531
    data_class = formdef.data_class()
497 532

  
498
    resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403)
499
    assert resp.json['err'] == 1
500
    assert resp.json['err_desc'] == 'unsigned API call'
533
    app = get_app(pub)
534
    post_url = partial(_post_url, app=app, auth=auth, access=access, user=local_user)
535
    post_json_url = partial(_post_json_url, app=app, auth=auth, access=access, user=local_user)
501 536

  
502
    def url():
503
        signed_url = sign_url(
504
            'http://example.net/api/formdefs/test/submit'
505
            + '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
506
            '1234',
507
        )
508
        return signed_url[len('http://example.net') :]
537
    if auth == 'http-basic':
538
        resp = post_json_url('/api/formdefs/test/submit', params={'data': {}}, status=403)
539
        access.roles = [role]
540
        access.store()
509 541

  
510
    resp = get_app(pub).post_json(url(), {'data': {}})
542
    resp = post_json_url('/api/formdefs/test/submit', params={'data': {}})
511 543
    assert resp.json['err'] == 0
512 544
    assert resp.json['data']['url'] == ('http://example.net/test/%s/' % resp.json['data']['id'])
513 545
    assert resp.json['data']['backoffice_url'] == (
......
515 547
    )
516 548
    assert resp.json['data']['api_url'] == ('http://example.net/api/forms/test/%s/' % resp.json['data']['id'])
517 549
    assert data_class.get(resp.json['data']['id']).status == 'wf-new'
518
    assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
550
    if auth == 'signature':
551
        assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
552
    else:
553
        assert data_class.get(resp.json['data']['id']).user_id is None
519 554
    assert data_class.get(resp.json['data']['id']).tracking_code is None
520 555

  
521 556
    local_user2 = get_publisher().user_class()
522 557
    local_user2.name = 'Test'
523 558
    local_user2.email = 'foo@localhost'
524 559
    local_user2.store()
525
    resp = get_app(pub).post_json(url(), {'data': {}, 'user': {'NameID': [], 'email': local_user2.email}})
560
    resp = post_json_url(
561
        '/api/formdefs/test/submit', params={'data': {}, 'user': {'NameID': [], 'email': local_user2.email}}
562
    )
526 563
    assert data_class.get(resp.json['data']['id']).user.email == local_user2.email
527 564

  
528
    resp = get_app(pub).post(
529
        url(), json.dumps({'data': {}}), status=400
565
    resp = post_url(
566
        '/api/formdefs/test/submit', params=json.dumps({'data': {}}), status=400
530 567
    )  # missing Content-Type: application/json header
531 568
    assert resp.json['err_desc'] == 'expected JSON but missing appropriate content-type'
532 569

  
533 570
    # check qualified content type are recognized
534
    resp = get_app(pub).post(url(), json.dumps({'data': {}}), content_type='application/json; charset=utf-8')
571
    resp = post_url(
572
        '/api/formdefs/test/submit',
573
        params=json.dumps({'data': {}}),
574
        content_type='application/json; charset=utf-8',
575
    )
535 576
    assert resp.json['data']['url']
536 577

  
537 578
    formdef.disabled = True
538 579
    formdef.store()
539
    resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
580
    resp = post_json_url('/api/formdefs/test/submit', params={'data': {}}, status=403)
540 581
    assert resp.json['err'] == 1
541 582
    assert resp.json['err_desc'] == 'disabled form'
542 583

  
543 584
    formdef.disabled = False
585
    formdef.backoffice_submission_roles = []
544 586
    formdef.store()
545
    resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
587
    resp = post_json_url(
588
        '/api/formdefs/test/submit', params={'meta': {'backoffice-submission': True}, 'data': {}}, status=403
589
    )
546 590
    formdef.backoffice_submission_roles = ['xx']
547 591
    formdef.store()
548
    resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}}, status=403)
592
    resp = post_json_url(
593
        '/api/formdefs/test/submit', params={'meta': {'backoffice-submission': True}, 'data': {}}, status=403
594
    )
549 595
    formdef.backoffice_submission_roles = [role.id]
550 596
    formdef.store()
551
    resp = get_app(pub).post_json(url(), {'meta': {'backoffice-submission': True}, 'data': {}})
597
    if auth == 'http-basic':
598
        access.roles = [role]
599
        access.store()
600
    resp = post_json_url(
601
        '/api/formdefs/test/submit', params={'meta': {'backoffice-submission': True}, 'data': {}}
602
    )
552 603
    assert data_class.get(resp.json['data']['id']).status == 'wf-new'
553 604
    assert data_class.get(resp.json['data']['id']).backoffice_submission is True
554 605
    assert data_class.get(resp.json['data']['id']).user_id is None
555
    assert data_class.get(resp.json['data']['id']).submission_agent_id == str(local_user.id)
606
    if auth == 'signature':
607
        assert data_class.get(resp.json['data']['id']).submission_agent_id == str(local_user.id)
608
    else:
609
        assert data_class.get(resp.json['data']['id']).submission_agent_id is None
556 610

  
557 611
    formdef.enable_tracking_codes = True
558 612
    formdef.store()
559
    resp = get_app(pub).post_json(url(), {'data': {}})
613
    resp = post_json_url('/api/formdefs/test/submit', params={'data': {}})
560 614
    assert data_class.get(resp.json['data']['id']).tracking_code
561 615

  
562
    resp = get_app(pub).post_json(url(), {'meta': {'draft': True}, 'data': {}})
616
    resp = post_json_url('/api/formdefs/test/submit', params={'meta': {'draft': True}, 'data': {}})
563 617
    assert data_class.get(resp.json['data']['id']).status == 'draft'
564 618

  
565
    resp = get_app(pub).post_json(
566
        url(),
567
        {
619
    resp = post_json_url(
620
        '/api/formdefs/test/submit',
621
        params={
568 622
            'meta': {'backoffice-submission': True},
569 623
            'data': {},
570 624
            'context': {'channel': 'mail', 'comments': 'blah'},
......
642 696
    formdef.store()
643 697
    data_class = formdef.data_class()
644 698

  
645
    resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403)
646
    assert resp.json['err'] == 1
647
    assert resp.json['err_desc'] == 'unsigned API call'
648

  
649 699
    signed_url = sign_url(
650 700
        'http://example.net/api/formdefs/test/submit'
651 701
        + '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
wcs/api.py
525 525
        get_response().set_content_type('application/json')
526 526
        if self.formdef.is_disabled():
527 527
            raise AccessForbiddenError('disabled form')
528
        if not is_url_signed():
529
            raise AccessForbiddenError('unsigned API call')
530 528
        user = get_user_from_api_query_string()
529
        if user and user.is_api_user:
530
            pass  # API users are ok
531 531
        json_input = get_request().json
532 532
        formdata = self.formdef.data_class()()
533 533

  
......
549 549
        formdata.data = posted_json_data_to_formdata_data(self.formdef, data)
550 550

  
551 551
        meta = json_input.get('meta') or {}
552
        if meta.get('backoffice-submission'):
552
        if meta.get('backoffice-submission') or user and user.is_api_user:
553 553
            if not user:
554 554
                raise AccessForbiddenError('no user set for backoffice submission')
555 555
            if not self.formdef.backoffice_submission_roles:
......
557 557
            if not set(user.get_roles()).intersection(self.formdef.backoffice_submission_roles):
558 558
                raise AccessForbiddenError('not cleared for backoffice submit')
559 559
            formdata.backoffice_submission = True
560
        elif 'user' in json_input:
561
            formdata_user = None
562
            for name_id in json_input['user'].get('NameID') or []:
563
                formdata_user = get_publisher().user_class.get_users_with_name_identifier(name_id)
560
        if not meta.get('backoffice-submission'):
561
            if 'user' in json_input:
562
                formdata_user = None
563
                for name_id in json_input['user'].get('NameID') or []:
564
                    formdata_user = get_publisher().user_class.get_users_with_name_identifier(name_id)
565
                    if formdata_user:
566
                        break
567
                else:
568
                    if json_input['user'].get('email'):
569
                        formdata_user = get_publisher().user_class.get_users_with_email(
570
                            json_input['user'].get('email')
571
                        )
564 572
                if formdata_user:
565
                    break
566
            else:
567
                if json_input['user'].get('email'):
568
                    formdata_user = get_publisher().user_class.get_users_with_email(
569
                        json_input['user'].get('email')
570
                    )
571
            if formdata_user:
572
                formdata.user_id = formdata_user[0].id
573
        elif user and not user.is_api_user:
574
            formdata.user_id = user.id
573
                    formdata.user_id = formdata_user[0].id
574
            elif user and not user.is_api_user:
575
                formdata.user_id = user.id
575 576

  
576 577
        if json_input.get('context'):
577 578
            formdata.submission_context = json_input['context']
......
584 585
            if [x for x in user_forms if not x.is_draft()]:
585 586
                raise AccessForbiddenError('only one formdata by user is allowed')
586 587

  
587
        if meta.get('backoffice-submission'):
588
        if meta.get('backoffice-submission') and not user.is_api_user:
588 589
            # keep track of the agent that did the submit
589 590
            formdata.submission_agent_id = str(user.id)
590 591

  
591
-