Projet

Général

Profil

0001-bdd-user-migration-from-csv-9717.patch

Josué Kouka, 26 janvier 2016 16:41

Télécharger (11,7 ko)

Voir les différences:

Subject: [PATCH] bdd user migration from csv (#9717)

 mandayejs/applications.py                          | 25 +++++--
 .../management/commands/migrate-ldap-users.py      | 49 -------------
 .../mandaye/management/commands/migrate-users.py   | 82 ++++++++++++++++++++++
 tests/csv_users.csv                                |  4 ++
 tests/ldap_users.txt                               |  9 +--
 tests/tests.py                                     | 61 +++++++++++++---
 6 files changed, 160 insertions(+), 70 deletions(-)
 delete mode 100644 mandayejs/mandaye/management/commands/migrate-ldap-users.py
 create mode 100644 mandayejs/mandaye/management/commands/migrate-users.py
 create mode 100644 tests/csv_users.csv
mandayejs/applications.py
62 62
            dct['SITE_AUTH_CHECKER'] = os.path.join(settings.STATIC_ROOT, 
63 63
                    dct['SITE_AUTH_CHECKER'])
64 64
            
65
            # Global js scripts for the current app
66
            if dct.get('SITE_APP_SCRIPTS',None):
67
                dct['SITE_APP_SCRIPTS'] = [ 
68
                    script for script in dct['SITE_APP_SCRIPTS']
69
                ]
70 65
            # Default form submit element
71 66
            if not dct.get('SITE_FORM_SUBMIT_ELEMENT', None):
72 67
                dct['SITE_FORM_SUBMIT_ELEMENT'] = 'input[type=submit], button'
......
77 72
class AppSettings(object):
78 73
    __metaclass__ = AppSettingsMeta
79 74

  
75
# Test App Settings
76
class Test(AppSettings):
77
    SITE_LOGIN_PATH = '/'
78
    SITE_LOCATORS = [
79
        {
80
            'id': '#login',
81
            'label': 'Login',
82
            'name': 'login',
83
            'kind': 'string',
84
        },
85
        {
86
            'id': '#password',
87
            'label': 'Password',
88
            'name': 'password',
89
            'kind': 'password'
90
        }
91
    ]
92
    SITE_AUTH_CHECKER = 'js/test/auth.checker.js'
93
    SITE_AUTH_COOKIE_KEYS = [ 'test']
94
    SITE_FORCE_REDIRECT_URL = '/whatever'
80 95

  
81 96
# Duonet App Settings
82 97
class Duonet(AppSettings):
mandayejs/mandaye/management/commands/migrate-ldap-users.py
1
from __future__ import absolute_import
2

  
3
import json
4
import ldif
5
import logging
6

  
7
from django.core.management.base import BaseCommand, CommandError
8
from django.db import IntegrityError
9
from django.contrib.auth import get_user_model
10

  
11
from mandayejs.mandaye.models import UserCredentials  
12

  
13
logger = logging.getLogger(__name__)
14

  
15
class Command(BaseCommand):
16
    args = '<ldif_file>'
17
    help = 'Migrate users from ldap'
18

  
19
    def handle(self, *args, **kwargs):
20

  
21
        if len(args) < 1:
22
            self.stdout.write('An input file is required')
23
            return False
24
        for ldif_file in args:
25
            data = self.get_ldif_data(ldif_file)
26
            data = [ d[1] for d in data]
27
            self.migrate(data)
28

  
29
    def get_ldif_data(self, filename):
30
        with open(filename, 'rb') as fd:
31
            return ldif.ParseLDIF(fd)
32

  
33
    def migrate(self, parsed_data):
34
        User = get_user_model()
35
        for data in parsed_data:
36
            data = { k : ''.join(v) for k,v in data.items()}
37
            try:
38
                user, created = User.objects.get_or_create(username=data.get('idpUniqueID'), 
39
                        last_name=data.get('spLogin'))
40
                user.save()
41
                uc = UserCredentials(user=user, locators=json.loads(data.get('spPostValues')))
42
                uc.decrypt()
43
                uc.save()
44
                self.stdout.write('{idpUniqueID} imported'.format(**data))
45
                logger.debug('{idpUniqueID} imported'.format(**data))
46
            except (IntegrityError,) as e:
47
                logger.debug(e)
48
                continue
49

  
mandayejs/mandaye/management/commands/migrate-users.py
1
from __future__ import absolute_import
2

  
3
import os
4
import csv
5
import json
6
import ldif
7
import logging
8
from optparse import make_option
9

  
10
from django.core.management.base import BaseCommand, CommandError
11
from django.db import IntegrityError
12
from django.contrib.auth.models import User
13

  
14
from mandayejs.mandaye.models import UserCredentials  
15

  
16
logger = logging.getLogger(__name__)
17

  
18
def change_keys_name(s):
19
    d = json.loads(s)
20

  
21
class Command(BaseCommand):
22
    args = '<filename>'
23
    help = 'Migrate users from ldif file or csv file'
24

  
25
    option_list = BaseCommand.option_list +(
26
        make_option(
27
            '--ldap',
28
            action='store_true',
29
            default=False,
30
            help='Migrate users from a ldap dump file'
31
        ),
32
        make_option(
33
            '--csv',
34
            action='store_true',
35
            default=False,
36
            help='Migrate users from a csv file'
37
        )
38
    )
39

  
40
    def handle(self, *args, **kwargs):
41
        if len(args) < 1:
42
            self.stdout.write('An input file is required')
43
            return False
44

  
45
        if kwargs.get('csv'):
46
            for filename in args:
47
                data = self.get_csv_data(filename)
48
                self.migrate(data)
49
        else:
50
            for filename in args:
51
                data = self.get_ldif_data(filename)
52
                data = [ d[1] for d in data]
53
                data = [ 
54
                    { k : ''.join(v) for k,v in d.items()} for d in data
55
                ]
56
                self.migrate(data)
57

  
58
    def get_ldif_data(self, filename):
59
        with open(filename, 'rb') as fd:
60
            return ldif.ParseLDIF(fd)
61

  
62
    def get_csv_data(self, filename):
63
        with open(filename, 'rb') as fd:
64
            fieldnames = ['idpUniqueID', 'spPostValues']
65
            reader = csv.DictReader(fd, delimiter=';', quotechar='|', fieldnames=fieldnames)
66
            return list(reader)
67

  
68

  
69
    def migrate(self, parsed_data):
70
        for data in parsed_data:
71
            try:
72
                user, created = User.objects.get_or_create(username=data.get('idpUniqueID'), 
73
                        last_name=data.get('spLogin',''))
74
                user.save()
75
                uc = UserCredentials(user=user, locators=json.loads(data.get('spPostValues')))
76
                uc.decrypt()
77
                uc.save()
78
                logger.debug('{idpUniqueID} imported'.format(**data))
79
            except (IntegrityError,) as e:
80
                logger.debug(e)
81
                continue
82

  
tests/csv_users.csv
1
_288CE3839831B75D8ADD6524329236B7;{"username": "csv1", "password": "cIrWyg=="}
2
_8C36D00EE18E5702A51813413097028F;{"username": "csv2", "password": "cIrWyQ=="}
3
_45EB3616CEE069A42AFD23D7039E6FBD;{"username": "csv3", "password": "cIrWyA=="}
4
_45EB3616CEJ069A42AFD23D70S9E6FBD;{"username": "csv4", "password": "cIrWzw=="}
tests/ldap_users.txt
11 11
entryUUID: 061ece3e-f3ac-1033-91e7-8926ba287cf0
12 12
creatorsName: cn=admin,dc=entrouvert,dc=org
13 13
createTimestamp: 20141029113944Z
14
spPostValues: {"txtCode": "Y4HL6cbGxRsoHQU97VzXgkqEdA==", "txtNomFoyer": "ldap_user1", "t
15
 xtDateNaissance": "23/04/1991"}
14
spPostValues: {"password": "Y4HL6cbGxRsoHQU97VzXgkqEdA==", "login": "ldap_user1"}
16 15
entryCSN: 20141029133206.125440Z#000000#001#000000
17 16
modifiersName: cn=admin,dc=entrouvert,dc=org
18 17
modifyTimestamp: 20141029133206Z
......
30 29
entryUUID: 061fe3c8-f3ac-1033-91ea-8926ba287cf0
31 30
creatorsName: cn=admin,dc=entrouvert,dc=org
32 31
createTimestamp: 20141029113944Z
33
spPostValues: {"txtCode": "Y4HL6cbGxRsoHQU97VzXgkqEdw==", "txtNomFoyer": "ldap_user2", "t
34
 xtDateNaissance": "23/04/1991"}
32
spPostValues: {"password": "Y4HL6cbGxRsoHQU97VzXgkqEdw==", "login": "ldap_user2"}
35 33
entryCSN: 20141029133206.144171Z#000000#001#000000
36 34
modifiersName: cn=admin,dc=entrouvert,dc=org
37 35
modifyTimestamp: 20141029133206Z
......
49 47
entryUUID: 0620818e-f3ac-1033-91ec-8926ba287cf0
50 48
creatorsName: cn=admin,dc=entrouvert,dc=org
51 49
createTimestamp: 20141029113944Z
52
spPostValues: {"txtCode": "Y4HL6cbGxRsoHQU97VzXgkqEdg==", "txtNomFoyer": "ldap_user3",
53
  "txtDateNaissance": "19/08/1953"}
50
spPostValues: {"password": "Y4HL6cbGxRsoHQU97VzXgkqEdg==", "login": "ldap_user3"}
54 51
entryCSN: 20141029133206.156443Z#000000#001#000000
55 52
modifiersName: cn=admin,dc=entrouvert,dc=org
56 53
modifyTimestamp: 20141029133206Z
tests/tests.py
3 3
from django.conf import settings
4 4
from django.contrib.auth.models import User
5 5
from django.core.management import call_command
6
from django.core.management.base import BaseCommand
6 7

  
7 8
from mandayejs.mandaye.models import UserCredentials
8 9

  
9 10
pytestmark = pytest.mark.django_db
10 11

  
11
# Encryption/Decryption
12
settings.SITE_APP = 'mandayejs.applications.Test'
13
settings.SECRET_KEY = 'od5cei4aeveel8dui4lei2ou9ahsei2A'
12 14

  
15
# ENCRYPTION/DECRYPTION
13 16
def create_user(**kwargs):
14 17
    password = kwargs.pop('password', None) or kwargs.get('username')
15 18
    user, created = User.objects.get_or_create(**kwargs)
......
44 47
    assert decrypted.get('password') == 'john password'
45 48
    
46 49

  
47
# Migration
48
def test_migrate_users_command():
50
# MIGRATION COMMAND
51
def cmd(*args, **kwargs):
52
    cmd = type(
53
        'Cmd',
54
        (object,),
55
        {
56
            'name': 'migrate-users',
57
            'args': args,
58
            'opts': kwargs
59
        }
60
    )
61
    return cmd 
49 62

  
50
    args = ['tests/ldap_users.txt',]
51
    opts = {}
52
    call_command('migrate-ldap-users', *args, **opts)
63
@pytest.fixture
64
def command_ldap():
65
    return cmd(
66
            'tests/ldap_users.txt',
67
            ldap= True
68
        )
69

  
70
@pytest.fixture
71
def command_csv():
72
    return cmd(
73
        'tests/csv_users.csv',
74
         csv= True
75
    )
76

  
77
@pytest.fixture(params=['command_ldap', 'command_csv'])
78
def command(request, command_ldap, command_csv):
79
    return locals().get(request.param)
53 80

  
54
    credentials = UserCredentials.objects.filter(user__last_name__in=[
81
def test_command_migrate_users(command):
82
    call_command(command.name, *command.args, **command.opts)
83
    if command.opts.get('ldap'):
84
        credentials = UserCredentials.objects.filter(user__last_name__in=[
85
                'ldap_user1',
86
                'ldap_user2',
87
                'ldap_user3'
88
            ])
89

  
90
        assert len(credentials) == 3
91

  
92
        for cred in credentials:
93
            assert cred.to_login_info(decrypt=True)['#password'] == 'password_{}'.format(cred.user.last_name)
94
    else:
95
        credentials = UserCredentials.objects.all().exclude(user__last_name__in=[
55 96
            'ldap_user1',
56 97
            'ldap_user2',
57 98
            'ldap_user3'
58 99
        ])
59 100

  
60
    assert len(credentials) == 3
101
        assert len(credentials) == 4
61 102

  
62
    for cred in credentials:
63
        assert cred.to_login_info(decrypt=True)['#txtCode'] == 'password_{}'.format(cred.user.last_name)
103
        for cred in credentials:
104
            assert cred.to_login_info(decrypt=True)['#password'] == cred.to_login_info()['#username']
64 105

  
65
-