Projet

Général

Profil

0001-csv_import-allow-adding-roles-35773.patch

Valentin Deniaud, 09 octobre 2019 16:28

Télécharger (11,4 ko)

Voir les différences:

Subject: [PATCH] csv_import: allow adding roles (#35773)

 src/authentic2/csv_import.py | 85 ++++++++++++++++++++++++++++++------
 tests/test_csv_import.py     | 77 ++++++++++++++++++++++++++++++++
 2 files changed, 148 insertions(+), 14 deletions(-)
src/authentic2/csv_import.py
30 30
from django.utils import six
31 31
from django.utils.translation import ugettext as _
32 32

  
33
from django_rbac.utils import get_role_model
34

  
33 35
from authentic2 import app_settings
34 36
from authentic2.a2_rbac.utils import get_default_ou
35 37
from authentic2.custom_user.models import User
36 38
from authentic2.forms.profile import modelform_factory, BaseUserForm
37 39
from authentic2.models import Attribute, AttributeValue, UserExternalId
38 40

  
41
Role = get_role_model()
42

  
39 43

  
40 44
class UTF8Recoder(object):
41 45
    def __init__(self, fd):
......
149 153
    unique = attr.ib(default=False, metadata={'flag': True})
150 154
    globally_unique = attr.ib(default=False, metadata={'flag': True})
151 155
    verified = attr.ib(default=False, metadata={'flag': True})
156
    delete = attr.ib(default=False, metadata={'flag': True})
157
    clear = attr.ib(default=False, metadata={'flag': True})
152 158

  
153 159
    @property
154 160
    def flags(self):
......
186 192
        return (self.code, self.line, self.column) == (other.code, other.line, other.column)
187 193

  
188 194

  
195
SOURCE_NAME = '_source_name'
196
SOURCE_ID = '_source_id'
197
SOURCE_COLUMNS = set([SOURCE_NAME, SOURCE_ID])
198
ROLE_NAME = '_role_name'
199
ROLE_SLUG = '_role_slug'
200
SPECIAL_COLUMNS = SOURCE_COLUMNS | {ROLE_NAME, ROLE_SLUG}
201

  
202

  
189 203
class ImportUserForm(BaseUserForm):
204
    locals()[ROLE_NAME] = forms.CharField(
205
        label=_('Role name'),
206
        required=False)
207
    locals()[ROLE_SLUG] = forms.CharField(
208
        label=_('Role slug'),
209
        required=False)
210

  
190 211
    def clean(self):
191 212
        super(BaseUserForm, self).clean()
192 213
        self._validate_unique = False
193 214

  
194
SOURCE_NAME = '_source_name'
195
SOURCE_ID = '_source_id'
196
SOURCE_COLUMNS = set([SOURCE_NAME, SOURCE_ID])
197

  
198 215

  
199 216
class ImportUserFormWithExternalId(ImportUserForm):
200 217
    locals()[SOURCE_NAME] = forms.CharField(
......
216 233
    errors = attr.ib(default=[])
217 234
    is_valid = attr.ib(default=True)
218 235
    action = attr.ib(default=None)
236
    process_role = attr.ib(default=False)
219 237

  
220 238
    def __getitem__(self, header):
221 239
        for cell in self.cells:
......
343 361
            self.add_error(
344 362
                Error('invalid-external-id-pair',
345 363
                      _('You must have a _source_name and a _source_id column')))
364
        if ROLE_NAME in header_names and ROLE_SLUG in header_names:
365
            self.add_error(
366
                Error('invalid-role-column',
367
                      _('Either specify role names or role slugs, not both')))
346 368

  
347 369
    def parse_header(self, head, column):
348 370
        splitted = head.split()
......
359 381
            if header.name in SOURCE_COLUMNS:
360 382
                if header.name == SOURCE_ID:
361 383
                    header.key = True
362
            else:
384
            elif header.name not in SPECIAL_COLUMNS:
363 385
                try:
364 386
                    if header.name in ['email', 'first_name', 'last_name', 'username']:
365 387
                        User._meta.get_field(header.name)
......
387 409
        self.headers.append(header)
388 410

  
389 411
        if (not (header.field or header.attribute)
390
                and header.name not in SOURCE_COLUMNS):
412
                and header.name not in SPECIAL_COLUMNS):
391 413
            self.add_error(LineError('unknown-or-missing-attribute',
392 414
                                     _('unknown or missing attribute "%s"') % head,
393 415
                                     line=1, column=column))
......
478 500
            else:
479 501
                continue
480 502
            if unique_key in unique_map:
481
                errors.append(
482
                    Error('unique-constraint-failed',
483
                          _('Unique constraint on column "%(column)s" failed: '
484
                            'value already appear on line %(line)d') % {
485
                                'column': header.name,
486
                                'line': unique_map[unique_key]}))
503
                if user and (ROLE_NAME in self.headers_by_name or ROLE_SLUG in self.headers_by_name):
504
                    row.process_role = True
505
                else:
506
                    errors.append(
507
                        Error('unique-constraint-failed',
508
                              _('Unique constraint on column "%(column)s" failed: '
509
                                'value already appear on line %(line)d') % {
510
                                    'column': header.name,
511
                                    'line': unique_map[unique_key]}))
487 512
            else:
488 513
                unique_map[unique_key] = row.line
489 514

  
......
499 524
                atvs = AttributeValue.objects.filter(attribute__name=cell.header.name, content=cell.value)
500 525
                unique = not qs.filter(attribute_values__in=atvs).exists()
501 526
            if not unique:
502
                errors.append(
503
                    Error('unique-constraint-failed', _('Unique constraint on column "%s" failed') % cell.header.name))
527
                if user and (ROLE_NAME in self.headers_by_name or ROLE_SLUG in self.headers_by_name):
528
                    row.process_role = True
529
                else:
530
                    errors.append(
531
                        Error('unique-constraint-failed',
532
                              _('Unique constraint on column "%s" failed') % cell.header.name))
504 533
        row.errors.extend(errors)
505 534
        row.is_valid = row.is_valid and not bool(errors)
506 535
        return not bool(errors)
......
551 580
            user = users[0]
552 581

  
553 582
        if not self.check_unique_constraints(row, unique_map, user=user):
583
            if row.process_role:
584
                self.add_role(row, user)
554 585
            return False
555 586

  
556 587
        if not user:
......
596 627
                    continue
597 628
            cell.action = 'nothing'
598 629

  
630
        self.add_role(row, user)
631

  
599 632
        setattr(self, row.action + 'd', getattr(self, row.action + 'd') + 1)
600 633
        return True
634

  
635
    def add_role(self, row, user):
636
        for cell in row.cells:
637
            if cell.header.field or cell.header.attribute:
638
                continue
639
            if cell.header.name in {ROLE_NAME, ROLE_SLUG}:
640
                try:
641
                    if cell.header.name == ROLE_NAME:
642
                        role = Role.objects.get(name=cell.value, ou=self.ou)
643
                    elif cell.header.name == ROLE_SLUG:
644
                        role = Role.objects.get(slug=cell.value, ou=self.ou)
645
                except Role.DoesNotExist:
646
                    row.errors.append(
647
                        Error('role-not-found',
648
                              _('Role "%s" does not exist') % cell.value))
649
                else:
650
                    if cell.header.delete:
651
                        user.roles.remove(role)
652
                    elif cell.header.clear:
653
                        user.roles.clear()
654
                        user.roles.add(role)
655
                    else:
656
                        user.roles.add(role)
657
                    continue
tests/test_csv_import.py
21 21

  
22 22
import io
23 23

  
24
from django_rbac.utils import get_role_model
25

  
24 26
from authentic2.custom_user.models import User
25 27
from authentic2.models import Attribute
26 28
from authentic2.a2_rbac.utils import get_default_ou
27 29

  
28 30
from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError
29 31

  
32
Role = get_role_model()
33

  
30 34
ENCODINGS = [
31 35
    'iso-8859-1',
32 36
    'iso-8859-15',
......
389 393
    importer = user_csv_importer_factory(content)
390 394
    assert importer.run(), importer.errors
391 395
    assert not importer.has_errors
396

  
397

  
398
def test_user_roles_csv(profile, user_csv_importer_factory):
399
    assert User.objects.count() == 0
400
    role_name = 'test_name'
401
    role_slug = 'test_slug'
402
    role = Role.objects.create(name=role_name, slug=role_slug, ou=get_default_ou())
403
    role2 = Role.objects.create(name='test2', ou=get_default_ou())
404
    base_header = 'email key,first_name,last_name,phone,'
405
    base_user = 'tnoel@entrouvert.com,Thomas,Noël,1234,'
406

  
407
    content_name_add = '\n'.join((base_header + '_role_name', base_user + role_name))
408
    importer = user_csv_importer_factory(content_name_add)
409
    assert importer.run()
410
    thomas = User.objects.get(email='tnoel@entrouvert.com')
411
    assert thomas in role.members.all()
412

  
413
    thomas.roles.add(role2)
414
    importer = user_csv_importer_factory(content_name_add)
415
    assert importer.run()
416
    thomas.refresh_from_db()
417
    assert thomas in role2.members.all()
418

  
419
    content_name_delete = '\n'.join((base_header + '_role_name delete', base_user + role_name))
420
    importer = user_csv_importer_factory(content_name_delete)
421
    assert importer.run()
422
    thomas.refresh_from_db()
423
    assert thomas not in role.members.all()
424
    assert thomas in role2.members.all()
425

  
426
    content_name_clear = '\n'.join((base_header + '_role_name clear', base_user + role_name))
427
    importer = user_csv_importer_factory(content_name_clear)
428
    assert importer.run()
429
    thomas.refresh_from_db()
430
    assert thomas in role.members.all()
431
    assert thomas not in role2.members.all()
432

  
433
    thomas.roles.remove(role)
434
    content_name_add_multiple = '\n'.join((base_header + '_role_name', base_user + role_name,
435
                                           base_user + 'test2'))
436
    importer = user_csv_importer_factory(content_name_add_multiple)
437
    assert importer.run()
438
    thomas.refresh_from_db()
439
    assert thomas in role.members.all()
440
    assert thomas in role2.members.all()
441

  
442
    thomas.roles.remove(role)
443
    content_slug_add = '\n'.join((base_header + '_role_slug', base_user + role_slug))
444
    importer = user_csv_importer_factory(content_slug_add)
445
    assert importer.run()
446
    thomas.refresh_from_db()
447
    assert thomas in role.members.all()
448

  
449
    thomas.roles.remove(role)
450
    content_only_key = '''email key,_role_name
451
tnoel@entrouvert.com,test_name'''
452
    importer = user_csv_importer_factory(content_slug_add)
453
    assert importer.run()
454
    thomas.refresh_from_db()
455
    assert thomas in role.members.all()
456

  
457
    content_name_error = '\n'.join((base_header + '_role_name', base_user + 'bad_name'))
458
    importer = user_csv_importer_factory(content_name_error)
459
    assert importer.run()
460
    assert importer.has_errors
461
    assert importer.rows[-1].errors[0].code == 'role-not-found'
462

  
463
    content_header_error = '\n'.join((base_header + '_role_name,_role_slug',
464
                                      base_user + ','.join((role_name, role_slug))))
465
    importer = user_csv_importer_factory(content_header_error)
466
    assert not importer.run()
467
    assert importer.has_errors
468
    assert importer.errors[0].code == 'invalid-role-column'
392
-