0001-api-accept-HTTP-Basic-authentication-scheme-for-API-.patch
tests/api/test_carddef.py | ||
---|---|---|
262 | 262 |
assert resp.json['err_desc'] == 'unsufficient roles' |
263 | 263 | |
264 | 264 | |
265 |
def test_cards_http_auth_access(pub, local_user): |
|
266 |
pub.role_class.wipe() |
|
267 |
role = pub.role_class(name='test') |
|
268 |
role.store() |
|
269 | ||
270 |
CardDef.wipe() |
|
271 |
carddef = CardDef() |
|
272 |
carddef.name = 'test' |
|
273 |
carddef.fields = [fields.StringField(id='0', label='foobar', varname='foo')] |
|
274 |
carddef.workflow_roles = {'_viewer': role.id} |
|
275 |
carddef.store() |
|
276 | ||
277 |
carddef.data_class().wipe() |
|
278 |
formdata = carddef.data_class()() |
|
279 |
formdata.data = {'0': 'blah'} |
|
280 |
formdata.just_created() |
|
281 |
formdata.store() |
|
282 | ||
283 |
access = ApiAccess() |
|
284 |
access.name = 'test' |
|
285 |
access.access_identifier = 'test' |
|
286 |
access.access_key = '12345' |
|
287 |
access.store() |
|
288 | ||
289 |
app = get_app(pub) |
|
290 |
app.set_authorization(('Basic', ('test', '12345'))) |
|
291 | ||
292 |
# no role restrictions, no admin |
|
293 |
resp = app.get('/api/cards/test/list', status=403) |
|
294 | ||
295 |
# restricted to the correct role, get it |
|
296 |
access.roles = [role] |
|
297 |
access.store() |
|
298 |
resp = app.get('/api/cards/test/list') |
|
299 |
assert len(resp.json['data']) == 1 |
|
300 | ||
301 |
# restricted to another role, do not get it |
|
302 |
role2 = pub.role_class(name='second') |
|
303 |
role2.store() |
|
304 |
access.roles = [role2] |
|
305 |
access.store() |
|
306 |
resp = app.get('/api/cards/test/list', status=403) |
|
307 |
assert resp.json['err_desc'] == 'unsufficient roles' |
|
308 | ||
309 | ||
265 | 310 |
def test_post_invalid_json(pub, local_user): |
266 | 311 |
resp = get_app(pub).post( |
267 | 312 |
'/api/cards/test/submit', params='not a json payload', content_type='application/json', status=400 |
tests/api/test_workflow.py | ||
---|---|---|
319 | 319 |
assert formdef.data_class().get(formdata.id).evolution[-1].who is None |
320 | 320 | |
321 | 321 | |
322 |
def test_workflow_trigger_http_auth_access(pub, local_user): |
|
323 |
pub.role_class.wipe() |
|
324 |
role = pub.role_class(name='xxx') |
|
325 |
role.store() |
|
326 |
role2 = pub.role_class(name='xxx2') |
|
327 |
role2.store() |
|
328 | ||
329 |
workflow = Workflow(name='test') |
|
330 |
st1 = workflow.add_status('Status1', 'st1') |
|
331 |
jump = JumpWorkflowStatusItem() |
|
332 |
jump.trigger = 'XXX' |
|
333 |
jump.status = 'st2' |
|
334 |
st1.items.append(jump) |
|
335 |
jump.parent = st1 |
|
336 |
workflow.add_status('Status2', 'st2') |
|
337 |
workflow.store() |
|
338 | ||
339 |
FormDef.wipe() |
|
340 |
formdef = FormDef() |
|
341 |
formdef.name = 'test' |
|
342 |
formdef.fields = [] |
|
343 |
formdef.workflow_id = workflow.id |
|
344 |
formdef.store() |
|
345 | ||
346 |
formdef.data_class().wipe() |
|
347 |
formdata = formdef.data_class()() |
|
348 |
formdata.just_created() |
|
349 |
formdata.store() |
|
350 | ||
351 |
jump.by = [role.id] |
|
352 |
workflow.store() |
|
353 | ||
354 |
access = ApiAccess() |
|
355 |
access.name = 'test' |
|
356 |
access.access_identifier = 'test' |
|
357 |
access.access_key = '12345' |
|
358 |
access.roles = [role2] |
|
359 |
access.store() |
|
360 | ||
361 |
app = get_app(pub) |
|
362 |
app.set_authorization(('Basic', ('test', '12345'))) |
|
363 |
app.post(formdata.get_url() + 'jump/trigger/XXX/', status=403) |
|
364 |
assert formdef.data_class().get(formdata.id).status == 'wf-st1' # no change |
|
365 | ||
366 |
access.roles = [role] |
|
367 |
access.store() |
|
368 | ||
369 |
app.post(formdata.get_url() + 'jump/trigger/XXX/', headers={'accept': 'application/json'}, status=200) |
|
370 |
assert formdef.data_class().get(formdata.id).status == 'wf-st2' |
|
371 |
assert formdef.data_class().get(formdata.id).evolution[-1].who is None |
|
372 | ||
373 | ||
322 | 374 |
def test_workflow_global_webservice_trigger(pub, local_user): |
323 | 375 |
workflow = Workflow(name='test') |
324 | 376 |
workflow.add_status('Status1', 'st1') |
wcs/api_access.py | ||
---|---|---|
94 | 94 |
user = RestrictedApiUser() |
95 | 95 |
user.roles = [x.id for x in self.get_roles()] |
96 | 96 |
return user |
97 | ||
98 |
@classmethod |
|
99 |
def get_with_credentials(cls, username, password): |
|
100 |
api_access = cls.get_by_identifier(username) |
|
101 |
if not api_access or api_access.access_key != password: |
|
102 |
raise KeyError |
|
103 |
return api_access.get_as_api_user() |
wcs/api_utils.py | ||
---|---|---|
123 | 123 |
def get_user_from_api_query_string(api_name=None): |
124 | 124 |
# check signature or auth header |
125 | 125 |
if not is_url_signed(): |
126 |
user = getattr(get_request(), 'user', None) |
|
127 |
if user and user.is_api_user: |
|
128 |
return user |
|
126 | 129 |
if api_name: |
127 | 130 |
check_http_basic_auth(api_name) |
128 | 131 |
else: |
wcs/qommon/http_request.py | ||
---|---|---|
61 | 61 |
# padding or invalid base64-encoded string). |
62 | 62 |
self._user = None |
63 | 63 |
return |
64 | ||
65 |
from wcs.api_access import ApiAccess |
|
66 | ||
64 | 67 |
from .ident.password_accounts import PasswordAccount |
65 | 68 | |
66 | 69 |
try: |
67 | 70 |
self._user = PasswordAccount.get_with_credentials(username, password) |
68 | 71 |
except KeyError: |
69 |
self._user = None |
|
72 |
try: |
|
73 |
self._user = ApiAccess.get_with_credentials(username, password) |
|
74 |
except KeyError: |
|
75 |
self._user = None |
|
76 | ||
70 | 77 |
return |
71 | 78 | |
72 | 79 |
try: |
73 |
- |