Projet

Général

Profil

0002-api-add-roles-based-access-restrictions-48752.patch

Frédéric Péters, 03 mai 2021 09:54

Télécharger (10,7 ko)

Voir les différences:

Subject: [PATCH 2/2] api: add roles-based access restrictions (#48752)

 tests/api/test_carddef.py  | 44 ++++++++++++++++++++++++++++++
 tests/api/test_user.py     | 32 ++++++++++++++++++++++
 tests/api/test_workflow.py | 56 ++++++++++++++++++++++++++++++++++++++
 wcs/api.py                 |  9 ++++--
 wcs/api_access.py          | 14 ++++++++++
 wcs/api_utils.py           | 12 ++++++--
 wcs/users.py               |  1 +
 7 files changed, 164 insertions(+), 4 deletions(-)
tests/api/test_carddef.py
13 13
from quixote import get_publisher
14 14

  
15 15
from wcs import fields, qommon
16
from wcs.api_access import ApiAccess
16 17
from wcs.api_utils import sign_url
17 18
from wcs.carddef import CardDef
18 19
from wcs.categories import CardDefCategory
......
218 219
    assert resp.json['data']['creation_time'] <= resp.json['data']['completion_time']
219 220

  
220 221

  
222
def test_cards_restricted_api(pub, local_user):
223
    pub.role_class.wipe()
224
    role = pub.role_class(name='test')
225
    role.store()
226

  
227
    CardDef.wipe()
228
    carddef = CardDef()
229
    carddef.name = 'test'
230
    carddef.fields = [fields.StringField(id='0', label='foobar', varname='foo')]
231
    carddef.workflow_roles = {'_viewer': role.id}
232
    carddef.store()
233

  
234
    carddef.data_class().wipe()
235
    formdata = carddef.data_class()()
236
    formdata.data = {'0': 'blah'}
237
    formdata.just_created()
238
    formdata.store()
239

  
240
    access = ApiAccess()
241
    access.name = 'test'
242
    access.access_identifier = 'test'
243
    access.access_key = '12345'
244
    access.store()
245

  
246
    # no role restrictions, get it
247
    resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345'))
248
    assert len(resp.json['data']) == 1
249

  
250
    # restricted to the correct role, get it
251
    access.roles = [role]
252
    access.store()
253
    resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345'))
254
    assert len(resp.json['data']) == 1
255

  
256
    # restricted to another role, do not get it
257
    role2 = pub.role_class(name='second')
258
    role2.store()
259
    access.roles = [role2]
260
    access.store()
261
    resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345'), status=403)
262
    assert resp.json['err_desc'] == 'unsufficient roles'
263

  
264

  
221 265
def test_post_invalid_json(pub, local_user):
222 266
    resp = get_app(pub).post(
223 267
        '/api/cards/test/submit', params='not a json payload', content_type='application/json', status=400
tests/api/test_user.py
8 8

  
9 9
from wcs import fields
10 10
from wcs.admin.settings import UserFieldsFormDef
11
from wcs.api_access import ApiAccess
11 12
from wcs.categories import Category
12 13
from wcs.formdef import FormDef
13 14
from wcs.qommon.http_request import HTTPRequest
......
335 336
    assert resp2.json['data'][0] == resp.json['data'][1]
336 337
    assert resp2.json['data'][1] == resp.json['data'][0]
337 338

  
339
    # check there is no access with roles-limited API users
340
    role = pub.role_class(name='test')
341
    role.store()
342

  
343
    access = ApiAccess()
344
    access.name = 'test'
345
    access.access_identifier = 'test'
346
    access.access_key = '12345'
347
    access.roles = [role]
348
    access.store()
349

  
350
    resp = get_app(pub).get(sign_uri('/api/user/forms', orig='test', key='12345'), status=403)
351
    assert resp.json['err'] == 1
352
    assert resp.json['err_desc'] == 'restricted API access'
353

  
354

  
355
def test_user_api_with_restricted_access(pub):
356
    role = pub.role_class(name='test')
357
    role.store()
358

  
359
    access = ApiAccess()
360
    access.name = 'test'
361
    access.access_identifier = 'test'
362
    access.access_key = '12345'
363
    access.roles = [role]
364
    access.store()
365

  
366
    resp = get_app(pub).get(sign_uri('/api/user/', orig='test', key='12345'), status=403)
367
    assert resp.json['err'] == 1
368
    assert resp.json['err_desc'] == 'restricted API access'
369

  
338 370

  
339 371
def test_user_forms_limit_offset(pub, local_user):
340 372
    if not pub.is_using_postgresql():
tests/api/test_workflow.py
6 6
from quixote import get_publisher
7 7

  
8 8
from wcs import fields
9
from wcs.api_access import ApiAccess
9 10
from wcs.formdef import FormDef
10 11
from wcs.qommon.http_request import HTTPRequest
11 12
from wcs.qommon.ident.password_accounts import PasswordAccount
......
101 102

  
102 103
    get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'), status=200)
103 104
    assert formdef.data_class().get(formdata.id).status == 'wf-st2'
105
    assert formdef.data_class().get(formdata.id).evolution[-1].who is None
104 106

  
105 107
    # check with trailing slash
106 108
    formdata.store()  # reset
......
263 265
    assert formdef.data_class().get(formdata.id).status == 'wf-st3'
264 266

  
265 267

  
268
def test_workflow_trigger_api_access(pub, local_user):
269
    pub.role_class.wipe()
270
    role = pub.role_class(name='xxx')
271
    role.store()
272
    role2 = pub.role_class(name='xxx2')
273
    role2.store()
274

  
275
    workflow = Workflow(name='test')
276
    st1 = workflow.add_status('Status1', 'st1')
277
    jump = JumpWorkflowStatusItem()
278
    jump.trigger = 'XXX'
279
    jump.status = 'st2'
280
    st1.items.append(jump)
281
    jump.parent = st1
282
    workflow.add_status('Status2', 'st2')
283
    workflow.store()
284

  
285
    FormDef.wipe()
286
    formdef = FormDef()
287
    formdef.name = 'test'
288
    formdef.fields = []
289
    formdef.workflow_id = workflow.id
290
    formdef.store()
291

  
292
    formdef.data_class().wipe()
293
    formdata = formdef.data_class()()
294
    formdata.just_created()
295
    formdata.store()
296

  
297
    jump.by = [role.id]
298
    workflow.store()
299

  
300
    access = ApiAccess()
301
    access.name = 'test'
302
    access.access_identifier = 'test'
303
    access.access_key = '12345'
304
    access.roles = [role2]
305
    access.store()
306

  
307
    get_app(pub).post(
308
        sign_uri(formdata.get_url() + 'jump/trigger/XXX/', orig='test', key='12345'), status=403
309
    )
310
    assert formdef.data_class().get(formdata.id).status == 'wf-st1'  # no change
311

  
312
    access.roles = [role]
313
    access.store()
314

  
315
    get_app(pub).post(
316
        sign_uri(formdata.get_url() + 'jump/trigger/XXX/', orig='test', key='12345'), status=200
317
    )
318
    assert formdef.data_class().get(formdata.id).status == 'wf-st2'
319
    assert formdef.data_class().get(formdata.id).evolution[-1].who is None
320

  
321

  
266 322
def test_workflow_global_webservice_trigger(pub, local_user):
267 323
    workflow = Workflow(name='test')
268 324
    workflow.add_status('Status1', 'st1')
wcs/api.py
313 313
                    )
314 314
            if formdata_user:
315 315
                formdata.user_id = formdata_user[0].id
316
        else:
316
        elif user and not user.is_api_user:
317 317
            formdata.user_id = user.id
318 318

  
319 319
        formdata.store()
......
563 563
                    )
564 564
            if formdata_user:
565 565
                formdata.user_id = formdata_user[0].id
566
        elif user:
566
        elif user and not user.is_api_user:
567 567
            formdata.user_id = user.id
568

  
568 569
        if json_input.get('context'):
569 570
            formdata.submission_context = json_input['context']
570 571
            formdata.submission_channel = formdata.submission_context.pop('channel', None)
......
844 845
        user = self.user or get_user_from_api_query_string() or get_request().user
845 846
        if not user:
846 847
            raise AccessForbiddenError('no user specified')
848
        if user.is_api_user:
849
            raise AccessForbiddenError('restricted API access')
847 850
        user_info = user.get_substitution_variables(prefix='')
848 851
        del user_info['user']
849 852
        user_info['id'] = user.id
......
902 905
            return json.dumps({'err': 1, 'err_desc': 'unknown NameID', 'data': []})
903 906
        if not user:
904 907
            return json.dumps({'err': 1, 'err_desc': 'no user specified', 'data': []})
908
        if user.is_api_user:
909
            raise AccessForbiddenError('restricted API access')
905 910

  
906 911
        forms = self.get_user_forms(user)
907 912

  
wcs/api_access.py
80 80
                if role_name:
81 81
                    criterias.append(Equal('name', role_name))
82 82
        return get_publisher().role_class.select([Or(criterias)], order_by='name')
83

  
84
    def get_as_api_user(self):
85
        class RestrictedApiUser:
86
            # kept as inner class so cannot be pickled
87
            id = Ellipsis  # make sure it fails all over the place if used
88
            is_admin = False
89
            is_api_user = True
90

  
91
            def get_roles(self):
92
                return self.roles
93

  
94
        user = RestrictedApiUser()
95
        user.roles = [x.id for x in self.get_roles()]
96
        return user
wcs/api_utils.py
121 121

  
122 122

  
123 123
def get_user_from_api_query_string(api_name=None):
124
    # check signature or auth header
124 125
    if not is_url_signed():
125 126
        if api_name:
126 127
            check_http_basic_auth(api_name)
127 128
        else:
128 129
            return None
129
    # Signature or auth header are ok.
130
    # Look for the user, by email/NameID.
130

  
131
    # check access restriction defined in API access object
132
    orig = get_request().form.get('orig')
133
    if orig:
134
        api_access = ApiAccess.get_by_identifier(orig)
135
        if api_access and api_access.get_roles():
136
            return api_access.get_as_api_user()
137

  
138
    # get user reference from query string
131 139
    user = None
132 140
    if get_request().form.get('email'):
133 141
        email = get_request().form.get('email')
wcs/users.py
43 43
    deleted_timestamp = None
44 44

  
45 45
    last_seen = None
46
    is_api_user = False
46 47

  
47 48
    default_search_result_template = """{{ user_email|default:"" }}
48 49
{% if user_var_phone %} 📞 {{ user_var_phone }}{% endif %}
49
-