1 |
1 |
# -*- coding: utf-8 -*-
|
2 |
2 |
|
|
3 |
import base64
|
|
4 |
import json
|
3 |
5 |
import os
|
|
6 |
import time
|
4 |
7 |
|
|
8 |
import mock
|
5 |
9 |
import pytest
|
|
10 |
from django.utils.encoding import force_text
|
|
11 |
from django.utils.six import StringIO
|
|
12 |
from django.utils.six.moves.urllib import parse as urllib
|
6 |
13 |
from quixote import get_publisher
|
7 |
14 |
from utilities import clean_temporary_pub, create_temporary_pub, get_app
|
8 |
15 |
|
9 |
|
from wcs import fields
|
|
16 |
from wcs import fields, qommon
|
|
17 |
from wcs.api_utils import sign_url
|
10 |
18 |
from wcs.carddef import CardDef
|
11 |
19 |
from wcs.categories import CardDefCategory
|
|
20 |
from wcs.data_sources import NamedDataSource
|
|
21 |
from wcs.qommon.form import PicklableUpload
|
12 |
22 |
from wcs.qommon.http_request import HTTPRequest
|
13 |
23 |
from wcs.roles import Role
|
|
24 |
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef
|
14 |
25 |
|
15 |
26 |
from .utils import sign_uri
|
16 |
27 |
|
... | ... | |
182 |
193 |
)
|
183 |
194 |
assert carddef.data_class().count() == 2
|
184 |
195 |
assert set([x.data['0'] for x in carddef.data_class().select()]) == {'first entry', 'second entry'}
|
|
196 |
|
|
197 |
|
|
198 |
def test_post_invalid_json(pub, local_user):
|
|
199 |
resp = get_app(pub).post(
|
|
200 |
'/api/cards/test/submit', params='not a json payload', content_type='application/json', status=400
|
|
201 |
)
|
|
202 |
assert resp.json['err'] == 1
|
|
203 |
assert resp.json['err_class'] == 'Invalid request'
|
|
204 |
|
|
205 |
|
|
206 |
def test_card_submit(pub, local_user):
|
|
207 |
Role.wipe()
|
|
208 |
role = Role(name='test')
|
|
209 |
role.store()
|
|
210 |
local_user.roles = [role.id]
|
|
211 |
local_user.store()
|
|
212 |
|
|
213 |
CardDef.wipe()
|
|
214 |
carddef = CardDef()
|
|
215 |
carddef.name = 'test'
|
|
216 |
carddef.fields = [fields.StringField(id='0', label='foobar')]
|
|
217 |
carddef.store()
|
|
218 |
|
|
219 |
data_class = carddef.data_class()
|
|
220 |
|
|
221 |
resp = get_app(pub).post_json('/api/cards/test/submit', {'data': {}}, status=403)
|
|
222 |
assert resp.json['err'] == 1
|
|
223 |
assert resp.json['err_desc'] == 'unsigned API call'
|
|
224 |
|
|
225 |
def url():
|
|
226 |
signed_url = sign_url(
|
|
227 |
'http://example.net/api/cards/test/submit'
|
|
228 |
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
|
229 |
'1234',
|
|
230 |
)
|
|
231 |
return signed_url[len('http://example.net') :]
|
|
232 |
|
|
233 |
resp = get_app(pub).post_json(url(), {'data': {}}, status=403)
|
|
234 |
assert resp.json['err'] == 1
|
|
235 |
assert resp.json['err_desc'] == 'cannot create card'
|
|
236 |
|
|
237 |
carddef.backoffice_submission_roles = [role.id]
|
|
238 |
carddef.store()
|
|
239 |
resp = get_app(pub).post_json(url(), {'data': {}})
|
|
240 |
assert resp.json['err'] == 0
|
|
241 |
assert resp.json['data']['url'] == (
|
|
242 |
'http://example.net/backoffice/data/test/%s/' % resp.json['data']['id']
|
|
243 |
)
|
|
244 |
assert resp.json['data']['backoffice_url'] == (
|
|
245 |
'http://example.net/backoffice/data/test/%s/' % resp.json['data']['id']
|
|
246 |
)
|
|
247 |
assert resp.json['data']['api_url'] == ('http://example.net/api/cards/test/%s/' % resp.json['data']['id'])
|
|
248 |
assert data_class.get(resp.json['data']['id']).status == 'wf-recorded'
|
|
249 |
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
|
250 |
assert data_class.get(resp.json['data']['id']).tracking_code is None
|
|
251 |
|
|
252 |
local_user2 = get_publisher().user_class()
|
|
253 |
local_user2.name = 'Test'
|
|
254 |
local_user2.email = 'foo@localhost'
|
|
255 |
local_user2.store()
|
|
256 |
resp = get_app(pub).post_json(url(), {'data': {}, 'user': {'NameID': [], 'email': local_user2.email}})
|
|
257 |
assert data_class.get(resp.json['data']['id']).user.email == local_user2.email
|
|
258 |
|
|
259 |
resp = get_app(pub).post(
|
|
260 |
url(), json.dumps({'data': {}}), status=400
|
|
261 |
) # missing Content-Type: application/json header
|
|
262 |
assert resp.json['err_desc'] == 'expected JSON but missing appropriate content-type'
|
|
263 |
|
|
264 |
# check qualified content type are recognized
|
|
265 |
resp = get_app(pub).post(url(), json.dumps({'data': {}}), content_type='application/json; charset=utf-8')
|
|
266 |
assert resp.json['data']['url']
|
|
267 |
|
|
268 |
|
|
269 |
def test_carddef_submit_with_varname(pub, local_user):
|
|
270 |
NamedDataSource.wipe()
|
|
271 |
data_source = NamedDataSource(name='foobar')
|
|
272 |
source = [{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}]
|
|
273 |
data_source.data_source = {'type': 'formula', 'value': repr(source)}
|
|
274 |
data_source.store()
|
|
275 |
|
|
276 |
data_source = NamedDataSource(name='foobar_jsonp')
|
|
277 |
data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'}
|
|
278 |
data_source.store()
|
|
279 |
|
|
280 |
Role.wipe()
|
|
281 |
role = Role(name='test')
|
|
282 |
role.store()
|
|
283 |
local_user.roles = [role.id]
|
|
284 |
local_user.store()
|
|
285 |
|
|
286 |
CardDef.wipe()
|
|
287 |
carddef = CardDef()
|
|
288 |
carddef.name = 'test'
|
|
289 |
carddef.fields = [
|
|
290 |
fields.StringField(id='0', label='foobar0', varname='foobar0'),
|
|
291 |
fields.ItemField(id='1', label='foobar1', varname='foobar1', data_source={'type': 'foobar'}),
|
|
292 |
fields.ItemField(id='2', label='foobar2', varname='foobar2', data_source={'type': 'foobar_jsonp'}),
|
|
293 |
fields.DateField(id='3', label='foobar3', varname='date'),
|
|
294 |
fields.FileField(id='4', label='foobar4', varname='file'),
|
|
295 |
fields.MapField(id='5', label='foobar5', varname='map'),
|
|
296 |
fields.StringField(id='6', label='foobar6', varname='foobar6'),
|
|
297 |
]
|
|
298 |
carddef.backoffice_submission_roles = [role.id]
|
|
299 |
carddef.store()
|
|
300 |
data_class = carddef.data_class()
|
|
301 |
|
|
302 |
signed_url = sign_url(
|
|
303 |
'http://example.net/api/cards/test/submit'
|
|
304 |
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
|
305 |
'1234',
|
|
306 |
)
|
|
307 |
url = signed_url[len('http://example.net') :]
|
|
308 |
payload = {
|
|
309 |
'data': {
|
|
310 |
'foobar0': 'xxx',
|
|
311 |
'foobar1': '1',
|
|
312 |
'foobar1_structured': {
|
|
313 |
'id': '1',
|
|
314 |
'text': 'foo',
|
|
315 |
'more': 'XXX',
|
|
316 |
},
|
|
317 |
'foobar2': 'bar',
|
|
318 |
'foobar2_raw': '10',
|
|
319 |
'date': '1970-01-01',
|
|
320 |
'file': {
|
|
321 |
'filename': 'test.txt',
|
|
322 |
'content': force_text(base64.b64encode(b'test')),
|
|
323 |
},
|
|
324 |
'map': {
|
|
325 |
'lat': 1.5,
|
|
326 |
'lon': 2.25,
|
|
327 |
},
|
|
328 |
}
|
|
329 |
}
|
|
330 |
resp = get_app(pub).post_json(url, payload)
|
|
331 |
assert resp.json['err'] == 0
|
|
332 |
assert data_class.get(resp.json['data']['id']).status == 'wf-recorded'
|
|
333 |
assert data_class.get(resp.json['data']['id']).user_id == str(local_user.id)
|
|
334 |
assert data_class.get(resp.json['data']['id']).tracking_code is None
|
|
335 |
assert data_class.get(resp.json['data']['id']).data['0'] == 'xxx'
|
|
336 |
assert data_class.get(resp.json['data']['id']).data['1'] == '1'
|
|
337 |
assert data_class.get(resp.json['data']['id']).data['1_structured'] == source[0]
|
|
338 |
assert data_class.get(resp.json['data']['id']).data['2'] == '10'
|
|
339 |
assert data_class.get(resp.json['data']['id']).data['2_display'] == 'bar'
|
|
340 |
assert data_class.get(resp.json['data']['id']).data['3'] == time.struct_time(
|
|
341 |
(1970, 1, 1, 0, 0, 0, 3, 1, -1)
|
|
342 |
)
|
|
343 |
|
|
344 |
assert data_class.get(resp.json['data']['id']).data['4'].orig_filename == 'test.txt'
|
|
345 |
assert data_class.get(resp.json['data']['id']).data['4'].get_content() == b'test'
|
|
346 |
assert data_class.get(resp.json['data']['id']).data['5'] == '1.5;2.25'
|
|
347 |
# test bijectivity
|
|
348 |
assert (
|
|
349 |
carddef.fields[3].get_json_value(data_class.get(resp.json['data']['id']).data['3'])
|
|
350 |
== payload['data']['date']
|
|
351 |
)
|
|
352 |
for k in payload['data']['file']:
|
|
353 |
data = data_class.get(resp.json['data']['id']).data['4']
|
|
354 |
assert carddef.fields[4].get_json_value(data)[k] == payload['data']['file'][k]
|
|
355 |
assert (
|
|
356 |
carddef.fields[5].get_json_value(data_class.get(resp.json['data']['id']).data['5'])
|
|
357 |
== payload['data']['map']
|
|
358 |
)
|
|
359 |
|
|
360 |
|
|
361 |
def test_carddef_submit_from_wscall(pub, local_user):
|
|
362 |
NamedDataSource.wipe()
|
|
363 |
data_source = NamedDataSource(name='foobar')
|
|
364 |
source = [{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}]
|
|
365 |
data_source.data_source = {'type': 'formula', 'value': repr(source)}
|
|
366 |
data_source.store()
|
|
367 |
|
|
368 |
data_source = NamedDataSource(name='foobar_jsonp')
|
|
369 |
data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'}
|
|
370 |
data_source.store()
|
|
371 |
|
|
372 |
Role.wipe()
|
|
373 |
role = Role(name='test')
|
|
374 |
role.store()
|
|
375 |
local_user.roles = [role.id]
|
|
376 |
local_user.store()
|
|
377 |
|
|
378 |
local_user2 = get_publisher().user_class()
|
|
379 |
local_user2.name = 'Jean Darmette 2'
|
|
380 |
local_user2.email = 'jean.darmette2@triffouilis.fr'
|
|
381 |
local_user2.name_identifiers = ['0123456789bis']
|
|
382 |
local_user2.store()
|
|
383 |
|
|
384 |
workflow = Workflow.get_default_workflow()
|
|
385 |
workflow.id = '2'
|
|
386 |
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
|
|
387 |
workflow.backoffice_fields_formdef.fields = [
|
|
388 |
fields.StringField(id='bo1', label='1st backoffice field', type='string', varname='backoffice_blah'),
|
|
389 |
]
|
|
390 |
workflow.store()
|
|
391 |
|
|
392 |
CardDef.wipe()
|
|
393 |
carddef = CardDef()
|
|
394 |
carddef.name = 'test'
|
|
395 |
carddef.fields = [
|
|
396 |
fields.StringField(id='0', label='foobar0', varname='foobar0'),
|
|
397 |
fields.ItemField(id='1', label='foobar1', varname='foobar1', data_source={'type': 'foobar'}),
|
|
398 |
fields.ItemField(id='2', label='foobar2', varname='foobar2', data_source={'type': 'foobar_jsonp'}),
|
|
399 |
fields.DateField(id='3', label='foobar3', varname='date'),
|
|
400 |
fields.FileField(id='4', label='foobar4', varname='file'),
|
|
401 |
fields.MapField(id='5', label='foobar5', varname='map'),
|
|
402 |
fields.StringField(id='6', label='foobar6', varname='foobar6'),
|
|
403 |
]
|
|
404 |
carddef.backoffice_submission_roles = [role.id]
|
|
405 |
carddef.workflow = workflow
|
|
406 |
carddef.store()
|
|
407 |
|
|
408 |
carddata = carddef.data_class()()
|
|
409 |
upload = PicklableUpload('test.txt', 'text/plain', 'ascii')
|
|
410 |
upload.receive([b'test'])
|
|
411 |
carddata.data = {
|
|
412 |
'0': 'xxx',
|
|
413 |
'1': '1',
|
|
414 |
'1_display': '1',
|
|
415 |
'1_structured': {
|
|
416 |
'id': '1',
|
|
417 |
'text': 'foo',
|
|
418 |
'more': 'XXX',
|
|
419 |
},
|
|
420 |
'2': '10',
|
|
421 |
'2_display': 'bar',
|
|
422 |
'3': time.strptime('1970-01-01', '%Y-%m-%d'),
|
|
423 |
'4': upload,
|
|
424 |
'5': '1.5;2.25',
|
|
425 |
'bo1': 'backoffice field',
|
|
426 |
}
|
|
427 |
carddata.just_created()
|
|
428 |
carddata.store()
|
|
429 |
|
|
430 |
def url():
|
|
431 |
signed_url = sign_url(
|
|
432 |
'http://example.net/api/cards/test/submit?orig=coucou&email=%s' % urllib.quote(local_user.email),
|
|
433 |
'1234',
|
|
434 |
)
|
|
435 |
return signed_url[len('http://example.net') :]
|
|
436 |
|
|
437 |
payload = json.loads(json.dumps(carddata.get_json_export_dict(), cls=qommon.misc.JSONEncoder))
|
|
438 |
|
|
439 |
resp = get_app(pub).post_json(url(), payload)
|
|
440 |
assert resp.json['err'] == 0
|
|
441 |
new_carddata = carddef.data_class().get(resp.json['data']['id'])
|
|
442 |
assert new_carddata.data['0'] == carddata.data['0']
|
|
443 |
assert new_carddata.data['1'] == carddata.data['1']
|
|
444 |
assert new_carddata.data['1_display'] == carddata.data['1_display']
|
|
445 |
assert new_carddata.data['1_structured'] == carddata.data['1_structured']
|
|
446 |
assert new_carddata.data['2'] == carddata.data['2']
|
|
447 |
assert new_carddata.data['2_display'] == carddata.data['2_display']
|
|
448 |
assert new_carddata.data['3'] == carddata.data['3']
|
|
449 |
assert new_carddata.data['4'].get_content() == carddata.data['4'].get_content()
|
|
450 |
assert new_carddata.data['5'] == carddata.data['5']
|
|
451 |
assert new_carddata.data['bo1'] == carddata.data['bo1']
|
|
452 |
assert not new_carddata.data.get('6')
|
|
453 |
assert new_carddata.user_id == str(local_user.id)
|
|
454 |
|
|
455 |
# add an extra attribute
|
|
456 |
payload['extra'] = {'foobar6': 'YYY'}
|
|
457 |
resp = get_app(pub).post_json(url(), payload)
|
|
458 |
assert resp.json['err'] == 0
|
|
459 |
new_carddata = carddef.data_class().get(resp.json['data']['id'])
|
|
460 |
assert new_carddata.data['0'] == carddata.data['0']
|
|
461 |
assert new_carddata.data['6'] == 'YYY'
|
|
462 |
|
|
463 |
# add user
|
|
464 |
carddata.user_id = local_user2.id
|
|
465 |
carddata.store()
|
|
466 |
payload = json.loads(json.dumps(carddata.get_json_export_dict(), cls=qommon.misc.JSONEncoder))
|
|
467 |
|
|
468 |
resp = get_app(pub).post_json(url(), payload)
|
|
469 |
assert resp.json['err'] == 0
|
|
470 |
new_carddata = carddef.data_class().get(resp.json['data']['id'])
|
|
471 |
assert str(new_carddata.user_id) == str(local_user2.id)
|
|
472 |
|
|
473 |
# test missing map data
|
|
474 |
del carddata.data['5']
|
|
475 |
payload = json.loads(json.dumps(carddata.get_json_export_dict(), cls=qommon.misc.JSONEncoder))
|
|
476 |
|
|
477 |
resp = get_app(pub).post_json(url(), payload)
|
|
478 |
assert resp.json['err'] == 0
|
|
479 |
new_carddata = carddef.data_class().get(resp.json['data']['id'])
|
|
480 |
assert new_carddata.data.get('5') is None
|
|
481 |
|
|
482 |
|
|
483 |
def test_formdef_submit_structured(pub, local_user):
|
|
484 |
Role.wipe()
|
|
485 |
role = Role(name='test')
|
|
486 |
role.store()
|
|
487 |
local_user.roles = [role.id]
|
|
488 |
local_user.store()
|
|
489 |
|
|
490 |
CardDef.wipe()
|
|
491 |
carddef = CardDef()
|
|
492 |
carddef.name = 'test'
|
|
493 |
carddef.fields = [
|
|
494 |
fields.ItemField(
|
|
495 |
id='0',
|
|
496 |
label='foobar',
|
|
497 |
varname='foobar',
|
|
498 |
data_source={
|
|
499 |
'type': 'json',
|
|
500 |
'value': 'http://datasource.com',
|
|
501 |
},
|
|
502 |
),
|
|
503 |
fields.ItemField(
|
|
504 |
id='1',
|
|
505 |
label='foobar1',
|
|
506 |
varname='foobar1',
|
|
507 |
data_source={
|
|
508 |
'type': 'formula',
|
|
509 |
'value': '[dict(id=i, text=\'label %s\' % i, foo=i) for i in range(10)]',
|
|
510 |
},
|
|
511 |
),
|
|
512 |
]
|
|
513 |
carddef.backoffice_submission_roles = [role.id]
|
|
514 |
carddef.store()
|
|
515 |
data_class = carddef.data_class()
|
|
516 |
|
|
517 |
signed_url = sign_url(
|
|
518 |
'http://example.net/api/cards/test/submit'
|
|
519 |
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email),
|
|
520 |
'1234',
|
|
521 |
)
|
|
522 |
url = signed_url[len('http://example.net') :]
|
|
523 |
|
|
524 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
|
|
525 |
urlopen.side_effect = lambda *args: StringIO(
|
|
526 |
'''\
|
|
527 |
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \
|
|
528 |
{"id": 1, "text": "uné", "foo": "bar1"}, \
|
|
529 |
{"id": 2, "text": "deux", "foo": "bar2"}]}'''
|
|
530 |
)
|
|
531 |
resp = get_app(pub).post_json(
|
|
532 |
url,
|
|
533 |
{
|
|
534 |
'data': {
|
|
535 |
'0': '0',
|
|
536 |
"1": '3',
|
|
537 |
}
|
|
538 |
},
|
|
539 |
)
|
|
540 |
|
|
541 |
formdata = data_class.get(resp.json['data']['id'])
|
|
542 |
assert formdata.status == 'wf-recorded'
|
|
543 |
assert formdata.data['0'] == '0'
|
|
544 |
assert formdata.data['0_display'] == 'zéro'
|
|
545 |
assert formdata.data['0_structured'] == {
|
|
546 |
'id': 0,
|
|
547 |
'text': 'zéro',
|
|
548 |
'foo': 'bar',
|
|
549 |
}
|
|
550 |
assert formdata.data['1'] == '3'
|
|
551 |
assert formdata.data['1_display'] == 'label 3'
|
|
552 |
assert formdata.data['1_structured'] == {
|
|
553 |
'id': 3,
|
|
554 |
'text': 'label 3',
|
|
555 |
'foo': 3,
|
|
556 |
}
|