0001-cards-csv-import-user-support-48776.patch
tests/backoffice_pages/test_carddata.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 |
import datetime |
3 | 3 |
import os |
4 |
import uuid |
|
4 | 5 | |
5 | 6 |
import pytest |
6 | 7 |
from webtest import Upload |
... | ... | |
377 | 378 |
assert card2.data['5'].tm_mday == 3 |
378 | 379 | |
379 | 380 | |
381 |
def test_backoffice_cards_import_data_csv_user_support(pub, studio): |
|
382 |
user = create_user(pub) |
|
383 |
user.name_identifiers = [str(uuid.uuid4())] |
|
384 |
user.store() |
|
385 | ||
386 |
CardDef.wipe() |
|
387 |
carddef = CardDef() |
|
388 |
carddef.name = 'test' |
|
389 |
carddef.fields = [ |
|
390 |
fields.ItemField(id='1', label='List', |
|
391 |
items=['item1', 'item2']), |
|
392 |
] |
|
393 |
carddef.user_support = 'optional' |
|
394 |
carddef.workflow_roles = {'_editor': user.roles[0]} |
|
395 |
carddef.backoffice_submission_roles = user.roles |
|
396 |
carddef.store() |
|
397 |
carddef.data_class().wipe() |
|
398 | ||
399 |
app = login(get_app(pub)) |
|
400 |
sample_resp = app.get('/backoffice/data/test/data-sample-csv') |
|
401 |
assert sample_resp.text == 'User (email or UUID),List\r\nvalue,value\r\n' |
|
402 |
data = [ |
|
403 |
b'User,List', |
|
404 |
b'%s,item1' % user.email.encode('utf-8'), |
|
405 |
b'%s,item2' % user.nameid.encode('utf-8'), |
|
406 |
b',item1', |
|
407 |
b'foobar,item2', |
|
408 |
b'foobar@mail.com,item2', |
|
409 |
] |
|
410 |
resp = app.get('/backoffice/data/test/import-csv') |
|
411 |
resp.forms[0]['file'] = Upload('test.csv', b'\n'.join(data), |
|
412 |
'text/csv') |
|
413 |
resp = resp.forms[0].submit().follow() |
|
414 |
assert carddef.data_class().count() == 5 |
|
415 |
cards = carddef.data_class().select(order_by='id') |
|
416 |
#assert cards[0].user_id == user.id |
|
417 |
#assert cards[1].user_id == user.id |
|
418 |
#assert cards[2].user_id is None |
|
419 |
#assert cards[3].user_id is None |
|
420 |
#assert cards[4].user_id is None |
|
421 | ||
422 |
# if no user support, user columns is ignored in import |
|
423 |
carddef.user_support = None |
|
424 |
carddef.store() |
|
425 |
carddef.data_class().wipe() |
|
426 | ||
427 |
data = [ |
|
428 |
b'User', |
|
429 |
user.email.encode('utf-8'), |
|
430 |
user.nameid.encode('utf-8'), |
|
431 |
b'foobar', |
|
432 |
b'foobar@mail.com', |
|
433 |
] |
|
434 |
resp = app.get('/backoffice/data/test/import-csv') |
|
435 |
resp.forms[0]['file'] = Upload('test.csv', b'\n'.join(data), |
|
436 |
'text/csv') |
|
437 |
resp = resp.forms[0].submit().follow() |
|
438 |
assert carddef.data_class().count() == 4 |
|
439 |
cards = carddef.data_class().select(order_by='id') |
|
440 |
assert [c.user_id for c in cards] == [None, None, None, None] |
|
441 | ||
442 | ||
380 | 443 |
def test_backoffice_cards_import_data_csv_invalid_columns(pub): |
381 | 444 |
user = create_user(pub) |
382 | 445 |
wcs/backoffice/data_management.py | ||
---|---|---|
140 | 140 |
return r |
141 | 141 | |
142 | 142 |
def get_import_csv_fields(self): |
143 |
class UserField: |
|
144 |
key = 'user' |
|
145 |
id = '_user' |
|
146 |
label = _('User (email or UUID)') |
|
147 |
store_display_value = None |
|
148 |
store_structured_value = None |
|
149 | ||
150 |
def convert_value_from_str(self, x): |
|
151 |
return x |
|
152 | ||
143 | 153 |
# skip non-data fields |
144 |
return [x for x in self.formdef.get_all_fields() if isinstance(x, fields.WidgetField)] |
|
154 |
csv_fields = [x for x in self.formdef.get_all_fields() if isinstance(x, fields.WidgetField)] |
|
155 |
if self.formdef.user_support == 'optional': |
|
156 |
return [UserField()] + csv_fields |
|
157 |
return csv_fields |
|
145 | 158 | |
146 | 159 |
def data_sample_csv(self): |
147 | 160 |
carddef_fields = self.get_import_csv_fields() |
... | ... | |
277 | 290 |
error_message += ' ' + _('(line numbers %s and more)') % ', '.join(incomplete_lines[:5]) |
278 | 291 |
raise ValueError(error_message) |
279 | 292 | |
293 |
def user_lookup(user_value): |
|
294 |
if self.formdef.user_support != 'optional': |
|
295 |
return |
|
296 |
if not user_value: |
|
297 |
return |
|
298 |
users = [] |
|
299 |
if '@' in user_value: |
|
300 |
users = get_publisher().user_class.get_users_with_email(user_value) |
|
301 |
else: |
|
302 |
users = get_publisher().user_class.get_users_with_name_identifier(user_value) |
|
303 |
if not users: |
|
304 |
return |
|
305 |
return users[0] |
|
306 | ||
280 | 307 |
class ImportAction: |
281 | 308 |
def __init__(self, data_class, lines): |
282 | 309 |
self.data_class = data_class |
... | ... | |
285 | 312 |
def execute(self, job=None): |
286 | 313 |
for item in self.lines: |
287 | 314 |
data_instance = self.data_class() |
315 |
user_value = item.pop('_user', None) |
|
316 |
data_instance.user = user_lookup(user_value) |
|
288 | 317 |
data_instance.data = item |
289 | 318 |
data_instance.just_created() |
290 | 319 |
data_instance.store() |
291 |
- |