0001-csv_import-allow-adding-roles-35773.patch
src/authentic2/csv_import.py | ||
---|---|---|
30 | 30 |
from django.utils import six |
31 | 31 |
from django.utils.translation import ugettext as _ |
32 | 32 | |
33 |
from django_rbac.utils import get_role_model |
|
34 | ||
33 | 35 |
from authentic2 import app_settings |
34 | 36 |
from authentic2.a2_rbac.utils import get_default_ou |
35 | 37 |
from authentic2.custom_user.models import User |
36 | 38 |
from authentic2.forms.profile import modelform_factory, BaseUserForm |
37 | 39 |
from authentic2.models import Attribute, AttributeValue, UserExternalId |
38 | 40 | |
41 |
Role = get_role_model() |
|
42 | ||
39 | 43 | |
40 | 44 |
class UTF8Recoder(object): |
41 | 45 |
def __init__(self, fd): |
... | ... | |
149 | 153 |
unique = attr.ib(default=False, metadata={'flag': True}) |
150 | 154 |
globally_unique = attr.ib(default=False, metadata={'flag': True}) |
151 | 155 |
verified = attr.ib(default=False, metadata={'flag': True}) |
156 |
delete = attr.ib(default=False, metadata={'flag': True}) |
|
157 |
clear = attr.ib(default=False, metadata={'flag': True}) |
|
152 | 158 | |
153 | 159 |
@property |
154 | 160 |
def flags(self): |
... | ... | |
186 | 192 |
return (self.code, self.line, self.column) == (other.code, other.line, other.column) |
187 | 193 | |
188 | 194 | |
195 |
SOURCE_NAME = '_source_name' |
|
196 |
SOURCE_ID = '_source_id' |
|
197 |
SOURCE_COLUMNS = set([SOURCE_NAME, SOURCE_ID]) |
|
198 |
ROLE_NAME = '_role_name' |
|
199 |
ROLE_SLUG = '_role_slug' |
|
200 |
SPECIAL_COLUMNS = SOURCE_COLUMNS | {ROLE_NAME, ROLE_SLUG} |
|
201 | ||
202 | ||
189 | 203 |
class ImportUserForm(BaseUserForm): |
204 |
locals()[ROLE_NAME] = forms.CharField( |
|
205 |
label=_('Role name'), |
|
206 |
required=False) |
|
207 |
locals()[ROLE_SLUG] = forms.CharField( |
|
208 |
label=_('Role slug'), |
|
209 |
required=False) |
|
210 | ||
190 | 211 |
def clean(self): |
191 | 212 |
super(BaseUserForm, self).clean() |
192 | 213 |
self._validate_unique = False |
193 | 214 | |
194 |
SOURCE_NAME = '_source_name' |
|
195 |
SOURCE_ID = '_source_id' |
|
196 |
SOURCE_COLUMNS = set([SOURCE_NAME, SOURCE_ID]) |
|
197 | ||
198 | 215 | |
199 | 216 |
class ImportUserFormWithExternalId(ImportUserForm): |
200 | 217 |
locals()[SOURCE_NAME] = forms.CharField( |
... | ... | |
216 | 233 |
errors = attr.ib(default=[]) |
217 | 234 |
is_valid = attr.ib(default=True) |
218 | 235 |
action = attr.ib(default=None) |
236 |
process_role = attr.ib(default=False) |
|
219 | 237 | |
220 | 238 |
def __getitem__(self, header): |
221 | 239 |
for cell in self.cells: |
... | ... | |
343 | 361 |
self.add_error( |
344 | 362 |
Error('invalid-external-id-pair', |
345 | 363 |
_('You must have a _source_name and a _source_id column'))) |
364 |
if ROLE_NAME in header_names and ROLE_SLUG in header_names: |
|
365 |
self.add_error( |
|
366 |
Error('invalid-role-column', |
|
367 |
_('Either specify role names or role slugs, not both'))) |
|
346 | 368 | |
347 | 369 |
def parse_header(self, head, column): |
348 | 370 |
splitted = head.split() |
... | ... | |
359 | 381 |
if header.name in SOURCE_COLUMNS: |
360 | 382 |
if header.name == SOURCE_ID: |
361 | 383 |
header.key = True |
362 |
else:
|
|
384 |
elif header.name not in SPECIAL_COLUMNS:
|
|
363 | 385 |
try: |
364 | 386 |
if header.name in ['email', 'first_name', 'last_name', 'username']: |
365 | 387 |
User._meta.get_field(header.name) |
... | ... | |
387 | 409 |
self.headers.append(header) |
388 | 410 | |
389 | 411 |
if (not (header.field or header.attribute) |
390 |
and header.name not in SOURCE_COLUMNS):
|
|
412 |
and header.name not in SPECIAL_COLUMNS):
|
|
391 | 413 |
self.add_error(LineError('unknown-or-missing-attribute', |
392 | 414 |
_('unknown or missing attribute "%s"') % head, |
393 | 415 |
line=1, column=column)) |
... | ... | |
478 | 500 |
else: |
479 | 501 |
continue |
480 | 502 |
if unique_key in unique_map: |
481 |
errors.append( |
|
482 |
Error('unique-constraint-failed', |
|
483 |
_('Unique constraint on column "%(column)s" failed: ' |
|
484 |
'value already appear on line %(line)d') % { |
|
485 |
'column': header.name, |
|
486 |
'line': unique_map[unique_key]})) |
|
503 |
if user and (ROLE_NAME in self.headers_by_name or ROLE_SLUG in self.headers_by_name): |
|
504 |
row.process_role = True |
|
505 |
else: |
|
506 |
errors.append( |
|
507 |
Error('unique-constraint-failed', |
|
508 |
_('Unique constraint on column "%(column)s" failed: ' |
|
509 |
'value already appear on line %(line)d') % { |
|
510 |
'column': header.name, |
|
511 |
'line': unique_map[unique_key]})) |
|
487 | 512 |
else: |
488 | 513 |
unique_map[unique_key] = row.line |
489 | 514 | |
... | ... | |
499 | 524 |
atvs = AttributeValue.objects.filter(attribute__name=cell.header.name, content=cell.value) |
500 | 525 |
unique = not qs.filter(attribute_values__in=atvs).exists() |
501 | 526 |
if not unique: |
502 |
errors.append( |
|
503 |
Error('unique-constraint-failed', _('Unique constraint on column "%s" failed') % cell.header.name)) |
|
527 |
if user and (ROLE_NAME in self.headers_by_name or ROLE_SLUG in self.headers_by_name): |
|
528 |
row.process_role = True |
|
529 |
else: |
|
530 |
errors.append( |
|
531 |
Error('unique-constraint-failed', |
|
532 |
_('Unique constraint on column "%s" failed') % cell.header.name)) |
|
504 | 533 |
row.errors.extend(errors) |
505 | 534 |
row.is_valid = row.is_valid and not bool(errors) |
506 | 535 |
return not bool(errors) |
... | ... | |
551 | 580 |
user = users[0] |
552 | 581 | |
553 | 582 |
if not self.check_unique_constraints(row, unique_map, user=user): |
583 |
if row.process_role: |
|
584 |
self.add_role(row, user) |
|
554 | 585 |
return False |
555 | 586 | |
556 | 587 |
if not user: |
... | ... | |
596 | 627 |
continue |
597 | 628 |
cell.action = 'nothing' |
598 | 629 | |
630 |
self.add_role(row, user) |
|
631 | ||
599 | 632 |
setattr(self, row.action + 'd', getattr(self, row.action + 'd') + 1) |
600 | 633 |
return True |
634 | ||
635 |
def add_role(self, row, user): |
|
636 |
for cell in row.cells: |
|
637 |
if cell.header.field or cell.header.attribute: |
|
638 |
continue |
|
639 |
if cell.header.name in {ROLE_NAME, ROLE_SLUG}: |
|
640 |
try: |
|
641 |
if cell.header.name == ROLE_NAME: |
|
642 |
role = Role.objects.get(name=cell.value, ou=self.ou) |
|
643 |
elif cell.header.name == ROLE_SLUG: |
|
644 |
role = Role.objects.get(slug=cell.value, ou=self.ou) |
|
645 |
except Role.DoesNotExist: |
|
646 |
row.errors.append( |
|
647 |
Error('role-not-found', |
|
648 |
_('Role "%s" does not exist') % cell.value)) |
|
649 |
else: |
|
650 |
if cell.header.delete: |
|
651 |
user.roles.remove(role) |
|
652 |
elif cell.header.clear: |
|
653 |
user.roles.clear() |
|
654 |
user.roles.add(role) |
|
655 |
else: |
|
656 |
user.roles.add(role) |
|
657 |
continue |
tests/test_csv_import.py | ||
---|---|---|
21 | 21 | |
22 | 22 |
import io |
23 | 23 | |
24 |
from django_rbac.utils import get_role_model |
|
25 | ||
24 | 26 |
from authentic2.custom_user.models import User |
25 | 27 |
from authentic2.models import Attribute |
26 | 28 |
from authentic2.a2_rbac.utils import get_default_ou |
27 | 29 | |
28 | 30 |
from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError |
29 | 31 | |
32 |
Role = get_role_model() |
|
33 | ||
30 | 34 |
ENCODINGS = [ |
31 | 35 |
'iso-8859-1', |
32 | 36 |
'iso-8859-15', |
... | ... | |
389 | 393 |
importer = user_csv_importer_factory(content) |
390 | 394 |
assert importer.run(), importer.errors |
391 | 395 |
assert not importer.has_errors |
396 | ||
397 | ||
398 |
def test_user_roles_csv(profile, user_csv_importer_factory): |
|
399 |
assert User.objects.count() == 0 |
|
400 |
role_name = 'test_name' |
|
401 |
role_slug = 'test_slug' |
|
402 |
role = Role.objects.create(name=role_name, slug=role_slug, ou=get_default_ou()) |
|
403 |
role2 = Role.objects.create(name='test2', ou=get_default_ou()) |
|
404 |
base_header = 'email key,first_name,last_name,phone,' |
|
405 |
base_user = 'tnoel@entrouvert.com,Thomas,Noël,1234,' |
|
406 | ||
407 |
content_name_add = '\n'.join((base_header + '_role_name', base_user + role_name)) |
|
408 |
importer = user_csv_importer_factory(content_name_add) |
|
409 |
assert importer.run() |
|
410 |
thomas = User.objects.get(email='tnoel@entrouvert.com') |
|
411 |
assert thomas in role.members.all() |
|
412 | ||
413 |
thomas.roles.add(role2) |
|
414 |
importer = user_csv_importer_factory(content_name_add) |
|
415 |
assert importer.run() |
|
416 |
thomas.refresh_from_db() |
|
417 |
assert thomas in role2.members.all() |
|
418 | ||
419 |
content_name_delete = '\n'.join((base_header + '_role_name delete', base_user + role_name)) |
|
420 |
importer = user_csv_importer_factory(content_name_delete) |
|
421 |
assert importer.run() |
|
422 |
thomas.refresh_from_db() |
|
423 |
assert thomas not in role.members.all() |
|
424 |
assert thomas in role2.members.all() |
|
425 | ||
426 |
content_name_clear = '\n'.join((base_header + '_role_name clear', base_user + role_name)) |
|
427 |
importer = user_csv_importer_factory(content_name_clear) |
|
428 |
assert importer.run() |
|
429 |
thomas.refresh_from_db() |
|
430 |
assert thomas in role.members.all() |
|
431 |
assert thomas not in role2.members.all() |
|
432 | ||
433 |
thomas.roles.remove(role) |
|
434 |
content_name_add_multiple = '\n'.join((base_header + '_role_name', base_user + role_name, |
|
435 |
base_user + 'test2')) |
|
436 |
importer = user_csv_importer_factory(content_name_add_multiple) |
|
437 |
assert importer.run() |
|
438 |
thomas.refresh_from_db() |
|
439 |
assert thomas in role.members.all() |
|
440 |
assert thomas in role2.members.all() |
|
441 | ||
442 |
thomas.roles.remove(role) |
|
443 |
content_slug_add = '\n'.join((base_header + '_role_slug', base_user + role_slug)) |
|
444 |
importer = user_csv_importer_factory(content_slug_add) |
|
445 |
assert importer.run() |
|
446 |
thomas.refresh_from_db() |
|
447 |
assert thomas in role.members.all() |
|
448 | ||
449 |
thomas.roles.remove(role) |
|
450 |
content_only_key = '''email key,_role_name |
|
451 |
tnoel@entrouvert.com,test_name''' |
|
452 |
importer = user_csv_importer_factory(content_slug_add) |
|
453 |
assert importer.run() |
|
454 |
thomas.refresh_from_db() |
|
455 |
assert thomas in role.members.all() |
|
456 | ||
457 |
content_name_error = '\n'.join((base_header + '_role_name', base_user + 'bad_name')) |
|
458 |
importer = user_csv_importer_factory(content_name_error) |
|
459 |
assert importer.run() |
|
460 |
assert importer.has_errors |
|
461 |
assert importer.rows[-1].errors[0].code == 'role-not-found' |
|
462 | ||
463 |
content_header_error = '\n'.join((base_header + '_role_name,_role_slug', |
|
464 |
base_user + ','.join((role_name, role_slug)))) |
|
465 |
importer = user_csv_importer_factory(content_header_error) |
|
466 |
assert not importer.run() |
|
467 |
assert importer.has_errors |
|
468 |
assert importer.errors[0].code == 'invalid-role-column' |
|
392 |
- |