Projet

Général

Profil

0002-csv_import-allow-adding-roles-35773.patch

Valentin Deniaud, 21 octobre 2019 17:36

Télécharger (15,1 ko)

Voir les différences:

Subject: [PATCH 2/3] csv_import: allow adding roles (#35773)

 src/authentic2/csv_import.py                  | 93 ++++++++++++++++---
 .../authentic2/manager/user_imports.html      | 49 ++++++++++
 tests/test_csv_import.py                      | 87 +++++++++++++++++
 3 files changed, 215 insertions(+), 14 deletions(-)
src/authentic2/csv_import.py
30 30
from django.utils import six
31 31
from django.utils.translation import ugettext as _
32 32

  
33
from django_rbac.utils import get_role_model
34

  
33 35
from authentic2 import app_settings
34 36
from authentic2.a2_rbac.utils import get_default_ou
35 37
from authentic2.custom_user.models import User
36 38
from authentic2.forms.profile import modelform_factory, BaseUserForm
37 39
from authentic2.models import Attribute, AttributeValue, UserExternalId
38 40

  
41
Role = get_role_model()
42

  
39 43

  
40 44
class UTF8Recoder(object):
41 45
    def __init__(self, fd):
......
149 153
    unique = attr.ib(default=False, metadata={'flag': True})
150 154
    globally_unique = attr.ib(default=False, metadata={'flag': True})
151 155
    verified = attr.ib(default=False, metadata={'flag': True})
156
    delete = attr.ib(default=False, metadata={'flag': True})
157
    clear = attr.ib(default=False, metadata={'flag': True})
152 158

  
153 159
    @property
154 160
    def flags(self):
......
186 192
        return (self.code, self.line, self.column) == (other.code, other.line, other.column)
187 193

  
188 194

  
195
# Names of the CSV columns that carry import metadata instead of a user
# field or attribute.

# External-id pair: the two columns are meant to be used together.
SOURCE_NAME = '_source_name'
SOURCE_ID = '_source_id'
SOURCE_COLUMNS = {SOURCE_NAME, SOURCE_ID}

# Role columns: a role is designated either by its name or by its slug.
ROLE_NAME = '_role_name'
ROLE_SLUG = '_role_slug'

# Every column name that must not be resolved as a user field/attribute.
SPECIAL_COLUMNS = SOURCE_COLUMNS.union((ROLE_NAME, ROLE_SLUG))
201

  
202

  
189 203
class ImportUserForm(BaseUserForm):
    """User form applied to one CSV row, extended with optional role columns.

    The role fields are named after the ROLE_NAME / ROLE_SLUG constants, so
    they are injected into the class namespace through ``locals()`` rather
    than being spelled out as plain class attributes.
    """

    # Both role columns are optional: a row may omit them entirely.
    locals()[ROLE_NAME] = forms.CharField(label=_('Role name'), required=False)
    locals()[ROLE_SLUG] = forms.CharField(label=_('Role slug'), required=False)

    def clean(self):
        """Clean the form while bypassing ``BaseUserForm.clean``.

        ``super(BaseUserForm, self)`` starts the lookup *after*
        ``BaseUserForm`` in the MRO, so only its ancestors' ``clean`` runs.
        """
        super(BaseUserForm, self).clean()
        # Reset after the parent clean() so the flag stays False.
        # NOTE(review): presumably this disables the model-level unique
        # validation because uniqueness is checked by the importer - confirm.
        self._validate_unique = False
193 214

  
194
SOURCE_NAME = '_source_name'
195
SOURCE_ID = '_source_id'
196
SOURCE_COLUMNS = set([SOURCE_NAME, SOURCE_ID])
197

  
198 215

  
199 216
class ImportUserFormWithExternalId(ImportUserForm):
200 217
    locals()[SOURCE_NAME] = forms.CharField(
......
216 233
    errors = attr.ib(default=[])
217 234
    is_valid = attr.ib(default=True)
218 235
    action = attr.ib(default=None)
236
    user_first_seen = attr.ib(default=True)
219 237

  
220 238
    def __getitem__(self, header):
221 239
        for cell in self.cells:
......
344 362
            self.add_error(
345 363
                Error('invalid-external-id-pair',
346 364
                      _('You must have a _source_name and a _source_id column')))
365
        if ROLE_NAME in header_names and ROLE_SLUG in header_names:
366
            self.add_error(
367
                Error('invalid-role-column',
368
                      _('Either specify role names or role slugs, not both')))
347 369

  
348 370
    def parse_header(self, head, column):
349 371
        splitted = head.split()
......
360 382
            if header.name in SOURCE_COLUMNS:
361 383
                if header.name == SOURCE_ID:
362 384
                    header.key = True
363
            else:
385
            elif header.name not in SPECIAL_COLUMNS:
364 386
                try:
365 387
                    if header.name in ['email', 'first_name', 'last_name', 'username']:
366 388
                        User._meta.get_field(header.name)
......
388 410
        self.headers.append(header)
389 411

  
390 412
        if (not (header.field or header.attribute)
391
                and header.name not in SOURCE_COLUMNS):
413
                and header.name not in SPECIAL_COLUMNS):
392 414
            self.add_error(LineError('unknown-or-missing-attribute',
393 415
                                     _('unknown or missing attribute "%s"') % head,
394 416
                                     line=1, column=column))
......
463 485
    def username_is_unique(self):
464 486
        return app_settings.A2_USERNAME_IS_UNIQUE or self.ou.username_is_unique
465 487

  
488
    @property
    def allow_duplicate_key(self):
        """Tell whether the imported file declares a role column.

        Returns True when either the ``_role_name`` or the ``_role_slug``
        header is present in ``self.headers_by_name``, False otherwise.
        """
        role_columns = (ROLE_NAME, ROLE_SLUG)
        return any(column in self.headers_by_name for column in role_columns)
491

  
466 492
    def check_unique_constraints(self, row, unique_map, user=None):
467 493
        ou_users = User.objects.filter(ou=self.ou)
468 494
        users = User.objects.all()
......
479 505
            else:
480 506
                continue
481 507
            if unique_key in unique_map:
482
                errors.append(
483
                    Error('unique-constraint-failed',
484
                          _('Unique constraint on column "%(column)s" failed: '
485
                            'value already appear on line %(line)d') % {
486
                                'column': header.name,
487
                                'line': unique_map[unique_key]}))
508
                if user and self.allow_duplicate_key:
509
                    row.user_first_seen = False
510
                else:
511
                    errors.append(
512
                        Error('unique-constraint-failed',
513
                              _('Unique constraint on column "%(column)s" failed: '
514
                                'value already appear on line %(line)d') % {
515
                                    'column': header.name,
516
                                    'line': unique_map[unique_key]}))
488 517
            else:
489 518
                unique_map[unique_key] = row.line
490 519

  
......
500 529
                atvs = AttributeValue.objects.filter(attribute__name=cell.header.name, content=cell.value)
501 530
                unique = not qs.filter(attribute_values__in=atvs).exists()
502 531
            if not unique:
503
                errors.append(
504
                    Error('unique-constraint-failed', _('Unique constraint on column "%s" failed') % cell.header.name))
532
                if user and self.allow_duplicate_key:
533
                    row.user_first_seen = False
534
                else:
535
                    errors.append(
536
                        Error('unique-constraint-failed',
537
                              _('Unique constraint on column "%s" failed') % cell.header.name))
505 538
        row.errors.extend(errors)
506 539
        row.is_valid = row.is_valid and not bool(errors)
507 540
        return not bool(errors)
......
510 543
    def do_import_row(self, row, unique_map):
511 544
        if not row.is_valid:
512 545
            return False
546
        success = True
513 547

  
514 548
        for header in self.headers:
515 549
            if header.key:
......
553 587

  
554 588
        if not self.check_unique_constraints(row, unique_map, user=user):
555 589
            return False
590
        if not row.user_first_seen:
591
            cell = next(c for c in row.cells if c.header.name in {ROLE_NAME, ROLE_SLUG})
592
            return self.add_role(cell, user)
556 593

  
557 594
        if not user:
558 595
            user = User(ou=self.ou)
......
597 634
                    continue
598 635
            cell.action = 'nothing'
599 636

  
637
        for cell in row.cells:
638
            if cell.header.field or cell.header.attribute:
639
                continue
640
            if cell.header.name in {ROLE_NAME, ROLE_SLUG}:
641
                success &= self.add_role(cell, user, do_clear=True)
642

  
600 643
        setattr(self, row.action + 'd', getattr(self, row.action + 'd') + 1)
644
        return success
645

  
646
    def add_role(self, cell, user, do_clear=False):
        """Apply the role operation described by a role cell to ``user``.

        The role is looked up inside ``self.ou``, by name for a
        ``_role_name`` column and by slug for a ``_role_slug`` column.

        :param cell: the role cell; its header flags select the operation.
        :param user: the user whose roles are modified.
        :param do_clear: when True and the header carries the ``clear``
            flag, all roles of the user are removed before adding this one.
        :returns: True when the operation was applied, False when the role
            does not exist (a ``role-not-found`` error is then appended to
            ``cell.errors``).
        """
        header = cell.header
        try:
            if header.name == ROLE_NAME:
                role = Role.objects.get(name=cell.value, ou=self.ou)
            elif header.name == ROLE_SLUG:
                role = Role.objects.get(slug=cell.value, ou=self.ou)
        except Role.DoesNotExist:
            not_found = Error('role-not-found',
                              _('Role "%s" does not exist') % cell.value)
            cell.errors.append(not_found)
            return False

        if header.delete:
            user.roles.remove(role)
        else:
            # The "clear" flag only wipes existing roles when the caller
            # allows it through do_clear; the role is added in every case.
            if header.clear and do_clear:
                user.roles.clear()
            user.roles.add(role)

        cell.action = 'updated'
        return True
src/authentic2/manager/templates/authentic2/manager/user_imports.html
166 166
        cannot use another key column.
167 167
      {% endblocktrans %}
168 168
    </p>
169
    <h4 id="help-roles">{% trans "Role operations" %}</h4>
170
    <p>
171
      {% blocktrans trimmed %}
172
        Adding existing roles to users is supported. Use either
173
        <var>_role_name</var> or <var>_role_slug</var> special columns to
174
        specify the names or the slugs that should be added to the user. In
175
        order to add multiple roles, simply add a new line, identical to the
176
        first one, except for the value of the role cell. These columns also
177
        accept special flags, as listed below.
178
      {% endblocktrans %}
179
    </p>
180
    <table class="main left">
181
      <thead>
182
        <tr>
183
          <th>{% trans "Flag" %}</th>
184
          <th>{% trans "Meaning" %}</th>
185
          <th>{% trans "Default value" %}</th>
186
        </tr>
187
      </thead>
188
      <tbody>
189
        <tr>
190
          <td>delete</td>
191
          <td>
192
            {% blocktrans trimmed %}
193
              Remove role from user instead of adding it.
194
            {% endblocktrans %}
195
          </td>
196
          <td>{% trans "False" %}</td>
197
        </tr>
198
        <tr>
199
          <td>clear</td>
200
          <td>
201
            {% blocktrans trimmed %}
202
            Clear user roles beforehand, so that they will have no more roles
203
            than those specified in the import file.
204
            {% endblocktrans %}
205
          </td>
206
          <td>{% trans "False" %}</td>
207
        </tr>
208
      </tbody>
209
    </table>
169 210
    <h4>{% trans "Examples" %}</h4>
170 211
    <p>{% blocktrans trimmed %}Importing first and last name of users keyed by email{% endblocktrans %}</p>
171 212
    <blockquote>
......
183 224
    <blockquote>
184 225
      <pre>_source_name,_source_id,email,"family_reference unique",first_name,last_name
185 226
app1,1,john.doe@example.com,1234,John,Doe
227
</pre>
228
    </blockquote>
229
    <p>{% blocktrans trimmed %}Importing email, first and last name of users
230
    while adding roles.{% endblocktrans %}</p>
231
    <blockquote>
232
      <pre>email key,first_name,last_name,_role_name
233
john.doe@example.com,John,Doe,Role1
234
john.doe@example.com,John,Doe,Role2
186 235
</pre>
187 236
    </blockquote>
188 237
  </div>
tests/test_csv_import.py
21 21

  
22 22
import io
23 23

  
24
from django_rbac.utils import get_role_model
25

  
24 26
from authentic2.custom_user.models import User
25 27
from authentic2.models import Attribute
26 28
from authentic2.a2_rbac.utils import get_default_ou
27 29

  
28 30
from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError
29 31

  
32
Role = get_role_model()
33

  
30 34
ENCODINGS = [
31 35
    'iso-8859-1',
32 36
    'iso-8859-15',
......
389 393
    importer = user_csv_importer_factory(content)
390 394
    assert importer.run(), importer.errors
391 395
    assert not importer.has_errors
396

  
397

  
398
def test_user_roles_csv(profile, user_csv_importer_factory):
    """Exercise the ``_role_name`` / ``_role_slug`` columns of the user import.

    Scenarios, in order: plain add, re-import keeping other roles, the
    ``delete`` flag, the ``clear`` flag, several rows for the same user
    (with and without ``clear``), lookup by slug, a file holding only the
    key and the role column, an unknown role, and a header declaring both
    role columns at once.
    """
    role_name = 'test_name'
    role_slug = 'test_slug'
    role = Role.objects.create(name=role_name, slug=role_slug, ou=get_default_ou())
    role2 = Role.objects.create(name='test2', ou=get_default_ou())
    base_header = 'email key,first_name,last_name,phone,'
    base_user = 'tnoel@entrouvert.com,Thomas,Noël,1234,'

    # Plain add: the user is created and gets the role.
    content_name_add = '\n'.join((base_header + '_role_name', base_user + role_name))
    importer = user_csv_importer_factory(content_name_add)
    assert importer.run()
    thomas = User.objects.get(email='tnoel@entrouvert.com')
    assert thomas in role.members.all()

    # Re-importing without flags must leave roles granted elsewhere alone.
    thomas.roles.add(role2)
    importer = user_csv_importer_factory(content_name_add)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role2.members.all()

    # "delete" flag: only the listed role is removed.
    content_name_delete = '\n'.join((base_header + '_role_name delete', base_user + role_name))
    importer = user_csv_importer_factory(content_name_delete)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas not in role.members.all()
    assert thomas in role2.members.all()

    # "clear" flag: previous roles are dropped, the listed one is set.
    content_name_clear = '\n'.join((base_header + '_role_name clear', base_user + role_name))
    importer = user_csv_importer_factory(content_name_clear)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role.members.all()
    assert thomas not in role2.members.all()

    # Several rows for the same user add several roles.
    thomas.roles.remove(role)
    content_name_add_multiple = '\n'.join((base_header + '_role_name', base_user + role_name,
                                           base_user + 'test2'))
    importer = user_csv_importer_factory(content_name_add_multiple)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role.members.all()
    assert thomas in role2.members.all()

    # With "clear", the following rows of the same user must not wipe the
    # role added by the first row.
    thomas.roles.remove(role)
    thomas.roles.remove(role2)
    content_name_clear_multiple = '\n'.join((base_header + '_role_name clear',
                                             base_user + role_name,
                                             base_user + 'test2'))
    importer = user_csv_importer_factory(content_name_clear_multiple)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role.members.all()
    assert thomas in role2.members.all()

    # Lookup by slug.
    thomas.roles.remove(role)
    content_slug_add = '\n'.join((base_header + '_role_slug', base_user + role_slug))
    importer = user_csv_importer_factory(content_slug_add)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role.members.all()

    # File holding only the key column and the role column.  The original
    # test built this content but imported content_slug_add again, so this
    # scenario was never actually exercised.
    thomas.roles.remove(role)
    content_only_key = '''email key,_role_name
tnoel@entrouvert.com,test_name'''
    importer = user_csv_importer_factory(content_only_key)
    assert importer.run()
    thomas.refresh_from_db()
    assert thomas in role.members.all()

    # Unknown role: the error is reported on the role cell.
    content_name_error = '\n'.join((base_header + '_role_name', base_user + 'bad_name'))
    importer = user_csv_importer_factory(content_name_error)
    assert importer.run()
    assert importer.has_errors
    assert importer.rows[0].cells[-1].errors[0].code == 'role-not-found'

    # Both role columns in the same header are rejected.
    content_header_error = '\n'.join((base_header + '_role_name,_role_slug',
                                      base_user + ','.join((role_name, role_slug))))
    importer = user_csv_importer_factory(content_header_error)
    assert not importer.run()
    assert importer.has_errors
    assert importer.errors[0].code == 'invalid-role-column'
392
-