Projet

Général

Profil

0002-api-add-roles-based-access-restrictions-48752.patch

Frédéric Péters, 27 avril 2021 07:56

Télécharger (10,4 ko)

Voir les différences:

Subject: [PATCH 2/2] api: add roles-based access restrictions (#48752)

 tests/api/test_carddef.py  | 44 ++++++++++++++++++++++++++++++
 tests/api/test_user.py     | 32 ++++++++++++++++++++++
 tests/api/test_workflow.py | 56 ++++++++++++++++++++++++++++++++++++++
 wcs/api.py                 |  9 ++++--
 wcs/api_access.py          | 13 +++++++++
 wcs/api_utils.py           | 12 ++++++--
 6 files changed, 162 insertions(+), 4 deletions(-)
tests/api/test_carddef.py
13 13
from quixote import get_publisher
14 14

  
15 15
from wcs import fields, qommon
16
from wcs.api_access import ApiAccess
16 17
from wcs.api_utils import sign_url
17 18
from wcs.carddef import CardDef
18 19
from wcs.categories import CardDefCategory
......
218 219
    assert resp.json['data']['creation_time'] <= resp.json['data']['completion_time']
219 220

  
220 221

  
222
def test_cards_retricted_api(pub, local_user):
223
    pub.role_class.wipe()
224
    role = pub.role_class(name='test')
225
    role.store()
226

  
227
    CardDef.wipe()
228
    carddef = CardDef()
229
    carddef.name = 'test'
230
    carddef.fields = [fields.StringField(id='0', label='foobar', varname='foo')]
231
    carddef.workflow_roles = {'_viewer': role.id}
232
    carddef.store()
233

  
234
    carddef.data_class().wipe()
235
    formdata = carddef.data_class()()
236
    formdata.data = {'0': 'blah'}
237
    formdata.just_created()
238
    formdata.store()
239

  
240
    access = ApiAccess()
241
    access.name = 'test'
242
    access.access_identifier = 'test'
243
    access.access_key = '12345'
244
    access.store()
245

  
246
    # no role restrictions, get it
247
    resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345'))
248
    assert len(resp.json['data']) == 1
249

  
250
    # restricted to the correct role, get it
251
    access.roles = [role]
252
    access.store()
253
    resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345'))
254
    assert len(resp.json['data']) == 1
255

  
256
    # restricted to another role, do not get it
257
    role2 = pub.role_class(name='second')
258
    role2.store()
259
    access.roles = [role2]
260
    access.store()
261
    resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345'), status=403)
262
    assert resp.json['err_desc'] == 'unsufficient roles'
263

  
264

  
221 265
def test_post_invalid_json(pub, local_user):
222 266
    resp = get_app(pub).post(
223 267
        '/api/cards/test/submit', params='not a json payload', content_type='application/json', status=400
tests/api/test_user.py
8 8

  
9 9
from wcs import fields
10 10
from wcs.admin.settings import UserFieldsFormDef
11
from wcs.api_access import ApiAccess
11 12
from wcs.formdef import FormDef
12 13
from wcs.qommon.http_request import HTTPRequest
13 14
from wcs.qommon.ident.password_accounts import PasswordAccount
......
334 335
    assert resp2.json['data'][0] == resp.json['data'][1]
335 336
    assert resp2.json['data'][1] == resp.json['data'][0]
336 337

  
338
    # check there is no access with roles-limited API users
339
    role = pub.role_class(name='test')
340
    role.store()
341

  
342
    access = ApiAccess()
343
    access.name = 'test'
344
    access.access_identifier = 'test'
345
    access.access_key = '12345'
346
    access.roles = [role]
347
    access.store()
348

  
349
    resp = get_app(pub).get(sign_uri('/api/user/forms', orig='test', key='12345'), status=403)
350
    assert resp.json['err'] == 1
351
    assert resp.json['err_desc'] == 'restricted API access'
352

  
353

  
354
def test_user_api_with_restricted_access(pub):
355
    role = pub.role_class(name='test')
356
    role.store()
357

  
358
    access = ApiAccess()
359
    access.name = 'test'
360
    access.access_identifier = 'test'
361
    access.access_key = '12345'
362
    access.roles = [role]
363
    access.store()
364

  
365
    resp = get_app(pub).get(sign_uri('/api/user/', orig='test', key='12345'), status=403)
366
    assert resp.json['err'] == 1
367
    assert resp.json['err_desc'] == 'restricted API access'
368

  
337 369

  
338 370
def test_user_forms_limit_offset(pub, local_user):
339 371
    if not pub.is_using_postgresql():
tests/api/test_workflow.py
6 6
from quixote import get_publisher
7 7

  
8 8
from wcs import fields
9
from wcs.api_access import ApiAccess
9 10
from wcs.formdef import FormDef
10 11
from wcs.qommon.http_request import HTTPRequest
11 12
from wcs.qommon.ident.password_accounts import PasswordAccount
......
101 102

  
102 103
    get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'), status=200)
103 104
    assert formdef.data_class().get(formdata.id).status == 'wf-st2'
105
    assert formdef.data_class().get(formdata.id).evolution[-1].who is None
104 106

  
105 107
    # check with trailing slash
106 108
    formdata.store()  # reset
......
263 265
    assert formdef.data_class().get(formdata.id).status == 'wf-st3'
264 266

  
265 267

  
268
def test_workflow_trigger_api_access(pub, local_user):
269
    pub.role_class.wipe()
270
    role = pub.role_class(name='xxx')
271
    role.store()
272
    role2 = pub.role_class(name='xxx2')
273
    role2.store()
274

  
275
    workflow = Workflow(name='test')
276
    st1 = workflow.add_status('Status1', 'st1')
277
    jump = JumpWorkflowStatusItem()
278
    jump.trigger = 'XXX'
279
    jump.status = 'st2'
280
    st1.items.append(jump)
281
    jump.parent = st1
282
    workflow.add_status('Status2', 'st2')
283
    workflow.store()
284

  
285
    FormDef.wipe()
286
    formdef = FormDef()
287
    formdef.name = 'test'
288
    formdef.fields = []
289
    formdef.workflow_id = workflow.id
290
    formdef.store()
291

  
292
    formdef.data_class().wipe()
293
    formdata = formdef.data_class()()
294
    formdata.just_created()
295
    formdata.store()
296

  
297
    jump.by = [role.id]
298
    workflow.store()
299

  
300
    access = ApiAccess()
301
    access.name = 'test'
302
    access.access_identifier = 'test'
303
    access.access_key = '12345'
304
    access.roles = [role2]
305
    access.store()
306

  
307
    get_app(pub).post(
308
        sign_uri(formdata.get_url() + 'jump/trigger/XXX/', orig='test', key='12345'), status=403
309
    )
310
    assert formdef.data_class().get(formdata.id).status == 'wf-st1'  # no change
311

  
312
    access.roles = [role]
313
    access.store()
314

  
315
    get_app(pub).post(
316
        sign_uri(formdata.get_url() + 'jump/trigger/XXX/', orig='test', key='12345'), status=200
317
    )
318
    assert formdef.data_class().get(formdata.id).status == 'wf-st2'
319
    assert formdef.data_class().get(formdata.id).evolution[-1].who is None
320

  
321

  
266 322
def test_workflow_global_webservice_trigger(pub, local_user):
267 323
    workflow = Workflow(name='test')
268 324
    workflow.add_status('Status1', 'st1')
wcs/api.py
313 313
                    )
314 314
            if formdata_user:
315 315
                formdata.user_id = formdata_user[0].id
316
        else:
316
        elif isinstance(user, get_publisher().user_class):
317 317
            formdata.user_id = user.id
318 318

  
319 319
        formdata.store()
......
557 557
                    )
558 558
            if formdata_user:
559 559
                formdata.user_id = formdata_user[0].id
560
        elif user:
560
        elif isinstance(user, get_publisher().user_class):
561 561
            formdata.user_id = user.id
562

  
562 563
        if json_input.get('context'):
563 564
            formdata.submission_context = json_input['context']
564 565
            formdata.submission_channel = formdata.submission_context.pop('channel', None)
......
834 835
        user = self.user or get_user_from_api_query_string() or get_request().user
835 836
        if not user:
836 837
            raise AccessForbiddenError('no user specified')
838
        if not isinstance(user, get_publisher().user_class):
839
            raise AccessForbiddenError('restricted API access')
837 840
        user_info = user.get_substitution_variables(prefix='')
838 841
        del user_info['user']
839 842
        user_info['id'] = user.id
......
882 885
            return json.dumps({'err': 1, 'err_desc': 'unknown NameID', 'data': []})
883 886
        if not user:
884 887
            return json.dumps({'err': 1, 'err_desc': 'no user specified', 'data': []})
888
        if not isinstance(user, get_publisher().user_class):
889
            raise AccessForbiddenError('restricted API access')
885 890

  
886 891
        forms = self.get_user_forms(user)
887 892

  
wcs/api_access.py
80 80
                if role_name:
81 81
                    criterias.append(Equal('name', role_name))
82 82
        return get_publisher().role_class.select([Or(criterias)], order_by='name')
83

  
84
    def get_as_api_user(self):
85
        class RestrictedApiUser:
86
            # kept as inner class so cannot be pickled
87
            id = Ellipsis  # make sure it fails all over the place if used
88
            is_admin = False
89

  
90
            def get_roles(self):
91
                return self.roles
92

  
93
        user = RestrictedApiUser()
94
        user.roles = [x.id for x in self.get_roles()]
95
        return user
wcs/api_utils.py
121 121

  
122 122

  
123 123
def get_user_from_api_query_string(api_name=None):
124
    # check signature or auth header
124 125
    if not is_url_signed():
125 126
        if api_name:
126 127
            check_http_basic_auth(api_name)
127 128
        else:
128 129
            return None
129
    # Signature or auth header are ok.
130
    # Look for the user, by email/NameID.
130

  
131
    # check access restriction defined in API access object
132
    orig = get_request().form.get('orig')
133
    if orig:
134
        api_access = ApiAccess.get_by_identifier(orig)
135
        if api_access and api_access.get_roles():
136
            return api_access.get_as_api_user()
137

  
138
    # get user reference from query string
131 139
    user = None
132 140
    if get_request().form.get('email'):
133 141
        email = get_request().form.get('email')
134
-