0002-csv_import-allow-adding-roles-35773.patch
src/authentic2/csv_import.py | ||
---|---|---|
30 | 30 |
from django.utils import six |
31 | 31 |
from django.utils.translation import ugettext as _ |
32 | 32 | |
33 |
from django_rbac.utils import get_role_model |
|
34 | ||
33 | 35 |
from authentic2 import app_settings |
34 | 36 |
from authentic2.a2_rbac.utils import get_default_ou |
35 | 37 |
from authentic2.custom_user.models import User |
36 | 38 |
from authentic2.forms.profile import modelform_factory, BaseUserForm |
37 | 39 |
from authentic2.models import Attribute, AttributeValue, UserExternalId |
38 | 40 | |
41 |
Role = get_role_model() |
|
42 | ||
39 | 43 | |
40 | 44 |
class UTF8Recoder(object): |
41 | 45 |
def __init__(self, fd): |
... | ... | |
149 | 153 |
unique = attr.ib(default=False, metadata={'flag': True}) |
150 | 154 |
globally_unique = attr.ib(default=False, metadata={'flag': True}) |
151 | 155 |
verified = attr.ib(default=False, metadata={'flag': True}) |
156 |
delete = attr.ib(default=False, metadata={'flag': True}) |
|
157 |
clear = attr.ib(default=False, metadata={'flag': True}) |
|
152 | 158 | |
153 | 159 |
@property |
154 | 160 |
def flags(self): |
... | ... | |
186 | 192 |
return (self.code, self.line, self.column) == (other.code, other.line, other.column) |
187 | 193 | |
188 | 194 | |
195 |
# Reserved CSV column names. They are neither User model fields nor profile
# attributes, so header parsing treats them separately.

# External-id pair: both columns must be present together.
SOURCE_NAME = '_source_name'
SOURCE_ID = '_source_id'
SOURCE_COLUMNS = {SOURCE_NAME, SOURCE_ID}

# Role columns: a file may carry role names or role slugs, not both.
ROLE_NAME = '_role_name'
ROLE_SLUG = '_role_slug'

# Every column name exempt from the "unknown or missing attribute" check.
SPECIAL_COLUMNS = SOURCE_COLUMNS.union((ROLE_NAME, ROLE_SLUG))
|
201 | ||
202 | ||
189 | 203 |
class ImportUserForm(BaseUserForm):
    """User form used to validate one CSV row, extended with role columns.

    Two optional character fields are declared for the ``_role_name`` and
    ``_role_slug`` special columns.
    """

    # The field names come from module constants, so they are injected into
    # the class namespace through locals() rather than written as plain
    # attribute assignments.
    locals()[ROLE_NAME] = forms.CharField(label=_('Role name'), required=False)
    locals()[ROLE_SLUG] = forms.CharField(label=_('Role slug'), required=False)

    def clean(self):
        """Run the inherited clean step and switch off unique validation."""
        # The lookup starts *after* BaseUserForm in the MRO, so
        # BaseUserForm.clean() itself is not executed here.
        # NOTE(review): presumably intentional - confirm.
        super(BaseUserForm, self).clean()
        # NOTE(review): presumably disabled because the importer performs
        # its own uniqueness checks - confirm.
        self._validate_unique = False
193 | 214 | |
194 |
SOURCE_NAME = '_source_name' |
|
195 |
SOURCE_ID = '_source_id' |
|
196 |
SOURCE_COLUMNS = set([SOURCE_NAME, SOURCE_ID]) |
|
197 | ||
198 | 215 | |
199 | 216 |
class ImportUserFormWithExternalId(ImportUserForm): |
200 | 217 |
locals()[SOURCE_NAME] = forms.CharField( |
... | ... | |
216 | 233 |
errors = attr.ib(default=[]) |
217 | 234 |
is_valid = attr.ib(default=True) |
218 | 235 |
action = attr.ib(default=None) |
236 |
user_first_seen = attr.ib(default=True) |
|
219 | 237 | |
220 | 238 |
def __getitem__(self, header): |
221 | 239 |
for cell in self.cells: |
... | ... | |
344 | 362 |
self.add_error( |
345 | 363 |
Error('invalid-external-id-pair', |
346 | 364 |
_('You must have a _source_name and a _source_id column'))) |
365 |
if ROLE_NAME in header_names and ROLE_SLUG in header_names: |
|
366 |
self.add_error( |
|
367 |
Error('invalid-role-column', |
|
368 |
_('Either specify role names or role slugs, not both'))) |
|
347 | 369 | |
348 | 370 |
def parse_header(self, head, column): |
349 | 371 |
splitted = head.split() |
... | ... | |
360 | 382 |
if header.name in SOURCE_COLUMNS: |
361 | 383 |
if header.name == SOURCE_ID: |
362 | 384 |
header.key = True |
363 |
else:
|
|
385 |
elif header.name not in SPECIAL_COLUMNS:
|
|
364 | 386 |
try: |
365 | 387 |
if header.name in ['email', 'first_name', 'last_name', 'username']: |
366 | 388 |
User._meta.get_field(header.name) |
... | ... | |
388 | 410 |
self.headers.append(header) |
389 | 411 | |
390 | 412 |
if (not (header.field or header.attribute) |
391 |
and header.name not in SOURCE_COLUMNS):
|
|
413 |
and header.name not in SPECIAL_COLUMNS):
|
|
392 | 414 |
self.add_error(LineError('unknown-or-missing-attribute', |
393 | 415 |
_('unknown or missing attribute "%s"') % head, |
394 | 416 |
line=1, column=column)) |
... | ... | |
463 | 485 |
def username_is_unique(self): |
464 | 486 |
return app_settings.A2_USERNAME_IS_UNIQUE or self.ou.username_is_unique |
465 | 487 | |
488 |
@property
def allow_duplicate_key(self):
    """Tell whether the parsed headers contain a role column.

    True when either the ``_role_name`` or the ``_role_slug`` column is
    present in ``self.headers_by_name``, False otherwise.
    """
    role_columns = (ROLE_NAME, ROLE_SLUG)
    return any(column in self.headers_by_name for column in role_columns)
|
491 | ||
466 | 492 |
def check_unique_constraints(self, row, unique_map, user=None): |
467 | 493 |
ou_users = User.objects.filter(ou=self.ou) |
468 | 494 |
users = User.objects.all() |
... | ... | |
479 | 505 |
else: |
480 | 506 |
continue |
481 | 507 |
if unique_key in unique_map: |
482 |
errors.append( |
|
483 |
Error('unique-constraint-failed', |
|
484 |
_('Unique constraint on column "%(column)s" failed: ' |
|
485 |
'value already appear on line %(line)d') % { |
|
486 |
'column': header.name, |
|
487 |
'line': unique_map[unique_key]})) |
|
508 |
if user and self.allow_duplicate_key: |
|
509 |
row.user_first_seen = False |
|
510 |
else: |
|
511 |
errors.append( |
|
512 |
Error('unique-constraint-failed', |
|
513 |
_('Unique constraint on column "%(column)s" failed: ' |
|
514 |
'value already appear on line %(line)d') % { |
|
515 |
'column': header.name, |
|
516 |
'line': unique_map[unique_key]})) |
|
488 | 517 |
else: |
489 | 518 |
unique_map[unique_key] = row.line |
490 | 519 | |
... | ... | |
500 | 529 |
atvs = AttributeValue.objects.filter(attribute__name=cell.header.name, content=cell.value) |
501 | 530 |
unique = not qs.filter(attribute_values__in=atvs).exists() |
502 | 531 |
if not unique: |
503 |
errors.append( |
|
504 |
Error('unique-constraint-failed', _('Unique constraint on column "%s" failed') % cell.header.name)) |
|
532 |
if user and self.allow_duplicate_key: |
|
533 |
row.user_first_seen = False |
|
534 |
else: |
|
535 |
errors.append( |
|
536 |
Error('unique-constraint-failed', |
|
537 |
_('Unique constraint on column "%s" failed') % cell.header.name)) |
|
505 | 538 |
row.errors.extend(errors) |
506 | 539 |
row.is_valid = row.is_valid and not bool(errors) |
507 | 540 |
return not bool(errors) |
... | ... | |
510 | 543 |
def do_import_row(self, row, unique_map): |
511 | 544 |
if not row.is_valid: |
512 | 545 |
return False |
546 |
success = True |
|
513 | 547 | |
514 | 548 |
for header in self.headers: |
515 | 549 |
if header.key: |
... | ... | |
553 | 587 | |
554 | 588 |
if not self.check_unique_constraints(row, unique_map, user=user): |
555 | 589 |
return False |
590 |
if not row.user_first_seen: |
|
591 |
cell = next(c for c in row.cells if c.header.name in {ROLE_NAME, ROLE_SLUG}) |
|
592 |
return self.add_role(cell, user) |
|
556 | 593 | |
557 | 594 |
if not user: |
558 | 595 |
user = User(ou=self.ou) |
... | ... | |
597 | 634 |
continue |
598 | 635 |
cell.action = 'nothing' |
599 | 636 | |
637 |
for cell in row.cells: |
|
638 |
if cell.header.field or cell.header.attribute: |
|
639 |
continue |
|
640 |
if cell.header.name in {ROLE_NAME, ROLE_SLUG}: |
|
641 |
success &= self.add_role(cell, user, do_clear=True) |
|
642 | ||
600 | 643 |
setattr(self, row.action + 'd', getattr(self, row.action + 'd') + 1) |
644 |
return success |
|
645 | ||
646 |
def add_role(self, cell, user, do_clear=False):
    """Apply the role operation described by a role cell to ``user``.

    The role is looked up inside ``self.ou`` by name (``_role_name`` column)
    or by slug (``_role_slug`` column), then added to or removed from
    ``user.roles`` depending on the header flags.

    :param cell: cell of a ``_role_name`` or ``_role_slug`` column.
    :param user: user whose roles are modified.
    :param do_clear: when True and the header carries the ``clear`` flag,
        all roles of the user are removed before the role is added.
    :returns: True when the operation was applied; False when the role
        could not be resolved, in which case an error is appended to
        ``cell.errors``.
    :raises ValueError: if ``cell`` does not belong to a role column.
    """
    lookup_fields = {ROLE_NAME: 'name', ROLE_SLUG: 'slug'}
    try:
        lookup_field = lookup_fields[cell.header.name]
    except KeyError:
        # Fail with an explicit message instead of an unbound local variable.
        raise ValueError(
            'add_role() called on non-role column %r' % cell.header.name)

    try:
        role = Role.objects.get(ou=self.ou, **{lookup_field: cell.value})
    except Role.DoesNotExist:
        cell.errors.append(
            Error('role-not-found',
                  _('Role "%s" does not exist') % cell.value))
        return False
    except Role.MultipleObjectsReturned:
        # Report an ambiguous lookup on the cell rather than letting the
        # exception abort the whole import.
        cell.errors.append(
            Error('role-not-unique',
                  _('Several roles match "%s"') % cell.value))
        return False

    if cell.header.delete:
        user.roles.remove(role)
    else:
        # Clearing only happens when the caller asks for it.
        if cell.header.clear and do_clear:
            user.roles.clear()
        user.roles.add(role)
    cell.action = 'updated'
    return True
src/authentic2/manager/templates/authentic2/manager/user_imports.html | ||
---|---|---|
166 | 166 |
cannot use another key column. |
167 | 167 |
{% endblocktrans %} |
168 | 168 |
</p> |
169 |
<h4 id="help-roles">{% trans "Role operations" %}</h4> |
|
170 |
<p> |
|
171 |
{% blocktrans trimmed %} |
|
172 |
Adding existing roles to users is supported. Use either |
|
173 |
<var>_role_name</var> or <var>_role_slug</var> special columns to |
|
174 |
specify the names or the slugs of the roles that should be added to the user. In |
|
175 |
order to add multiple roles, simply add a new line, identical to the |
|
176 |
first one, except for the value of the role cell. These columns also |
|
177 |
accept special flags, as listed below. |
|
178 |
{% endblocktrans %} |
|
179 |
</p> |
|
180 |
<table class="main left"> |
|
181 |
<thead> |
|
182 |
<tr> |
|
183 |
<th>{% trans "Flag" %}</th> |
|
184 |
<th>{% trans "Meaning" %}</th> |
|
185 |
<th>{% trans "Default value" %}</th> |
|
186 |
</tr> |
|
187 |
</thead> |
|
188 |
<tbody> |
|
189 |
<tr> |
|
190 |
<td>delete</td> |
|
191 |
<td> |
|
192 |
{% blocktrans trimmed %} |
|
193 |
Remove role from user instead of adding it. |
|
194 |
{% endblocktrans %} |
|
195 |
</td> |
|
196 |
<td>{% trans "False" %}</td> |
|
197 |
</tr> |
|
198 |
<tr> |
|
199 |
<td>clear</td> |
|
200 |
<td> |
|
201 |
{% blocktrans trimmed %} |
|
202 |
Clear user roles beforehand, so that they will have no more roles |
|
203 |
than those specified in the import file. |
|
204 |
{% endblocktrans %} |
|
205 |
</td> |
|
206 |
<td>{% trans "False" %}</td> |
|
207 |
</tr> |
|
208 |
</tbody> |
|
209 |
</table> |
|
169 | 210 |
<h4>{% trans "Examples" %}</h4> |
170 | 211 |
<p>{% blocktrans trimmed %}Importing first and last name of users keyed by email{% endblocktrans %}</p> |
171 | 212 |
<blockquote> |
... | ... | |
183 | 224 |
<blockquote> |
184 | 225 |
<pre>_source_name,_source_id,email,"family_reference unique",first_name,last_name |
185 | 226 |
app1,1,john.doe@example.com,1234,John,Doe |
227 |
</pre> |
|
228 |
</blockquote> |
|
229 |
<p>{% blocktrans trimmed %}Importing email, first and last name of users |
|
230 |
while adding roles.{% endblocktrans %}</p> |
|
231 |
<blockquote> |
|
232 |
<pre>email key,first_name,last_name,_role_name |
|
233 |
john.doe@example.com,John,Doe,Role1 |
|
234 |
john.doe@example.com,John,Doe,Role2 |
|
186 | 235 |
</pre> |
187 | 236 |
</blockquote> |
188 | 237 |
</div> |
tests/test_csv_import.py | ||
---|---|---|
21 | 21 | |
22 | 22 |
import io |
23 | 23 | |
24 |
from django_rbac.utils import get_role_model |
|
25 | ||
24 | 26 |
from authentic2.custom_user.models import User |
25 | 27 |
from authentic2.models import Attribute |
26 | 28 |
from authentic2.a2_rbac.utils import get_default_ou |
27 | 29 | |
28 | 30 |
from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError |
29 | 31 | |
32 |
Role = get_role_model() |
|
33 | ||
30 | 34 |
ENCODINGS = [ |
31 | 35 |
'iso-8859-1', |
32 | 36 |
'iso-8859-15', |
... | ... | |
389 | 393 |
importer = user_csv_importer_factory(content) |
390 | 394 |
assert importer.run(), importer.errors |
391 | 395 |
assert not importer.has_errors |
396 | ||
397 | ||
398 |
def test_user_roles_csv(profile, user_csv_importer_factory):
    """Exercise role handling of the user CSV importer.

    Covers adding by name and by slug, the ``delete`` and ``clear`` flags,
    several role rows for the same user, a header limited to the key and
    the role column, an unknown role, and the rejection of a header
    containing both role columns.
    """
    role_name = 'test_name'
    role_slug = 'test_slug'
    role = Role.objects.create(name=role_name, slug=role_slug, ou=get_default_ou())
    role2 = Role.objects.create(name='test2', ou=get_default_ou())
    base_header = 'email key,first_name,last_name,phone,'
    base_user = 'tnoel@entrouvert.com,Thomas,Noël,1234,'

    # add a role by name
    content_name_add = '\n'.join((base_header + '_role_name', base_user + role_name))
    importer = user_csv_importer_factory(content_name_add)
    assert importer.run()
    thomas = User.objects.get(email='tnoel@entrouvert.com')
    assert thomas in role.members.all()

    # a plain add must leave other roles untouched
    thomas.roles.add(role2)
    importer = user_csv_importer_factory(content_name_add)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role2.members.all()

    # "delete" flag removes only the named role
    content_name_delete = '\n'.join((base_header + '_role_name delete', base_user + role_name))
    importer = user_csv_importer_factory(content_name_delete)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas not in role.members.all()
    assert thomas in role2.members.all()

    # "clear" flag drops existing roles before adding
    content_name_clear = '\n'.join((base_header + '_role_name clear', base_user + role_name))
    importer = user_csv_importer_factory(content_name_clear)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role.members.all()
    assert thomas not in role2.members.all()

    # several rows for the same user add several roles
    thomas.roles.remove(role)
    content_name_add_multiple = '\n'.join((base_header + '_role_name', base_user + role_name,
                                           base_user + 'test2'))
    importer = user_csv_importer_factory(content_name_add_multiple)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role.members.all()
    assert thomas in role2.members.all()

    # with "clear", later rows of the same user must not wipe earlier ones
    thomas.roles.remove(role)
    thomas.roles.remove(role2)
    content_name_clear_multiple = '\n'.join((base_header + '_role_name clear',
                                             base_user + role_name,
                                             base_user + 'test2'))
    importer = user_csv_importer_factory(content_name_clear_multiple)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role.members.all()
    assert thomas in role2.members.all()

    # add a role by slug
    thomas.roles.remove(role)
    content_slug_add = '\n'.join((base_header + '_role_slug', base_user + role_slug))
    importer = user_csv_importer_factory(content_slug_add)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role.members.all()

    # header reduced to the key and the role column; import this content,
    # not the slug content built above
    thomas.roles.remove(role)
    content_only_key = '\n'.join(('email key,_role_name',
                                  'tnoel@entrouvert.com,test_name'))
    importer = user_csv_importer_factory(content_only_key)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role.members.all()

    # unknown role is reported on the role cell
    content_name_error = '\n'.join((base_header + '_role_name', base_user + 'bad_name'))
    importer = user_csv_importer_factory(content_name_error)
    assert importer.run()
    assert importer.has_errors
    assert importer.rows[0].cells[-1].errors[0].code == 'role-not-found'

    # both role columns in one header is rejected
    content_header_error = '\n'.join((base_header + '_role_name,_role_slug',
                                      base_user + ','.join((role_name, role_slug))))
    importer = user_csv_importer_factory(content_header_error)
    assert not importer.run()
    assert importer.has_errors
    assert importer.errors[0].code == 'invalid-role-column'
|
392 |
- |