Projet

Général

Profil

0001-bdd-user-migration-from-csv-9717.patch

Josué Kouka, 26 janvier 2016 16:57

Télécharger (11,5 ko)

Voir les différences:

Subject: [PATCH] bdd user migration from csv (#9717)

 mandayejs/applications.py                          | 25 +++++--
 .../management/commands/migrate-ldap-users.py      | 49 -------------
 .../mandaye/management/commands/migrate-users.py   | 80 ++++++++++++++++++++++
 tests/csv_users.csv                                |  4 ++
 tests/ldap_users.txt                               |  9 +--
 tests/tests.py                                     | 60 +++++++++++++---
 6 files changed, 157 insertions(+), 70 deletions(-)
 delete mode 100644 mandayejs/mandaye/management/commands/migrate-ldap-users.py
 create mode 100644 mandayejs/mandaye/management/commands/migrate-users.py
 create mode 100644 tests/csv_users.csv
mandayejs/applications.py
62 62
            dct['SITE_AUTH_CHECKER'] = os.path.join(settings.STATIC_ROOT, 
63 63
                    dct['SITE_AUTH_CHECKER'])
64 64
            
65
            # Global js scripts for the current app
66
            if dct.get('SITE_APP_SCRIPTS',None):
67
                dct['SITE_APP_SCRIPTS'] = [ 
68
                    script for script in dct['SITE_APP_SCRIPTS']
69
                ]
70 65
            # Default form submit element
71 66
            if not dct.get('SITE_FORM_SUBMIT_ELEMENT', None):
72 67
                dct['SITE_FORM_SUBMIT_ELEMENT'] = 'input[type=submit], button'
......
77 72
class AppSettings(object):
78 73
    __metaclass__ = AppSettingsMeta
79 74

  
75
# Test App Settings
76
class Test(AppSettings):
77
    SITE_LOGIN_PATH = '/'
78
    SITE_LOCATORS = [
79
        {
80
            'id': '#login',
81
            'label': 'Login',
82
            'name': 'login',
83
            'kind': 'string',
84
        },
85
        {
86
            'id': '#password',
87
            'label': 'Password',
88
            'name': 'password',
89
            'kind': 'password'
90
        }
91
    ]
92
    SITE_AUTH_CHECKER = 'js/test/auth.checker.js'
93
    SITE_AUTH_COOKIE_KEYS = [ 'test']
94
    SITE_FORCE_REDIRECT_URL = '/whatever'
80 95

  
81 96
# Duonet App Settings
82 97
class Duonet(AppSettings):
mandayejs/mandaye/management/commands/migrate-ldap-users.py
1
from __future__ import absolute_import
2

  
3
import json
4
import ldif
5
import logging
6

  
7
from django.core.management.base import BaseCommand, CommandError
8
from django.db import IntegrityError
9
from django.contrib.auth import get_user_model
10

  
11
from mandayejs.mandaye.models import UserCredentials  
12

  
13
logger = logging.getLogger(__name__)
14

  
15
class Command(BaseCommand):
16
    args = '<ldif_file>'
17
    help = 'Migrate users from ldap'
18

  
19
    def handle(self, *args, **kwargs):
20

  
21
        if len(args) < 1:
22
            self.stdout.write('An input file is required')
23
            return False
24
        for ldif_file in args:
25
            data = self.get_ldif_data(ldif_file)
26
            data = [ d[1] for d in data]
27
            self.migrate(data)
28

  
29
    def get_ldif_data(self, filename):
30
        with open(filename, 'rb') as fd:
31
            return ldif.ParseLDIF(fd)
32

  
33
    def migrate(self, parsed_data):
34
        User = get_user_model()
35
        for data in parsed_data:
36
            data = { k : ''.join(v) for k,v in data.items()}
37
            try:
38
                user, created = User.objects.get_or_create(username=data.get('idpUniqueID'), 
39
                        last_name=data.get('spLogin'))
40
                user.save()
41
                uc = UserCredentials(user=user, locators=json.loads(data.get('spPostValues')))
42
                uc.decrypt()
43
                uc.save()
44
                self.stdout.write('{idpUniqueID} imported'.format(**data))
45
                logger.debug('{idpUniqueID} imported'.format(**data))
46
            except (IntegrityError,) as e:
47
                logger.debug(e)
48
                continue
49

  
mandayejs/mandaye/management/commands/migrate-users.py
1
from __future__ import absolute_import
2

  
3
import os
4
import csv
5
import json
6
import ldif
7
import logging
8
from optparse import make_option
9

  
10
from django.core.management.base import BaseCommand, CommandError
11
from django.db import IntegrityError
12
from django.contrib.auth.models import User
13

  
14
from mandayejs.mandaye.models import UserCredentials  
15

  
16
logger = logging.getLogger(__name__)
17

  
18

  
19
class Command(BaseCommand):
20
    args = '<filename>'
21
    help = 'Migrate users from ldif file or csv file'
22

  
23
    option_list = BaseCommand.option_list +(
24
        make_option(
25
            '--ldap',
26
            action='store_true',
27
            default=False,
28
            help='Migrate users from a ldap dump file'
29
        ),
30
        make_option(
31
            '--csv',
32
            action='store_true',
33
            default=False,
34
            help='Migrate users from a csv file'
35
        )
36
    )
37

  
38
    def handle(self, *args, **kwargs):
39
        if len(args) < 1:
40
            self.stdout.write('An input file is required')
41
            return False
42

  
43
        if kwargs.get('csv'):
44
            for filename in args:
45
                data = self.get_csv_data(filename)
46
                self.migrate(data)
47
        else:
48
            for filename in args:
49
                data = self.get_ldif_data(filename)
50
                data = [ d[1] for d in data]
51
                data = [ 
52
                    { k : ''.join(v) for k,v in d.items()} for d in data
53
                ]
54
                self.migrate(data)
55

  
56
    def get_ldif_data(self, filename):
57
        with open(filename, 'rb') as fd:
58
            return ldif.ParseLDIF(fd)
59

  
60
    def get_csv_data(self, filename):
61
        with open(filename, 'rb') as fd:
62
            fieldnames = ['idpUniqueID', 'spPostValues']
63
            reader = csv.DictReader(fd, delimiter=';', quotechar='|', fieldnames=fieldnames)
64
            return list(reader)
65

  
66

  
67
    def migrate(self, parsed_data):
68
        for data in parsed_data:
69
            try:
70
                user, created = User.objects.get_or_create(username=data.get('idpUniqueID'), 
71
                        last_name=data.get('spLogin',''))
72
                user.save()
73
                uc = UserCredentials(user=user, locators=json.loads(data.get('spPostValues')))
74
                uc.decrypt()
75
                uc.save()
76
                logger.debug('{idpUniqueID} imported'.format(**data))
77
            except (IntegrityError,) as e:
78
                logger.debug(e)
79
                continue
80

  
tests/csv_users.csv
1
_288CE3839831B75D8ADD6524329236B7;{"username": "csv1", "password": "cIrWyg=="}
2
_8C36D00EE18E5702A51813413097028F;{"username": "csv2", "password": "cIrWyQ=="}
3
_45EB3616CEE069A42AFD23D7039E6FBD;{"username": "csv3", "password": "cIrWyA=="}
4
_45EB3616CEJ069A42AFD23D70S9E6FBD;{"username": "csv4", "password": "cIrWzw=="}
tests/ldap_users.txt
11 11
entryUUID: 061ece3e-f3ac-1033-91e7-8926ba287cf0
12 12
creatorsName: cn=admin,dc=entrouvert,dc=org
13 13
createTimestamp: 20141029113944Z
14
spPostValues: {"txtCode": "Y4HL6cbGxRsoHQU97VzXgkqEdA==", "txtNomFoyer": "ldap_user1", "t
15
 xtDateNaissance": "23/04/1991"}
14
spPostValues: {"password": "Y4HL6cbGxRsoHQU97VzXgkqEdA==", "login": "ldap_user1"}
16 15
entryCSN: 20141029133206.125440Z#000000#001#000000
17 16
modifiersName: cn=admin,dc=entrouvert,dc=org
18 17
modifyTimestamp: 20141029133206Z
......
30 29
entryUUID: 061fe3c8-f3ac-1033-91ea-8926ba287cf0
31 30
creatorsName: cn=admin,dc=entrouvert,dc=org
32 31
createTimestamp: 20141029113944Z
33
spPostValues: {"txtCode": "Y4HL6cbGxRsoHQU97VzXgkqEdw==", "txtNomFoyer": "ldap_user2", "t
34
 xtDateNaissance": "23/04/1991"}
32
spPostValues: {"password": "Y4HL6cbGxRsoHQU97VzXgkqEdw==", "login": "ldap_user2"}
35 33
entryCSN: 20141029133206.144171Z#000000#001#000000
36 34
modifiersName: cn=admin,dc=entrouvert,dc=org
37 35
modifyTimestamp: 20141029133206Z
......
49 47
entryUUID: 0620818e-f3ac-1033-91ec-8926ba287cf0
50 48
creatorsName: cn=admin,dc=entrouvert,dc=org
51 49
createTimestamp: 20141029113944Z
52
spPostValues: {"txtCode": "Y4HL6cbGxRsoHQU97VzXgkqEdg==", "txtNomFoyer": "ldap_user3",
53
  "txtDateNaissance": "19/08/1953"}
50
spPostValues: {"password": "Y4HL6cbGxRsoHQU97VzXgkqEdg==", "login": "ldap_user3"}
54 51
entryCSN: 20141029133206.156443Z#000000#001#000000
55 52
modifiersName: cn=admin,dc=entrouvert,dc=org
56 53
modifyTimestamp: 20141029133206Z
tests/tests.py
8 8

  
9 9
pytestmark = pytest.mark.django_db
10 10

  
11
# Encryption/Decryption
11
settings.SITE_APP = 'mandayejs.applications.Test'
12
settings.SECRET_KEY = 'od5cei4aeveel8dui4lei2ou9ahsei2A'
12 13

  
14
# ENCRYPTION/DECRYPTION
13 15
def create_user(**kwargs):
14 16
    password = kwargs.pop('password', None) or kwargs.get('username')
15 17
    user, created = User.objects.get_or_create(**kwargs)
......
44 46
    assert decrypted.get('password') == 'john password'
45 47
    
46 48

  
47
# Migration
48
def test_migrate_users_command():
49
# MIGRATION COMMAND
50
def cmd(*args, **kwargs):
51
    cmd = type(
52
        'Cmd',
53
        (object,),
54
        {
55
            'name': 'migrate-users',
56
            'args': args,
57
            'opts': kwargs
58
        }
59
    )
60
    return cmd 
49 61

  
50
    args = ['tests/ldap_users.txt',]
51
    opts = {}
52
    call_command('migrate-ldap-users', *args, **opts)
62
@pytest.fixture
63
def command_ldap():
64
    return cmd(
65
            'tests/ldap_users.txt',
66
            ldap= True
67
        )
68

  
69
@pytest.fixture
70
def command_csv():
71
    return cmd(
72
        'tests/csv_users.csv',
73
         csv= True
74
    )
75

  
76
@pytest.fixture(params=['command_ldap', 'command_csv'])
77
def command(request, command_ldap, command_csv):
78
    return locals().get(request.param)
53 79

  
54
    credentials = UserCredentials.objects.filter(user__last_name__in=[
80
def test_command_migrate_users(command):
81
    call_command(command.name, *command.args, **command.opts)
82
    if command.opts.get('ldap'):
83
        credentials = UserCredentials.objects.filter(user__last_name__in=[
84
                'ldap_user1',
85
                'ldap_user2',
86
                'ldap_user3'
87
            ])
88

  
89
        assert len(credentials) == 3
90

  
91
        for cred in credentials:
92
            assert cred.to_login_info(decrypt=True)['#password'] == 'password_{}'.format(cred.user.last_name)
93
    else:
94
        credentials = UserCredentials.objects.all().exclude(user__last_name__in=[
55 95
            'ldap_user1',
56 96
            'ldap_user2',
57 97
            'ldap_user3'
58 98
        ])
59 99

  
60
    assert len(credentials) == 3
100
        assert len(credentials) == 4
61 101

  
62
    for cred in credentials:
63
        assert cred.to_login_info(decrypt=True)['#txtCode'] == 'password_{}'.format(cred.user.last_name)
102
        for cred in credentials:
103
            assert cred.to_login_info(decrypt=True)['#password'] == cred.to_login_info()['#username']
64 104

  
65
-