0002-api-add-roles-based-access-restrictions-48752.patch
tests/api/test_carddef.py | ||
---|---|---|
13 | 13 |
from quixote import get_publisher |
14 | 14 | |
15 | 15 |
from wcs import fields, qommon |
16 |
from wcs.api_access import ApiAccess |
|
16 | 17 |
from wcs.api_utils import sign_url |
17 | 18 |
from wcs.carddef import CardDef |
18 | 19 |
from wcs.categories import CardDefCategory |
... | ... | |
218 | 219 |
assert resp.json['data']['creation_time'] <= resp.json['data']['completion_time'] |
219 | 220 | |
220 | 221 | |
222 |
def test_cards_restricted_api(pub, local_user): |
|
223 |
pub.role_class.wipe() |
|
224 |
role = pub.role_class(name='test') |
|
225 |
role.store() |
|
226 | ||
227 |
CardDef.wipe() |
|
228 |
carddef = CardDef() |
|
229 |
carddef.name = 'test' |
|
230 |
carddef.fields = [fields.StringField(id='0', label='foobar', varname='foo')] |
|
231 |
carddef.workflow_roles = {'_viewer': role.id} |
|
232 |
carddef.store() |
|
233 | ||
234 |
carddef.data_class().wipe() |
|
235 |
formdata = carddef.data_class()() |
|
236 |
formdata.data = {'0': 'blah'} |
|
237 |
formdata.just_created() |
|
238 |
formdata.store() |
|
239 | ||
240 |
access = ApiAccess() |
|
241 |
access.name = 'test' |
|
242 |
access.access_identifier = 'test' |
|
243 |
access.access_key = '12345' |
|
244 |
access.store() |
|
245 | ||
246 |
# no role restrictions on the API access: the card is returned |
|
247 |
resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345')) |
|
248 |
assert len(resp.json['data']) == 1 |
|
249 | ||
250 |
# API access restricted to the viewer role: the card is still returned |
|
251 |
access.roles = [role] |
|
252 |
access.store() |
|
253 |
resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345')) |
|
254 |
assert len(resp.json['data']) == 1 |
|
255 | ||
256 |
# API access restricted to another role: the request is rejected (403) |
|
257 |
role2 = pub.role_class(name='second') |
|
258 |
role2.store() |
|
259 |
access.roles = [role2] |
|
260 |
access.store() |
|
261 |
resp = get_app(pub).get(sign_uri('/api/cards/test/list', orig='test', key='12345'), status=403) |
|
262 |
assert resp.json['err_desc'] == 'unsufficient roles' |
|
263 | ||
264 | ||
221 | 265 |
def test_post_invalid_json(pub, local_user): |
222 | 266 |
resp = get_app(pub).post( |
223 | 267 |
'/api/cards/test/submit', params='not a json payload', content_type='application/json', status=400 |
tests/api/test_user.py | ||
---|---|---|
8 | 8 | |
9 | 9 |
from wcs import fields |
10 | 10 |
from wcs.admin.settings import UserFieldsFormDef |
11 |
from wcs.api_access import ApiAccess |
|
11 | 12 |
from wcs.categories import Category |
12 | 13 |
from wcs.formdef import FormDef |
13 | 14 |
from wcs.qommon.http_request import HTTPRequest |
... | ... | |
335 | 336 |
assert resp2.json['data'][0] == resp.json['data'][1] |
336 | 337 |
assert resp2.json['data'][1] == resp.json['data'][0] |
337 | 338 | |
339 |
# check that role-restricted API accesses are denied access to user forms |
|
340 |
role = pub.role_class(name='test') |
|
341 |
role.store() |
|
342 | ||
343 |
access = ApiAccess() |
|
344 |
access.name = 'test' |
|
345 |
access.access_identifier = 'test' |
|
346 |
access.access_key = '12345' |
|
347 |
access.roles = [role] |
|
348 |
access.store() |
|
349 | ||
350 |
resp = get_app(pub).get(sign_uri('/api/user/forms', orig='test', key='12345'), status=403) |
|
351 |
assert resp.json['err'] == 1 |
|
352 |
assert resp.json['err_desc'] == 'restricted API access' |
|
353 | ||
354 | ||
355 |
def test_user_api_with_restricted_access(pub): |
|
356 |
role = pub.role_class(name='test') |
|
357 |
role.store() |
|
358 | ||
359 |
access = ApiAccess() |
|
360 |
access.name = 'test' |
|
361 |
access.access_identifier = 'test' |
|
362 |
access.access_key = '12345' |
|
363 |
access.roles = [role] |
|
364 |
access.store() |
|
365 | ||
366 |
resp = get_app(pub).get(sign_uri('/api/user/', orig='test', key='12345'), status=403) |
|
367 |
assert resp.json['err'] == 1 |
|
368 |
assert resp.json['err_desc'] == 'restricted API access' |
|
369 | ||
338 | 370 | |
339 | 371 |
def test_user_forms_limit_offset(pub, local_user): |
340 | 372 |
if not pub.is_using_postgresql(): |
tests/api/test_workflow.py | ||
---|---|---|
6 | 6 |
from quixote import get_publisher |
7 | 7 | |
8 | 8 |
from wcs import fields |
9 |
from wcs.api_access import ApiAccess |
|
9 | 10 |
from wcs.formdef import FormDef |
10 | 11 |
from wcs.qommon.http_request import HTTPRequest |
11 | 12 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
... | ... | |
101 | 102 | |
102 | 103 |
get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX'), status=200) |
103 | 104 |
assert formdef.data_class().get(formdata.id).status == 'wf-st2' |
105 |
assert formdef.data_class().get(formdata.id).evolution[-1].who is None |
|
104 | 106 | |
105 | 107 |
# check with trailing slash |
106 | 108 |
formdata.store() # reset |
... | ... | |
263 | 265 |
assert formdef.data_class().get(formdata.id).status == 'wf-st3' |
264 | 266 | |
265 | 267 | |
268 |
def test_workflow_trigger_api_access(pub, local_user): |
|
269 |
pub.role_class.wipe() |
|
270 |
role = pub.role_class(name='xxx') |
|
271 |
role.store() |
|
272 |
role2 = pub.role_class(name='xxx2') |
|
273 |
role2.store() |
|
274 | ||
275 |
workflow = Workflow(name='test') |
|
276 |
st1 = workflow.add_status('Status1', 'st1') |
|
277 |
jump = JumpWorkflowStatusItem() |
|
278 |
jump.trigger = 'XXX' |
|
279 |
jump.status = 'st2' |
|
280 |
st1.items.append(jump) |
|
281 |
jump.parent = st1 |
|
282 |
workflow.add_status('Status2', 'st2') |
|
283 |
workflow.store() |
|
284 | ||
285 |
FormDef.wipe() |
|
286 |
formdef = FormDef() |
|
287 |
formdef.name = 'test' |
|
288 |
formdef.fields = [] |
|
289 |
formdef.workflow_id = workflow.id |
|
290 |
formdef.store() |
|
291 | ||
292 |
formdef.data_class().wipe() |
|
293 |
formdata = formdef.data_class()() |
|
294 |
formdata.just_created() |
|
295 |
formdata.store() |
|
296 | ||
297 |
jump.by = [role.id] |
|
298 |
workflow.store() |
|
299 | ||
300 |
access = ApiAccess() |
|
301 |
access.name = 'test' |
|
302 |
access.access_identifier = 'test' |
|
303 |
access.access_key = '12345' |
|
304 |
access.roles = [role2] |
|
305 |
access.store() |
|
306 | ||
307 |
get_app(pub).post( |
|
308 |
sign_uri(formdata.get_url() + 'jump/trigger/XXX/', orig='test', key='12345'), status=403 |
|
309 |
) |
|
310 |
assert formdef.data_class().get(formdata.id).status == 'wf-st1' # no change |
|
311 | ||
312 |
access.roles = [role] |
|
313 |
access.store() |
|
314 | ||
315 |
get_app(pub).post( |
|
316 |
sign_uri(formdata.get_url() + 'jump/trigger/XXX/', orig='test', key='12345'), status=200 |
|
317 |
) |
|
318 |
assert formdef.data_class().get(formdata.id).status == 'wf-st2' |
|
319 |
assert formdef.data_class().get(formdata.id).evolution[-1].who is None |
|
320 | ||
321 | ||
266 | 322 |
def test_workflow_global_webservice_trigger(pub, local_user): |
267 | 323 |
workflow = Workflow(name='test') |
268 | 324 |
workflow.add_status('Status1', 'st1') |
wcs/api.py | ||
---|---|---|
313 | 313 |
) |
314 | 314 |
if formdata_user: |
315 | 315 |
formdata.user_id = formdata_user[0].id |
316 |
else:
|
|
316 |
elif user and not user.is_api_user:
|
|
317 | 317 |
formdata.user_id = user.id |
318 | 318 | |
319 | 319 |
formdata.store() |
... | ... | |
563 | 563 |
) |
564 | 564 |
if formdata_user: |
565 | 565 |
formdata.user_id = formdata_user[0].id |
566 |
elif user: |
|
566 |
elif user and not user.is_api_user:
|
|
567 | 567 |
formdata.user_id = user.id |
568 | ||
568 | 569 |
if json_input.get('context'): |
569 | 570 |
formdata.submission_context = json_input['context'] |
570 | 571 |
formdata.submission_channel = formdata.submission_context.pop('channel', None) |
... | ... | |
844 | 845 |
user = self.user or get_user_from_api_query_string() or get_request().user |
845 | 846 |
if not user: |
846 | 847 |
raise AccessForbiddenError('no user specified') |
848 |
if user.is_api_user: |
|
849 |
raise AccessForbiddenError('restricted API access') |
|
847 | 850 |
user_info = user.get_substitution_variables(prefix='') |
848 | 851 |
del user_info['user'] |
849 | 852 |
user_info['id'] = user.id |
... | ... | |
902 | 905 |
return json.dumps({'err': 1, 'err_desc': 'unknown NameID', 'data': []}) |
903 | 906 |
if not user: |
904 | 907 |
return json.dumps({'err': 1, 'err_desc': 'no user specified', 'data': []}) |
908 |
if user.is_api_user: |
|
909 |
raise AccessForbiddenError('restricted API access') |
|
905 | 910 | |
906 | 911 |
forms = self.get_user_forms(user) |
907 | 912 |
wcs/api_access.py | ||
---|---|---|
80 | 80 |
if role_name: |
81 | 81 |
criterias.append(Equal('name', role_name)) |
82 | 82 |
return get_publisher().role_class.select([Or(criterias)], order_by='name') |
83 | ||
84 |
def get_as_api_user(self): |
|
85 |
class RestrictedApiUser: |
|
86 |
# kept as an inner class so that its instances cannot be pickled |
|
87 |
id = Ellipsis # make sure it fails all over the place if used |
|
88 |
is_admin = False |
|
89 |
is_api_user = True |
|
90 | ||
91 |
def get_roles(self): |
|
92 |
return self.roles |
|
93 | ||
94 |
user = RestrictedApiUser() |
|
95 |
user.roles = [x.id for x in self.get_roles()] |
|
96 |
return user |
wcs/api_utils.py | ||
---|---|---|
121 | 121 | |
122 | 122 | |
123 | 123 |
def get_user_from_api_query_string(api_name=None): |
124 |
# check signature or auth header |
|
124 | 125 |
if not is_url_signed(): |
125 | 126 |
if api_name: |
126 | 127 |
check_http_basic_auth(api_name) |
127 | 128 |
else: |
128 | 129 |
return None |
129 |
# Signature or auth header are ok. |
|
130 |
# Look for the user, by email/NameID. |
|
130 | ||
131 |
# apply the access restrictions defined on the API access object, if any |
|
132 |
orig = get_request().form.get('orig') |
|
133 |
if orig: |
|
134 |
api_access = ApiAccess.get_by_identifier(orig) |
|
135 |
if api_access and api_access.get_roles(): |
|
136 |
return api_access.get_as_api_user() |
|
137 | ||
138 |
# get user reference from query string |
|
131 | 139 |
user = None |
132 | 140 |
if get_request().form.get('email'): |
133 | 141 |
email = get_request().form.get('email') |
wcs/users.py | ||
---|---|---|
43 | 43 |
deleted_timestamp = None |
44 | 44 | |
45 | 45 |
last_seen = None |
46 |
is_api_user = False |
|
46 | 47 | |
47 | 48 |
default_search_result_template = """{{ user_email|default:"" }} |
48 | 49 |
{% if user_var_phone %} 📞 {{ user_var_phone }}{% endif %} |
49 |
- |