0001-bdd-user-migration-from-csv-9717.patch
mandayejs/applications.py | ||
---|---|---|
62 | 62 |
dct['SITE_AUTH_CHECKER'] = os.path.join(settings.STATIC_ROOT, |
63 | 63 |
dct['SITE_AUTH_CHECKER']) |
64 | 64 |
|
65 |
# Global js scripts for the current app |
|
66 |
if dct.get('SITE_APP_SCRIPTS',None): |
|
67 |
dct['SITE_APP_SCRIPTS'] = [ |
|
68 |
script for script in dct['SITE_APP_SCRIPTS'] |
|
69 |
] |
|
70 | 65 |
# Default form submit element |
71 | 66 |
if not dct.get('SITE_FORM_SUBMIT_ELEMENT', None): |
72 | 67 |
dct['SITE_FORM_SUBMIT_ELEMENT'] = 'input[type=submit], button' |
... | ... | |
77 | 72 |
class AppSettings(object): |
78 | 73 |
__metaclass__ = AppSettingsMeta |
79 | 74 | |
75 |
# Test App Settings |
|
76 |
class Test(AppSettings): |
|
77 |
SITE_LOGIN_PATH = '/' |
|
78 |
SITE_LOCATORS = [ |
|
79 |
{ |
|
80 |
'id': '#login', |
|
81 |
'label': 'Login', |
|
82 |
'name': 'login', |
|
83 |
'kind': 'string', |
|
84 |
}, |
|
85 |
{ |
|
86 |
'id': '#password', |
|
87 |
'label': 'Password', |
|
88 |
'name': 'password', |
|
89 |
'kind': 'password' |
|
90 |
} |
|
91 |
] |
|
92 |
SITE_AUTH_CHECKER = 'js/test/auth.checker.js' |
|
93 |
SITE_AUTH_COOKIE_KEYS = [ 'test'] |
|
94 |
SITE_FORCE_REDIRECT_URL = '/whatever' |
|
80 | 95 | |
81 | 96 |
# Duonet App Settings |
82 | 97 |
class Duonet(AppSettings): |
mandayejs/mandaye/management/commands/migrate-ldap-users.py | ||
---|---|---|
1 |
from __future__ import absolute_import |
|
2 | ||
3 |
import json |
|
4 |
import ldif |
|
5 |
import logging |
|
6 | ||
7 |
from django.core.management.base import BaseCommand, CommandError |
|
8 |
from django.db import IntegrityError |
|
9 |
from django.contrib.auth import get_user_model |
|
10 | ||
11 |
from mandayejs.mandaye.models import UserCredentials |
|
12 | ||
13 |
logger = logging.getLogger(__name__) |
|
14 | ||
15 |
class Command(BaseCommand): |
|
16 |
args = '<ldif_file>' |
|
17 |
help = 'Migrate users from ldap' |
|
18 | ||
19 |
def handle(self, *args, **kwargs): |
|
20 | ||
21 |
if len(args) < 1: |
|
22 |
self.stdout.write('An input file is required') |
|
23 |
return False |
|
24 |
for ldif_file in args: |
|
25 |
data = self.get_ldif_data(ldif_file) |
|
26 |
data = [ d[1] for d in data] |
|
27 |
self.migrate(data) |
|
28 | ||
29 |
def get_ldif_data(self, filename): |
|
30 |
with open(filename, 'rb') as fd: |
|
31 |
return ldif.ParseLDIF(fd) |
|
32 | ||
33 |
def migrate(self, parsed_data): |
|
34 |
User = get_user_model() |
|
35 |
for data in parsed_data: |
|
36 |
data = { k : ''.join(v) for k,v in data.items()} |
|
37 |
try: |
|
38 |
user, created = User.objects.get_or_create(username=data.get('idpUniqueID'), |
|
39 |
last_name=data.get('spLogin')) |
|
40 |
user.save() |
|
41 |
uc = UserCredentials(user=user, locators=json.loads(data.get('spPostValues'))) |
|
42 |
uc.decrypt() |
|
43 |
uc.save() |
|
44 |
self.stdout.write('{idpUniqueID} imported'.format(**data)) |
|
45 |
logger.debug('{idpUniqueID} imported'.format(**data)) |
|
46 |
except (IntegrityError,) as e: |
|
47 |
logger.debug(e) |
|
48 |
continue |
|
49 |
mandayejs/mandaye/management/commands/migrate-users.py | ||
---|---|---|
1 |
from __future__ import absolute_import |
|
2 | ||
3 |
import os |
|
4 |
import csv |
|
5 |
import json |
|
6 |
import ldif |
|
7 |
import logging |
|
8 |
from optparse import make_option |
|
9 | ||
10 |
from django.core.management.base import BaseCommand, CommandError |
|
11 |
from django.db import IntegrityError |
|
12 |
from django.contrib.auth.models import User |
|
13 | ||
14 |
from mandayejs.mandaye.models import UserCredentials |
|
15 | ||
16 |
logger = logging.getLogger(__name__) |
|
17 | ||
18 | ||
19 |
class Command(BaseCommand): |
|
20 |
args = '<filename>' |
|
21 |
help = 'Migrate users from ldif file or csv file' |
|
22 | ||
23 |
option_list = BaseCommand.option_list +( |
|
24 |
make_option( |
|
25 |
'--ldap', |
|
26 |
action='store_true', |
|
27 |
default=False, |
|
28 |
help='Migrate users from a ldap dump file' |
|
29 |
), |
|
30 |
make_option( |
|
31 |
'--csv', |
|
32 |
action='store_true', |
|
33 |
default=False, |
|
34 |
help='Migrate users from a csv file' |
|
35 |
) |
|
36 |
) |
|
37 | ||
38 |
def handle(self, *args, **kwargs): |
|
39 |
if len(args) < 1: |
|
40 |
self.stdout.write('An input file is required') |
|
41 |
return False |
|
42 | ||
43 |
if kwargs.get('csv'): |
|
44 |
for filename in args: |
|
45 |
data = self.get_csv_data(filename) |
|
46 |
self.migrate(data) |
|
47 |
else: |
|
48 |
for filename in args: |
|
49 |
data = self.get_ldif_data(filename) |
|
50 |
data = [ d[1] for d in data] |
|
51 |
data = [ |
|
52 |
{ k : ''.join(v) for k,v in d.items()} for d in data |
|
53 |
] |
|
54 |
self.migrate(data) |
|
55 | ||
56 |
def get_ldif_data(self, filename): |
|
57 |
with open(filename, 'rb') as fd: |
|
58 |
return ldif.ParseLDIF(fd) |
|
59 | ||
60 |
def get_csv_data(self, filename): |
|
61 |
with open(filename, 'rb') as fd: |
|
62 |
fieldnames = ['idpUniqueID', 'spPostValues'] |
|
63 |
reader = csv.DictReader(fd, delimiter=';', quotechar='|', fieldnames=fieldnames) |
|
64 |
return list(reader) |
|
65 | ||
66 | ||
67 |
def migrate(self, parsed_data): |
|
68 |
for data in parsed_data: |
|
69 |
try: |
|
70 |
user, created = User.objects.get_or_create(username=data.get('idpUniqueID'), |
|
71 |
last_name=data.get('spLogin','')) |
|
72 |
user.save() |
|
73 |
uc = UserCredentials(user=user, locators=json.loads(data.get('spPostValues'))) |
|
74 |
uc.decrypt() |
|
75 |
uc.save() |
|
76 |
logger.debug('{idpUniqueID} imported'.format(**data)) |
|
77 |
except (IntegrityError,) as e: |
|
78 |
logger.debug(e) |
|
79 |
continue |
|
80 |
tests/csv_users.csv | ||
---|---|---|
1 |
_288CE3839831B75D8ADD6524329236B7;{"username": "csv1", "password": "cIrWyg=="} |
|
2 |
_8C36D00EE18E5702A51813413097028F;{"username": "csv2", "password": "cIrWyQ=="} |
|
3 |
_45EB3616CEE069A42AFD23D7039E6FBD;{"username": "csv3", "password": "cIrWyA=="} |
|
4 |
_45EB3616CEJ069A42AFD23D70S9E6FBD;{"username": "csv4", "password": "cIrWzw=="} |
tests/ldap_users.txt | ||
---|---|---|
11 | 11 |
entryUUID: 061ece3e-f3ac-1033-91e7-8926ba287cf0 |
12 | 12 |
creatorsName: cn=admin,dc=entrouvert,dc=org |
13 | 13 |
createTimestamp: 20141029113944Z |
14 |
spPostValues: {"txtCode": "Y4HL6cbGxRsoHQU97VzXgkqEdA==", "txtNomFoyer": "ldap_user1", "t |
|
15 |
xtDateNaissance": "23/04/1991"} |
|
14 |
spPostValues: {"password": "Y4HL6cbGxRsoHQU97VzXgkqEdA==", "login": "ldap_user1"} |
|
16 | 15 |
entryCSN: 20141029133206.125440Z#000000#001#000000 |
17 | 16 |
modifiersName: cn=admin,dc=entrouvert,dc=org |
18 | 17 |
modifyTimestamp: 20141029133206Z |
... | ... | |
30 | 29 |
entryUUID: 061fe3c8-f3ac-1033-91ea-8926ba287cf0 |
31 | 30 |
creatorsName: cn=admin,dc=entrouvert,dc=org |
32 | 31 |
createTimestamp: 20141029113944Z |
33 |
spPostValues: {"txtCode": "Y4HL6cbGxRsoHQU97VzXgkqEdw==", "txtNomFoyer": "ldap_user2", "t |
|
34 |
xtDateNaissance": "23/04/1991"} |
|
32 |
spPostValues: {"password": "Y4HL6cbGxRsoHQU97VzXgkqEdw==", "login": "ldap_user2"} |
|
35 | 33 |
entryCSN: 20141029133206.144171Z#000000#001#000000 |
36 | 34 |
modifiersName: cn=admin,dc=entrouvert,dc=org |
37 | 35 |
modifyTimestamp: 20141029133206Z |
... | ... | |
49 | 47 |
entryUUID: 0620818e-f3ac-1033-91ec-8926ba287cf0 |
50 | 48 |
creatorsName: cn=admin,dc=entrouvert,dc=org |
51 | 49 |
createTimestamp: 20141029113944Z |
52 |
spPostValues: {"txtCode": "Y4HL6cbGxRsoHQU97VzXgkqEdg==", "txtNomFoyer": "ldap_user3", |
|
53 |
"txtDateNaissance": "19/08/1953"} |
|
50 |
spPostValues: {"password": "Y4HL6cbGxRsoHQU97VzXgkqEdg==", "login": "ldap_user3"} |
|
54 | 51 |
entryCSN: 20141029133206.156443Z#000000#001#000000 |
55 | 52 |
modifiersName: cn=admin,dc=entrouvert,dc=org |
56 | 53 |
modifyTimestamp: 20141029133206Z |
tests/tests.py | ||
---|---|---|
8 | 8 | |
9 | 9 |
pytestmark = pytest.mark.django_db |
10 | 10 | |
11 |
# Encryption/Decryption |
|
11 |
settings.SITE_APP = 'mandayejs.applications.Test' |
|
12 |
settings.SECRET_KEY = 'od5cei4aeveel8dui4lei2ou9ahsei2A' |
|
12 | 13 | |
14 |
# ENCRYPTION/DECRYPTION |
|
13 | 15 |
def create_user(**kwargs): |
14 | 16 |
password = kwargs.pop('password', None) or kwargs.get('username') |
15 | 17 |
user, created = User.objects.get_or_create(**kwargs) |
... | ... | |
44 | 46 |
assert decrypted.get('password') == 'john password' |
45 | 47 |
|
46 | 48 | |
47 |
# Migration |
|
48 |
def test_migrate_users_command(): |
|
49 |
# MIGRATION COMMAND |
|
50 |
def cmd(*args, **kwargs): |
|
51 |
cmd = type( |
|
52 |
'Cmd', |
|
53 |
(object,), |
|
54 |
{ |
|
55 |
'name': 'migrate-users', |
|
56 |
'args': args, |
|
57 |
'opts': kwargs |
|
58 |
} |
|
59 |
) |
|
60 |
return cmd |
|
49 | 61 | |
50 |
args = ['tests/ldap_users.txt',] |
|
51 |
opts = {} |
|
52 |
call_command('migrate-ldap-users', *args, **opts) |
|
62 |
@pytest.fixture |
|
63 |
def command_ldap(): |
|
64 |
return cmd( |
|
65 |
'tests/ldap_users.txt', |
|
66 |
ldap= True |
|
67 |
) |
|
68 | ||
69 |
@pytest.fixture |
|
70 |
def command_csv(): |
|
71 |
return cmd( |
|
72 |
'tests/csv_users.csv', |
|
73 |
csv= True |
|
74 |
) |
|
75 | ||
76 |
@pytest.fixture(params=['command_ldap', 'command_csv']) |
|
77 |
def command(request, command_ldap, command_csv): |
|
78 |
return locals().get(request.param) |
|
53 | 79 | |
54 |
credentials = UserCredentials.objects.filter(user__last_name__in=[ |
|
80 |
def test_command_migrate_users(command): |
|
81 |
call_command(command.name, *command.args, **command.opts) |
|
82 |
if command.opts.get('ldap'): |
|
83 |
credentials = UserCredentials.objects.filter(user__last_name__in=[ |
|
84 |
'ldap_user1', |
|
85 |
'ldap_user2', |
|
86 |
'ldap_user3' |
|
87 |
]) |
|
88 | ||
89 |
assert len(credentials) == 3 |
|
90 | ||
91 |
for cred in credentials: |
|
92 |
assert cred.to_login_info(decrypt=True)['#password'] == 'password_{}'.format(cred.user.last_name) |
|
93 |
else: |
|
94 |
credentials = UserCredentials.objects.all().exclude(user__last_name__in=[ |
|
55 | 95 |
'ldap_user1', |
56 | 96 |
'ldap_user2', |
57 | 97 |
'ldap_user3' |
58 | 98 |
]) |
59 | 99 | |
60 |
assert len(credentials) == 3
|
|
100 |
assert len(credentials) == 4
|
|
61 | 101 | |
62 |
for cred in credentials: |
|
63 |
assert cred.to_login_info(decrypt=True)['#txtCode'] == 'password_{}'.format(cred.user.last_name)
|
|
102 |
for cred in credentials:
|
|
103 |
assert cred.to_login_info(decrypt=True)['#password'] == cred.to_login_info()['#username']
|
|
64 | 104 | |
65 |
- |