0002-create-import_site-and-export_site-commands-16514.patch
src/authentic2/a2_rbac/models.py | ||
---|---|---|
92 | 92 |
def cached(cls): |
93 | 93 |
return cls.objects.all() |
94 | 94 | |
95 |
def export_json(self): |
|
96 |
return { |
|
97 |
'uuid': self.uuid, 'slug': self.slug, 'name': self.name, |
|
98 |
'description': self.description, 'default': self.default, |
|
99 |
'email_is_unique': self.email_is_unique, |
|
100 |
'username_is_unique': self.username_is_unique, |
|
101 |
'validate_emails': self.validate_emails |
|
102 |
} |
|
103 | ||
95 | 104 | |
96 | 105 |
OrganizationalUnit._meta.natural_key = [['uuid'], ['slug'], ['name']] |
97 | 106 | |
... | ... | |
213 | 222 |
'ou__slug': self.ou.slug if self.ou else None, |
214 | 223 |
} |
215 | 224 | |
225 |
def export_json(self, attributes=False, parents=False, permissions=False): |
|
226 |
d = { |
|
227 |
'uuid': self.uuid, 'slug': self.slug, 'name': self.name, |
|
228 |
'description': self.description, 'external_id': self.external_id, |
|
229 |
'ou': self.ou and self.ou.natural_key_json(), |
|
230 |
'service': self.service and self.service.natural_key_json() |
|
231 |
} |
|
232 | ||
233 |
if attributes: |
|
234 |
for attribute in self.attributes.all(): |
|
235 |
d.setdefault('attributes', []).append(attribute.to_json()) |
|
236 | ||
237 |
if parents: |
|
238 |
RoleParenting = rbac_utils.get_role_parenting_model() |
|
239 |
for parenting in RoleParenting.objects.filter(child_id=self.id, direct=True): |
|
240 |
d.setdefault('parents', []).append(parenting.parent.natural_key_json()) |
|
241 | ||
242 |
if permissions: |
|
243 |
for perm in self.permissions.all(): |
|
244 |
d.setdefault('permissions', []).append(perm.export_json()) |
|
245 | ||
246 |
return d |
|
247 | ||
216 | 248 | |
217 | 249 |
Role._meta.natural_key = [ |
218 | 250 |
['uuid'], ['slug', 'ou'], ['name', 'ou'], ['slug', 'service'], ['name', 'service'] |
... | ... | |
250 | 282 |
('role', 'name', 'kind', 'value'), |
251 | 283 |
) |
252 | 284 | |
285 |
def to_json(self): |
|
286 |
return {'name': self.name, 'kind': self.kind, 'value': self.value} |
|
287 | ||
288 | ||
253 | 289 |
GenericRelation(Permission, |
254 | 290 |
content_type_field='target_ct', |
255 | 291 |
object_id_field='target_id').contribute_to_class(ContentType, 'admin_perms') |
src/authentic2/data_transfer.py | ||
---|---|---|
1 |
from django.contrib.contenttypes.models import ContentType |
|
2 | ||
3 |
from django_rbac.models import Operation |
|
4 |
from django_rbac.utils import ( |
|
5 |
get_ou_model, get_role_model, get_role_parenting_model, get_permission_model) |
|
6 |
from authentic2.a2_rbac.models import RoleAttribute |
|
7 |
from authentic2.utils import update_model |
|
8 | ||
9 | ||
10 |
def export_site(): |
|
11 |
return { |
|
12 |
'roles': export_roles(get_role_model().objects.all()), |
|
13 |
'ous': export_ou(get_ou_model().objects.all()) |
|
14 |
} |
|
15 | ||
16 | ||
17 |
def export_ou(ou_query_set): |
|
18 |
return [ou.export_json() for ou in ou_query_set] |
|
19 | ||
20 | ||
21 |
def export_roles(role_queryset): |
|
22 |
""" Serialize roles in role_queryset |
|
23 |
""" |
|
24 |
return [ |
|
25 |
role.export_json(attributes=True, parents=True, permissions=True) |
|
26 |
for role in role_queryset |
|
27 |
] |
|
28 | ||
29 | ||
30 |
def search_ou(ou_d): |
|
31 |
try: |
|
32 |
OU = get_ou_model() |
|
33 |
return OU.objects.get_by_natural_key_json(ou_d) |
|
34 |
except OU.DoesNotExist: |
|
35 |
return None |
|
36 | ||
37 | ||
38 |
def search_role(role_d): |
|
39 |
Role = get_role_model() |
|
40 |
try: |
|
41 |
Role = get_role_model() |
|
42 |
return Role.objects.get_by_natural_key_json(role_d) |
|
43 |
except Role.DoesNotExist: |
|
44 |
return None |
|
45 | ||
46 | ||
47 |
class ImportContext(object): |
|
48 |
""" Holds information on how to perform the import. |
|
49 | ||
50 |
ou_delete_orphans: if True any existing ou that is not found in the export will |
|
51 |
be deleted |
|
52 | ||
53 |
role_delete_orphans: if True any existing role that is not found in the export will |
|
54 |
be deleted |
|
55 | ||
56 | ||
57 |
role_attributes_update: for each role in the import data, |
|
58 |
attributes will deleted and re-created |
|
59 | ||
60 | ||
61 |
role_parentings_update: for each role in the import data, |
|
62 |
parentings will deleted and re-created |
|
63 | ||
64 |
role_permissions_update: for each role in the import data, |
|
65 |
permissions will deleted and re-created |
|
66 |
""" |
|
67 | ||
68 |
def __init__( |
|
69 |
self, role_delete_orphans=False, role_parentings_update=True, |
|
70 |
role_permissions_update=True, role_attributes_update=True, |
|
71 |
ou_delete_orphans=False): |
|
72 |
self.role_delete_orphans = role_delete_orphans |
|
73 |
self.ou_delete_orphans = ou_delete_orphans |
|
74 |
self.role_parentings_update = role_parentings_update |
|
75 |
self.role_permissions_update = role_permissions_update |
|
76 |
self.role_attributes_update = role_attributes_update |
|
77 | ||
78 | ||
79 |
class DataImportError(Exception): |
|
80 |
pass |
|
81 | ||
82 | ||
83 |
class RoleDeserializer(object): |
|
84 | ||
85 |
def __init__(self, d, import_context): |
|
86 |
self._import_context = import_context |
|
87 |
self._obj = None |
|
88 |
self._parents = None |
|
89 |
self._attributes = None |
|
90 |
self._permissions = None |
|
91 | ||
92 |
self._role_d = dict() |
|
93 |
for key, value in d.items(): |
|
94 |
if key == 'parents': |
|
95 |
self._parents = value |
|
96 |
elif key == 'attributes': |
|
97 |
self._attributes = value |
|
98 |
elif key == 'permissions': |
|
99 |
self._permissions = value |
|
100 |
else: |
|
101 |
self._role_d[key] = value |
|
102 | ||
103 |
def deserialize(self): |
|
104 |
ou_d = self._role_d['ou'] |
|
105 |
has_ou = bool(ou_d) |
|
106 |
ou = None if not has_ou else search_ou(ou_d) |
|
107 |
if has_ou and not ou: |
|
108 |
raise DataImportError( |
|
109 |
"Can't import role because missing Organizational Unit : " |
|
110 |
"%s" % ou_d) |
|
111 | ||
112 |
kwargs = self._role_d.copy() |
|
113 |
del kwargs['ou'] |
|
114 |
del kwargs['service'] |
|
115 |
if has_ou: |
|
116 |
kwargs['ou'] = ou |
|
117 | ||
118 |
obj = search_role(self._role_d) |
|
119 |
if obj: # Role already exist |
|
120 |
self._obj = obj |
|
121 |
status = 'updated' |
|
122 |
update_model(self._obj, kwargs) |
|
123 |
else: # Create role |
|
124 |
self._obj = get_role_model().objects.create(**kwargs) |
|
125 |
status = 'created' |
|
126 | ||
127 |
# Ensure admin role is created. |
|
128 |
# Absoluteley necessary to create |
|
129 |
# parentings relationship later on, |
|
130 |
# since we don't deserialize technical role. |
|
131 |
self._obj.get_admin_role() |
|
132 |
return self._obj, status |
|
133 | ||
134 |
def attributes(self): |
|
135 |
""" Update attributes (delete everything then create) |
|
136 |
""" |
|
137 |
created, deleted = [], [] |
|
138 |
for attr in self._obj.attributes.all(): |
|
139 |
attr.delete() |
|
140 |
deleted.append(attr) |
|
141 |
# Create attributes |
|
142 |
if self._attributes: |
|
143 |
for attr_dict in self._attributes: |
|
144 |
attr_dict['role'] = self._obj |
|
145 |
created.append(RoleAttribute.objects.create(**attr_dict)) |
|
146 | ||
147 |
return created, deleted |
|
148 | ||
149 |
def parentings(self): |
|
150 |
""" Update parentings (delete everything then create) |
|
151 |
""" |
|
152 |
created, deleted = [], [] |
|
153 |
Parenting = get_role_parenting_model() |
|
154 |
for parenting in Parenting.objects.filter(child=self._obj, direct=True): |
|
155 |
parenting.delete() |
|
156 |
deleted.append(parenting) |
|
157 | ||
158 |
if self._parents: |
|
159 |
for parent_d in self._parents: |
|
160 |
parent = search_role(parent_d) |
|
161 |
if not parent: |
|
162 |
raise DataImportError("Could not find role : %s" % parent_d) |
|
163 |
created.append(Parenting.objects.create( |
|
164 |
child=self._obj, direct=True, parent=parent)) |
|
165 | ||
166 |
return created, deleted |
|
167 | ||
168 |
def permissions(self): |
|
169 |
""" Update permissions (delete everything then create) |
|
170 |
""" |
|
171 |
created, deleted = [], [] |
|
172 |
for perm in self._obj.permissions.all(): |
|
173 |
perm.delete() |
|
174 |
deleted.append(perm) |
|
175 |
self._obj.permissions.clear() |
|
176 |
if self._permissions: |
|
177 |
for perm in self._permissions: |
|
178 |
op = Operation.objects.get_by_natural_key_json(perm['operation']) |
|
179 |
ou = get_ou_model().objects.get_by_natural_key_json( |
|
180 |
perm['ou']) if perm['ou'] else None |
|
181 |
ct = ContentType.objects.get_by_natural_key_json(perm['target_ct']) |
|
182 |
target = ct.model_class().objects.get_by_natural_key_json(perm['target']) |
|
183 |
perm = get_permission_model().objects.create( |
|
184 |
operation=op, ou=ou, target_ct=ct, target_id=target.pk) |
|
185 |
self._obj.permissions.add(perm) |
|
186 |
created.append(perm) |
|
187 | ||
188 |
return created, deleted |
|
189 | ||
190 | ||
191 |
class ImportResult(object): |
|
192 | ||
193 |
def __init__(self): |
|
194 |
self.roles = {'created': [], 'updated': []} |
|
195 |
self.ous = {'created': [], 'updated': []} |
|
196 |
self.attributes = {'created': [], 'deleted': []} |
|
197 |
self.parentings = {'created': [], 'deleted': []} |
|
198 |
self.permissions = {'created': [], 'deleted': []} |
|
199 | ||
200 |
def update_roles(self, role, d_status): |
|
201 |
self.roles[d_status].append(role) |
|
202 | ||
203 |
def update_ous(self, ou, status): |
|
204 |
self.ous[status].append(ou) |
|
205 | ||
206 |
def _bulk_update(self, attrname, created, deleted): |
|
207 |
attr = getattr(self, attrname) |
|
208 |
attr['created'].extend(created) |
|
209 |
attr['deleted'].extend(deleted) |
|
210 | ||
211 |
def update_attributes(self, created, deleted): |
|
212 |
self._bulk_update('attributes', created, deleted) |
|
213 | ||
214 |
def update_parentings(self, created, deleted): |
|
215 |
self._bulk_update('parentings', created, deleted) |
|
216 | ||
217 |
def update_permissions(self, created, deleted): |
|
218 |
self._bulk_update('permissions', created, deleted) |
|
219 | ||
220 |
def to_str(self, verbose=False): |
|
221 |
res = "" |
|
222 |
for attr in ('roles', 'ous', 'parentings', 'permissions', 'attributes'): |
|
223 |
data = getattr(self, attr) |
|
224 |
for status in ('created', 'updated', 'deleted'): |
|
225 |
if status in data: |
|
226 |
s_data = data[status] |
|
227 |
res += "%s %s %s\n" % (len(s_data), attr, status) |
|
228 |
return res |
|
229 | ||
230 | ||
231 |
def import_ou(ou_d): |
|
232 |
OU = get_ou_model() |
|
233 |
# ou = search_ou([ou_d['slug']]) |
|
234 |
ou = search_ou(ou_d) |
|
235 |
if ou is None: |
|
236 |
ou = OU.objects.create(**ou_d) |
|
237 |
status = 'created' |
|
238 |
else: |
|
239 |
update_model(ou, ou_d) |
|
240 |
status = 'updated' |
|
241 |
# Ensure admin role is created |
|
242 |
ou.get_admin_role() |
|
243 |
return ou, status |
|
244 | ||
245 | ||
246 |
def import_site(json_d, import_context): |
|
247 |
result = ImportResult() |
|
248 | ||
249 |
for ou_d in json_d.get('ous', []): |
|
250 |
result.update_ous(*import_ou(ou_d)) |
|
251 | ||
252 |
roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', []) |
|
253 |
if not role_d['slug'].startswith('_')] |
|
254 | ||
255 |
for ds in roles_ds: |
|
256 |
result.update_roles(*ds.deserialize()) |
|
257 | ||
258 |
if import_context.role_attributes_update: |
|
259 |
for ds in roles_ds: |
|
260 |
result.update_attributes(*ds.attributes()) |
|
261 | ||
262 |
if import_context.role_parentings_update: |
|
263 |
for ds in roles_ds: |
|
264 |
result.update_parentings(*ds.parentings()) |
|
265 | ||
266 |
if import_context.role_permissions_update: |
|
267 |
for ds in roles_ds: |
|
268 |
result.update_permissions(*ds.permissions()) |
|
269 | ||
270 |
if import_context.ou_delete_orphans: |
|
271 |
raise DataImportError( |
|
272 |
"Unsupported context value for ou_delete_orphans : %s" % ( |
|
273 |
import_context.ou_delete_orphans)) |
|
274 | ||
275 |
if import_context.role_delete_orphans: |
|
276 |
# FIXME : delete each role that is in DB but not in the export |
|
277 |
raise DataImportError( |
|
278 |
"Unsupported context value for role_delete_orphans : %s" % ( |
|
279 |
import_context.role_delete_orphans)) |
|
280 | ||
281 |
return result |
src/authentic2/management/commands/export_site.py | ||
---|---|---|
1 |
import json |
|
2 |
import sys |
|
3 | ||
4 |
from django.core.management.base import BaseCommand |
|
5 | ||
6 |
from authentic2.data_transfer import export_site |
|
7 |
from django_rbac.utils import get_role_model |
|
8 | ||
9 | ||
10 |
class Command(BaseCommand): |
|
11 |
help = 'Export site' |
|
12 | ||
13 |
def add_arguments(self, parser): |
|
14 |
parser.add_argument('--output', metavar='FILE', default=None, |
|
15 |
help='name of a file to write output to') |
|
16 | ||
17 |
def handle(self, *args, **options): |
|
18 |
if options['output']: |
|
19 |
output, close = open(options['output'], 'w'), True |
|
20 |
else: |
|
21 |
output, close = sys.stdout, False |
|
22 |
json.dump(export_site(), output, indent=4) |
|
23 |
if close: |
|
24 |
output.close() |
src/authentic2/management/commands/import_site.py | ||
---|---|---|
1 |
import contextlib |
|
2 |
import json |
|
3 |
import sys |
|
4 | ||
5 |
from django.conf import settings |
|
6 |
from django.core.management.base import BaseCommand |
|
7 |
from django.db import transaction |
|
8 |
from django.utils import translation |
|
9 | ||
10 |
from authentic2.data_transfer import import_site, ImportContext |
|
11 | ||
12 | ||
13 |
class DryRunException(Exception): |
|
14 |
pass |
|
15 | ||
16 | ||
17 |
def create_context_args(options): |
|
18 |
kwargs = {} |
|
19 |
if options['option']: |
|
20 |
for context_op in options['option']: |
|
21 |
context_op = context_op.replace('-', '_') |
|
22 |
if context_op.startswith('no_'): |
|
23 |
kwargs[context_op[3:]] = False |
|
24 |
else: |
|
25 |
kwargs[context_op] = True |
|
26 |
return kwargs |
|
27 | ||
28 | ||
29 |
# Borrowed from https://bugs.python.org/issue10049#msg118599 |
|
30 |
@contextlib.contextmanager |
|
31 |
def provision_contextm(dry_run, settings): |
|
32 |
if dry_run and 'hobo.agent.authentic2' in settings.INSTALLED_APPS: |
|
33 |
import hobo.agent.authentic2 |
|
34 |
with hobo.agent.authentic2.provisionning.Provisionning(): |
|
35 |
yield |
|
36 |
else: |
|
37 |
yield |
|
38 | ||
39 | ||
40 |
class Command(BaseCommand): |
|
41 |
help = 'Import site' |
|
42 | ||
43 |
def add_arguments(self, parser): |
|
44 |
parser.add_argument( |
|
45 |
'filename', metavar='FILENAME', type=str, help='name of file to import') |
|
46 |
parser.add_argument( |
|
47 |
'--dry-run', action='store_true', dest='dry_run', help='Really perform the import') |
|
48 |
parser.add_argument( |
|
49 |
'-o', '--option', action='append', help='Import context options', |
|
50 |
choices=[ |
|
51 |
'role-delete-orphans', 'ou-delete-orphans', 'no-role-permissions-update', |
|
52 |
'no-role-attributes-update', 'no-role-parentings-update']) |
|
53 | ||
54 |
def handle(self, filename, **options): |
|
55 |
translation.activate(settings.LANGUAGE_CODE) |
|
56 |
dry_run = options['dry_run'] |
|
57 |
msg = "Dry run\n" if dry_run else "Real run\n" |
|
58 |
c_kwargs = create_context_args(options) |
|
59 |
try: |
|
60 |
with open(filename, 'r') as f: |
|
61 |
with provision_contextm(dry_run, settings): |
|
62 |
with transaction.atomic(): |
|
63 |
sys.stdout.write(msg) |
|
64 |
result = import_site(json.load(f), ImportContext(**c_kwargs)) |
|
65 |
if dry_run: |
|
66 |
raise DryRunException() |
|
67 |
except DryRunException: |
|
68 |
pass |
|
69 |
sys.stdout.write(result.to_str()) |
|
70 |
sys.stdout.write("Success\n") |
|
71 |
translation.deactivate() |
src/authentic2/utils.py | ||
---|---|---|
1047 | 1047 |
context=ctx, |
1048 | 1048 |
legacy_subject_templates=legacy_subject_templates, |
1049 | 1049 |
legacy_body_templates=legacy_body_templates) |
1050 | ||
1051 | ||
1052 |
def update_model(obj, d): |
|
1053 |
for attr, value in d.items(): |
|
1054 |
setattr(obj, attr, value) |
src/django_rbac/models.py | ||
---|---|---|
114 | 114 |
def __unicode__(self): |
115 | 115 |
return unicode(_(self.name)) |
116 | 116 | |
117 |
def export_json(self): |
|
118 |
return {'slug': self.slug, 'name': self.name} |
|
119 | ||
117 | 120 |
objects = managers.OperationManager() |
118 | 121 | |
119 | 122 | |
... | ... | |
145 | 148 |
self.target and self.target_ct.natural_key(), |
146 | 149 |
self.target and self.target.natural_key()] |
147 | 150 | |
151 |
def export_json(self): |
|
152 |
return { |
|
153 |
"operation": self.operation.natural_key_json(), |
|
154 |
"ou": self.ou and self.ou.natural_key_json(), |
|
155 |
'target_ct': self.target_ct.natural_key_json(), |
|
156 |
"target": self.target.natural_key_json() |
|
157 |
} |
|
158 | ||
148 | 159 |
def __unicode__(self): |
149 | 160 |
ct = ContentType.objects.get_for_id(self.target_ct_id) |
150 | 161 |
ct_ct = ContentType.objects.get_for_model(ContentType) |
tests/test_a2_rbac.py | ||
---|---|---|
1 | 1 |
import pytest |
2 | 2 | |
3 |
from django.contrib.contenttypes.models import ContentType |
|
4 |
from django_rbac.utils import get_permission_model |
|
5 |
from django_rbac.models import Operation |
|
6 |
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU, RoleAttribute |
|
3 | 7 |
from authentic2.models import Service |
4 |
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU
|
|
8 |
from authentic2.utils import get_hex_uuid
|
|
5 | 9 | |
6 | 10 | |
7 | 11 |
def test_role_natural_key(db): |
... | ... | |
24 | 28 |
Role.objects.get_by_natural_key(*r2.natural_key()) |
25 | 29 |
with pytest.raises(Role.DoesNotExist): |
26 | 30 |
Role.objects.get_by_natural_key(*r4.natural_key()) |
31 | ||
32 | ||
33 |
def test_basic_role_export_json(db): |
|
34 |
role = Role.objects.create( |
|
35 |
name='basic role', slug='basic-role', description='basic role description') |
|
36 |
role_dict = role.export_json() |
|
37 |
assert role_dict['name'] == role.name |
|
38 |
assert role_dict['slug'] == role.slug |
|
39 |
assert role_dict['uuid'] == role.uuid |
|
40 |
assert role_dict['description'] == role.description |
|
41 |
assert role_dict['external_id'] == role.external_id |
|
42 |
assert role_dict['ou'] is None |
|
43 |
assert role_dict['service'] is None |
|
44 | ||
45 | ||
46 |
def test_role_with_ou_export_json(db): |
|
47 |
ou = OU.objects.create(name='ou', slug='ou') |
|
48 |
role = Role.objects.create(name='some role', ou=ou) |
|
49 |
role_dict = role.export_json() |
|
50 |
assert role_dict['ou'] == {'uuid': ou.uuid, 'slug': ou.slug, 'name': ou.name} |
|
51 | ||
52 | ||
53 |
def test_role_with_service_export_json(db): |
|
54 |
service = Service.objects.create(name='service name', slug='service-name') |
|
55 |
role = Role.objects.create(name='some role', service=service) |
|
56 |
role_dict = role.export_json() |
|
57 |
assert role_dict['service'] == {'slug': service.slug, 'ou': None} |
|
58 | ||
59 | ||
60 |
def test_role_with_service_with_ou_export_json(db): |
|
61 |
ou = OU.objects.create(name='ou', slug='ou') |
|
62 |
service = Service.objects.create(name='service name', slug='service-name', ou=ou) |
|
63 |
role = Role.objects.create(name='some role', service=service) |
|
64 |
role_dict = role.export_json() |
|
65 |
assert role_dict['service'] == { |
|
66 |
'slug': service.slug, 'ou': {'uuid': ou.uuid, 'slug': 'ou', 'name': 'ou'}} |
|
67 | ||
68 | ||
69 |
def test_role_with_attributes_export_json(db): |
|
70 |
role = Role.objects.create(name='some role') |
|
71 |
attr1 = RoleAttribute.objects.create( |
|
72 |
role=role, name='attr1_name', kind='string', value='attr1_value') |
|
73 |
attr2 = RoleAttribute.objects.create( |
|
74 |
role=role, name='attr2_name', kind='string', value='attr2_value') |
|
75 | ||
76 |
role_dict = role.export_json(attributes=True) |
|
77 |
attributes = role_dict['attributes'] |
|
78 |
assert len(attributes) == 2 |
|
79 | ||
80 |
expected_attr_names = set([attr1.name, attr2.name]) |
|
81 |
for attr_dict in attributes: |
|
82 |
assert attr_dict['name'] in expected_attr_names |
|
83 |
expected_attr_names.remove(attr_dict['name']) |
|
84 |
target_attr = RoleAttribute.objects.filter(name=attr_dict['name']).first() |
|
85 |
assert attr_dict['kind'] == target_attr.kind |
|
86 |
assert attr_dict['value'] == target_attr.value |
|
87 | ||
88 | ||
89 |
def test_role_with_parents_export_json(db): |
|
90 |
grand_parent_role = Role.objects.create( |
|
91 |
name='test grand parent role', slug='test-grand-parent-role') |
|
92 |
parent_1_role = Role.objects.create( |
|
93 |
name='test parent 1 role', slug='test-parent-1-role') |
|
94 |
parent_1_role.add_parent(grand_parent_role) |
|
95 |
parent_2_role = Role.objects.create( |
|
96 |
name='test parent 2 role', slug='test-parent-2-role') |
|
97 |
parent_2_role.add_parent(grand_parent_role) |
|
98 |
child_role = Role.objects.create( |
|
99 |
name='test child role', slug='test-child-role') |
|
100 |
child_role.add_parent(parent_1_role) |
|
101 |
child_role.add_parent(parent_2_role) |
|
102 | ||
103 |
child_role_dict = child_role.export_json(parents=True) |
|
104 |
assert child_role_dict['slug'] == child_role.slug |
|
105 |
parents = child_role_dict['parents'] |
|
106 |
assert len(parents) == 2 |
|
107 |
expected_slugs = set([parent_1_role.slug, parent_2_role.slug]) |
|
108 |
for parent in parents: |
|
109 |
assert parent['slug'] in expected_slugs |
|
110 |
expected_slugs.remove(parent['slug']) |
|
111 | ||
112 |
grand_parent_role_dict = grand_parent_role.export_json(parents=True) |
|
113 |
assert grand_parent_role_dict['slug'] == grand_parent_role.slug |
|
114 |
assert 'parents' not in grand_parent_role_dict |
|
115 | ||
116 |
parent_1_role_dict = parent_1_role.export_json(parents=True) |
|
117 |
assert parent_1_role_dict['slug'] == parent_1_role.slug |
|
118 |
parents = parent_1_role_dict['parents'] |
|
119 |
assert len(parents) == 1 |
|
120 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
121 | ||
122 |
parent_2_role_dict = parent_2_role.export_json(parents=True) |
|
123 |
assert parent_2_role_dict['slug'] == parent_2_role.slug |
|
124 |
parents = parent_2_role_dict['parents'] |
|
125 |
assert len(parents) == 1 |
|
126 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
127 | ||
128 | ||
129 |
def test_role_with_permission_export_json(db): |
|
130 |
some_ou = OU.objects.create(name='some ou', slug='some-ou') |
|
131 |
role = Role.objects.create(name='role name', slug='role-slug') |
|
132 |
other_role = Role.objects.create( |
|
133 |
name='other role name', slug='other-role-slug', uuid=get_hex_uuid(), ou=some_ou) |
|
134 |
ou = OU.objects.create(name='basic ou', slug='basic-ou', description='basic ou description') |
|
135 |
Permission = get_permission_model() |
|
136 |
op = Operation.objects.first() |
|
137 |
perm_saml = Permission.objects.create( |
|
138 |
operation=op, ou=ou, |
|
139 |
target_ct=ContentType.objects.get_for_model(ContentType), |
|
140 |
target_id=ContentType.objects.get(app_label="saml", model="libertyprovider").pk) |
|
141 |
role.permissions.add(perm_saml) |
|
142 |
perm_role = Permission.objects.create( |
|
143 |
operation=op, ou=None, |
|
144 |
target_ct=ContentType.objects.get_for_model(Role), |
|
145 |
target_id=other_role.pk) |
|
146 |
role.permissions.add(perm_role) |
|
147 | ||
148 |
export = role.export_json(permissions=True) |
|
149 |
permissions = export['permissions'] |
|
150 |
assert len(permissions) == 2 |
|
151 |
assert permissions[0] == { |
|
152 |
'operation': {'slug': 'add'}, |
|
153 |
'ou': {'uuid': ou.uuid, 'slug': ou.slug, 'name': ou.name}, |
|
154 |
'target_ct': {'app_label': u'contenttypes', 'model': u'contenttype'}, |
|
155 |
'target': {'model': u'libertyprovider', 'app_label': u'saml'} |
|
156 |
} |
|
157 |
assert permissions[1] == { |
|
158 |
'operation': {'slug': 'add'}, |
|
159 |
'ou': None, |
|
160 |
'target_ct': {'app_label': u'a2_rbac', 'model': u'role'}, |
|
161 |
'target': { |
|
162 |
'slug': u'other-role-slug', 'service': None, 'uuid': other_role.uuid, |
|
163 |
'ou': { |
|
164 |
'slug': u'some-ou', 'uuid': some_ou.uuid, 'name': u'some ou' |
|
165 |
}, |
|
166 |
'name': u'other role name'} |
|
167 |
} |
|
168 | ||
169 | ||
170 |
def test_ou_export_json(db): |
|
171 |
ou = OU.objects.create( |
|
172 |
name='basic ou', slug='basic-ou', description='basic ou description', |
|
173 |
username_is_unique=True, email_is_unique=True, default=False, validate_emails=True) |
|
174 |
ou_dict = ou.export_json() |
|
175 |
assert ou_dict['name'] == ou.name |
|
176 |
assert ou_dict['slug'] == ou.slug |
|
177 |
assert ou_dict['uuid'] == ou.uuid |
|
178 |
assert ou_dict['description'] == ou.description |
|
179 |
assert ou_dict['username_is_unique'] == ou.username_is_unique |
|
180 |
assert ou_dict['email_is_unique'] == ou.email_is_unique |
|
181 |
assert ou_dict['default'] == ou.default |
|
182 |
assert ou_dict['validate_emails'] == ou.validate_emails |
tests/test_data_transfer.py | ||
---|---|---|
1 |
import json |
|
2 | ||
3 |
from django_rbac.utils import get_role_model, get_ou_model |
|
4 |
import py |
|
5 |
import pytest |
|
6 | ||
7 |
from authentic2.a2_rbac.models import RoleParenting |
|
8 |
from authentic2.data_transfer import ( |
|
9 |
DataImportError, export_roles, import_site, export_ou, ImportContext, |
|
10 |
RoleDeserializer, search_role, import_ou) |
|
11 |
from authentic2.utils import get_hex_uuid |
|
12 | ||
13 | ||
14 |
Role = get_role_model() |
|
15 |
OU = get_ou_model() |
|
16 | ||
17 | ||
18 |
def test_export_basic_role(db): |
|
19 |
role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid()) |
|
20 |
query_set = Role.objects.filter(uuid=role.uuid) |
|
21 |
roles = export_roles(query_set) |
|
22 |
assert len(roles) == 1 |
|
23 |
role_dict = roles[0] |
|
24 |
for key, value in role.export_json().items(): |
|
25 |
assert role_dict[key] == value |
|
26 | ||
27 | ||
28 |
def test_export_role_with_parents(db): |
|
29 |
grand_parent_role = Role.objects.create( |
|
30 |
name='test grand parent role', slug='test-grand-parent-role', uuid=get_hex_uuid()) |
|
31 |
parent_1_role = Role.objects.create( |
|
32 |
name='test parent 1 role', slug='test-parent-1-role', uuid=get_hex_uuid()) |
|
33 |
parent_1_role.add_parent(grand_parent_role) |
|
34 |
parent_2_role = Role.objects.create( |
|
35 |
name='test parent 2 role', slug='test-parent-2-role', uuid=get_hex_uuid()) |
|
36 |
parent_2_role.add_parent(grand_parent_role) |
|
37 |
child_role = Role.objects.create( |
|
38 |
name='test child role', slug='test-child-role', uuid=get_hex_uuid()) |
|
39 |
child_role.add_parent(parent_1_role) |
|
40 |
child_role.add_parent(parent_2_role) |
|
41 | ||
42 |
query_set = Role.objects.filter(slug__startswith='test').order_by('slug') |
|
43 |
roles = export_roles(query_set) |
|
44 |
assert len(roles) == 4 |
|
45 | ||
46 |
child_role_dict = roles[0] |
|
47 |
assert child_role_dict['slug'] == child_role.slug |
|
48 |
parents = child_role_dict['parents'] |
|
49 |
assert len(parents) == 2 |
|
50 |
expected_slugs = set([parent_1_role.slug, parent_2_role.slug]) |
|
51 |
for parent in parents: |
|
52 |
assert parent['slug'] in expected_slugs |
|
53 |
expected_slugs.remove(parent['slug']) |
|
54 | ||
55 |
grand_parent_role_dict = roles[1] |
|
56 |
assert grand_parent_role_dict['slug'] == grand_parent_role.slug |
|
57 | ||
58 |
parent_1_role_dict = roles[2] |
|
59 |
assert parent_1_role_dict['slug'] == parent_1_role.slug |
|
60 |
parents = parent_1_role_dict['parents'] |
|
61 |
assert len(parents) == 1 |
|
62 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
63 | ||
64 |
parent_2_role_dict = roles[3] |
|
65 |
assert parent_2_role_dict['slug'] == parent_2_role.slug |
|
66 |
parents = parent_2_role_dict['parents'] |
|
67 |
assert len(parents) == 1 |
|
68 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
69 | ||
70 | ||
71 |
def test_export_ou(db): |
|
72 |
ou = OU.objects.create(name='ou name', slug='ou-slug', description='ou description') |
|
73 |
ous = export_ou(OU.objects.filter(name='ou name')) |
|
74 |
assert len(ous) == 1 |
|
75 |
ou_d = ous[0] |
|
76 |
assert ou_d['name'] == ou.name |
|
77 |
assert ou_d['slug'] == ou.slug |
|
78 |
assert ou_d['description'] == ou.description |
|
79 | ||
80 | ||
81 |
def test_search_role_by_uuid(db): |
|
82 |
uuid = get_hex_uuid() |
|
83 |
role_d = {'uuid': uuid, 'slug': 'role-slug'} |
|
84 |
role = Role.objects.create(**role_d) |
|
85 |
assert role == search_role({'uuid': uuid, 'slug': 'other-role-slug'}) |
|
86 | ||
87 | ||
88 |
def test_search_role_by_slug(db): |
|
89 |
role_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug'} |
|
90 |
role = Role.objects.create(**role_d) |
|
91 |
assert role == search_role({ |
|
92 |
'uuid': get_hex_uuid(), 'slug': 'role-slug', |
|
93 |
'ou': None, 'service': None}) |
|
94 | ||
95 | ||
96 |
def test_search_role_not_found(db): |
|
97 |
assert search_role( |
|
98 |
{ |
|
99 |
'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name', |
|
100 |
'ou': None, 'service': None}) is None |
|
101 | ||
102 | ||
103 |
def test_search_role_slug_not_unique(db): |
|
104 |
role1_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'} |
|
105 |
role2_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'} |
|
106 |
ou = OU.objects.create(name='some ou', slug='some-ou') |
|
107 |
role1 = Role.objects.create(ou=ou, **role1_d) |
|
108 |
Role.objects.create(**role2_d) |
|
109 |
assert role1 == search_role(role1.export_json()) |
|
110 | ||
111 | ||
112 |
def test_role_deserializer(db): |
|
113 |
rd = RoleDeserializer({ |
|
114 |
'name': 'some role', 'description': 'some role description', 'slug': 'some-role', |
|
115 |
'uuid': get_hex_uuid(), 'ou': None, 'service': None}, ImportContext()) |
|
116 |
assert rd._parents is None |
|
117 |
assert rd._attributes is None |
|
118 |
assert rd._obj is None |
|
119 |
role, status = rd.deserialize() |
|
120 |
assert status == 'created' |
|
121 |
assert role.name == 'some role' |
|
122 |
assert role.description == 'some role description' |
|
123 |
assert role.slug == 'some-role' |
|
124 |
assert rd._obj == role |
|
125 | ||
126 | ||
127 |
def test_role_deserializer_with_ou(db): |
|
128 |
ou = OU.objects.create(name='some ou', slug='some-ou') |
|
129 |
rd = RoleDeserializer({ |
|
130 |
'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description', |
|
131 |
'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None}, ImportContext()) |
|
132 |
role, status = rd.deserialize() |
|
133 |
assert role.ou == ou |
|
134 | ||
135 | ||
136 |
def test_role_deserializer_missing_ou(db): |
|
137 |
rd = RoleDeserializer({ |
|
138 |
'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'role description', |
|
139 |
'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None}, |
|
140 |
ImportContext()) |
|
141 |
with pytest.raises(DataImportError): |
|
142 |
rd.deserialize() |
|
143 | ||
144 | ||
145 |
def test_role_deserializer_update_ou(db): |
|
146 |
ou1 = OU.objects.create(name='ou 1', slug='ou-1') |
|
147 |
ou2 = OU.objects.create(name='ou 2', slug='ou-2') |
|
148 |
uuid = get_hex_uuid() |
|
149 |
existing_role = Role.objects.create(uuid=uuid, slug='some-role', ou=ou1) |
|
150 |
rd = RoleDeserializer({ |
|
151 |
'uuid': uuid, 'name': 'some-role', 'slug': 'some-role', |
|
152 |
'ou': {'slug': 'ou-2'}, 'service': None}, ImportContext()) |
|
153 |
role, status = rd.deserialize() |
|
154 |
assert role == existing_role |
|
155 |
assert role.ou == ou2 |
|
156 | ||
157 | ||
158 |
def test_role_deserializer_update_fields(db): |
|
159 |
uuid = get_hex_uuid() |
|
160 |
existing_role = Role.objects.create(uuid=uuid, slug='some-role', name='some role') |
|
161 |
rd = RoleDeserializer({ |
|
162 |
'uuid': uuid, 'slug': 'some-role', 'name': 'some role changed', |
|
163 |
'ou': None, 'service': None}, ImportContext()) |
|
164 |
role, status = rd.deserialize() |
|
165 |
assert role == existing_role |
|
166 |
assert role.name == 'some role changed' |
|
167 | ||
168 | ||
169 |
def test_role_deserializer_with_attributes(db): |
|
170 | ||
171 |
attributes_data = { |
|
172 |
'attr1_name': dict(name='attr1_name', kind='string', value='attr1_value'), |
|
173 |
'attr2_name': dict(name='attr2_name', kind='string', value='attr2_value') |
|
174 |
} |
|
175 |
rd = RoleDeserializer({ |
|
176 |
'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description', |
|
177 |
'slug': 'some-role', 'attributes': list(attributes_data.values()), |
|
178 |
'ou': None, 'service': None}, ImportContext()) |
|
179 |
role, status = rd.deserialize() |
|
180 |
created, deleted = rd.attributes() |
|
181 |
assert role.attributes.count() == 2 |
|
182 |
assert len(created) == 2 |
|
183 | ||
184 |
for attr in created: |
|
185 |
attr_dict = attributes_data[attr.name] |
|
186 |
assert attr_dict['name'] == attr.name |
|
187 |
assert attr_dict['kind'] == attr.kind |
|
188 |
assert attr_dict['value'] == attr.value |
|
189 |
del attributes_data[attr.name] |
|
190 | ||
191 | ||
192 |
def test_role_deserializer_creates_admin_role(db): |
|
193 |
role_dict = { |
|
194 |
'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(), |
|
195 |
'ou': None, 'service': None} |
|
196 |
rd = RoleDeserializer(role_dict, ImportContext()) |
|
197 |
rd.deserialize() |
|
198 |
Role.objects.get(slug='_a2-managers-of-role-some-role') |
|
199 | ||
200 | ||
201 |
def test_role_deserializer_parenting_existing_parent(db): |
|
202 |
parent_role_dict = { |
|
203 |
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), |
|
204 |
'ou': None, 'service': None} |
|
205 |
parent_role = Role.objects.create(**parent_role_dict) |
|
206 |
child_role_dict = { |
|
207 |
'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict], |
|
208 |
'uuid': get_hex_uuid(), 'ou': None, 'service': None} |
|
209 | ||
210 |
rd = RoleDeserializer(child_role_dict, ImportContext()) |
|
211 |
child_role, status = rd.deserialize() |
|
212 |
created, deleted = rd.parentings() |
|
213 | ||
214 |
assert len(created) == 1 |
|
215 |
parenting = created[0] |
|
216 |
assert parenting.direct is True |
|
217 |
assert parenting.parent == parent_role |
|
218 |
assert parenting.child == child_role |
|
219 | ||
220 | ||
221 |
def test_role_deserializer_parenting_non_existing_parent(db): |
|
222 |
parent_role_dict = { |
|
223 |
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), |
|
224 |
'ou': None, 'service': None} |
|
225 |
child_role_dict = { |
|
226 |
'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict], |
|
227 |
'uuid': get_hex_uuid(), 'ou': None, 'service': None} |
|
228 |
rd = RoleDeserializer(child_role_dict, ImportContext()) |
|
229 |
rd.deserialize() |
|
230 |
with pytest.raises(DataImportError) as excinfo: |
|
231 |
rd.parentings() |
|
232 | ||
233 |
assert "Could not find role" in str(excinfo.value) |
|
234 | ||
235 | ||
236 |
def test_role_deserializer_permissions(db): |
|
237 |
ou = OU.objects.create(slug='some-ou') |
|
238 |
other_role_dict = { |
|
239 |
'name': 'other role', 'slug': 'other-role-slug', 'uuid': get_hex_uuid(), 'ou': ou} |
|
240 |
other_role = Role.objects.create(**other_role_dict) |
|
241 |
other_role_dict['permisison'] = { |
|
242 |
"operation": { |
|
243 |
"slug": "admin" |
|
244 |
}, |
|
245 |
"ou": { |
|
246 |
"slug": "default", |
|
247 |
"name": "Collectivit\u00e9 par d\u00e9faut" |
|
248 |
}, |
|
249 |
'target_ct': {'app_label': u'a2_rbac', 'model': u'role'}, |
|
250 |
"target": { |
|
251 |
"slug": "role-deux", |
|
252 |
"ou": { |
|
253 |
"slug": "default", |
|
254 |
"name": "Collectivit\u00e9 par d\u00e9faut" |
|
255 |
}, |
|
256 |
"service": None, |
|
257 |
"name": "role deux" |
|
258 |
} |
|
259 |
} |
|
260 |
some_role_dict = { |
|
261 |
'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(), |
|
262 |
'ou': None, 'service': None} |
|
263 |
some_role_dict['permissions'] = [ |
|
264 |
{ |
|
265 |
'operation': {'slug': 'add'}, |
|
266 |
'ou': None, |
|
267 |
'target_ct': {'app_label': u'a2_rbac', 'model': u'role'}, |
|
268 |
'target': { |
|
269 |
"slug": u'other-role-slug', 'ou': {'slug': 'some-ou'}, 'service': None} |
|
270 |
} |
|
271 |
] |
|
272 | ||
273 |
import_context = ImportContext() |
|
274 |
rd = RoleDeserializer(some_role_dict, import_context) |
|
275 |
rd.deserialize() |
|
276 |
perm_created, perm_deleted = rd.permissions() |
|
277 | ||
278 |
assert len(perm_created) == 1 |
|
279 |
assert len(perm_deleted) == 0 |
|
280 |
del some_role_dict['permissions'] |
|
281 |
role = Role.objects.get(slug=some_role_dict['slug']) |
|
282 |
assert role.permissions.count() == 1 |
|
283 |
perm = role.permissions.first() |
|
284 |
assert perm.operation.slug == 'add' |
|
285 |
assert not perm.ou |
|
286 |
assert perm.target == other_role |
|
287 | ||
288 |
# that one should delete permissions |
|
289 |
rd = RoleDeserializer(some_role_dict, import_context) |
|
290 |
role, _ = rd.deserialize() |
|
291 |
perm_created, perm_deleted = rd.permissions() |
|
292 |
assert role.permissions.count() == 0 |
|
293 |
assert len(perm_created) == 0 |
|
294 |
assert len(perm_deleted) == 1 |
|
295 | ||
296 | ||
297 |
def test_permission_on_role(db): |
|
298 |
perm_ou = OU.objects.create(slug='perm-ou', name='perm ou') |
|
299 |
perm_role = Role.objects.create(slug='perm-role', ou=perm_ou, name='perm role') |
|
300 | ||
301 |
some_role_dict = { |
|
302 |
'name': 'some role', 'slug': 'some-role-slug', 'ou': None, 'service': None} |
|
303 |
some_role_dict['permissions'] = [{ |
|
304 |
"operation": { |
|
305 |
"slug": "admin" |
|
306 |
}, |
|
307 |
"ou": { |
|
308 |
"slug": "perm-ou", |
|
309 |
"name": "perm-ou" |
|
310 |
}, |
|
311 |
'target_ct': {'app_label': u'a2_rbac', 'model': u'role'}, |
|
312 |
"target": { |
|
313 |
"slug": "perm-role", |
|
314 |
"ou": { |
|
315 |
"slug": "perm-ou", |
|
316 |
"name": "perm ou" |
|
317 |
}, |
|
318 |
"service": None, |
|
319 |
"name": "perm role" |
|
320 |
} |
|
321 |
}] |
|
322 | ||
323 |
import_context = ImportContext() |
|
324 |
rd = RoleDeserializer(some_role_dict, import_context) |
|
325 |
rd.deserialize() |
|
326 |
perm_created, perm_deleted = rd.permissions() |
|
327 |
assert len(perm_created) == 1 |
|
328 |
perm = perm_created[0] |
|
329 |
assert perm.target == perm_role |
|
330 |
assert perm.ou == perm_ou |
|
331 |
assert perm.operation.slug == 'admin' |
|
332 | ||
333 | ||
334 |
def test_permission_on_contentype(db): |
|
335 |
perm_ou = OU.objects.create(slug='perm-ou', name='perm ou') |
|
336 |
some_role_dict = { |
|
337 |
'name': 'some role', 'slug': 'some-role-slug', 'ou': None, 'service': None} |
|
338 |
some_role_dict['permissions'] = [{ |
|
339 |
"operation": { |
|
340 |
"slug": "admin" |
|
341 |
}, |
|
342 |
"ou": { |
|
343 |
"slug": "perm-ou", |
|
344 |
"name": "perm-ou" |
|
345 |
}, |
|
346 |
'target_ct': {"model": "contenttype", "app_label": "contenttypes"}, |
|
347 |
"target": {"model": "logentry", "app_label": "admin"} |
|
348 |
}] |
|
349 | ||
350 |
import_context = ImportContext() |
|
351 |
rd = RoleDeserializer(some_role_dict, import_context) |
|
352 |
rd.deserialize() |
|
353 |
perm_created, perm_deleted = rd.permissions() |
|
354 |
assert len(perm_created) == 1 |
|
355 |
perm = perm_created[0] |
|
356 |
assert perm.target.app_label == 'admin' |
|
357 |
assert perm.target.model == 'logentry' |
|
358 |
assert perm.ou == perm_ou |
|
359 | ||
360 | ||
361 |
def import_ou_created(db): |
|
362 |
uuid = get_hex_uuid() |
|
363 |
ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'} |
|
364 |
ou, status = import_ou(ou_d) |
|
365 |
assert status == 'created' |
|
366 |
assert ou.uuid == ou_d['uuid'] |
|
367 |
assert ou.slug == ou_d['slug'] |
|
368 |
assert ou.name == ou_d['name'] |
|
369 | ||
370 | ||
371 |
def import_ou_updated(db): |
|
372 |
ou = OU.objects.create(slug='some-ou', name='ou name') |
|
373 |
ou_d = {'uuid': ou.uuid, 'slug': ou.slug, 'name': 'new name'} |
|
374 |
ou_updated, status = import_ou(ou_d) |
|
375 |
assert status == 'updated' |
|
376 |
assert ou == ou_updated |
|
377 |
assert ou.name == 'new name' |
|
378 | ||
379 | ||
380 |
def testi_import_site_empty(): |
|
381 |
res = import_site({}, ImportContext()) |
|
382 |
assert res.roles == {'created': [], 'updated': []} |
|
383 |
assert res.ous == {'created': [], 'updated': []} |
|
384 |
assert res.parentings == {'created': [], 'deleted': []} |
|
385 | ||
386 | ||
387 |
def test_import_site_roles(db): |
|
388 |
parent_role_dict = { |
|
389 |
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), |
|
390 |
'ou': None, 'service': None} |
|
391 |
child_role_dict = { |
|
392 |
'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict], |
|
393 |
'uuid': get_hex_uuid(), 'ou': None, 'service': None} |
|
394 |
roles = [ |
|
395 |
parent_role_dict, |
|
396 |
child_role_dict |
|
397 |
] |
|
398 |
res = import_site({'roles': roles}, ImportContext()) |
|
399 |
created_roles = res.roles['created'] |
|
400 |
assert len(created_roles) == 2 |
|
401 |
parent_role = Role.objects.get(**parent_role_dict) |
|
402 |
del child_role_dict['parents'] |
|
403 |
child_role = Role.objects.get(**child_role_dict) |
|
404 |
assert created_roles[0] == parent_role |
|
405 |
assert created_roles[1] == child_role |
|
406 | ||
407 |
assert len(res.parentings['created']) == 1 |
|
408 |
assert res.parentings['created'][0] == RoleParenting.objects.get( |
|
409 |
child=child_role, parent=parent_role, direct=True) |
|
410 | ||
411 | ||
412 |
def test_roles_import_ignore_technical_role(db): |
|
413 |
roles = [{ |
|
414 |
'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] |
|
415 |
res = import_site({'roles': roles}, ImportContext()) |
|
416 |
assert res.roles == {'created': [], 'updated': []} |
|
417 | ||
418 | ||
419 |
def test_roles_import_ignore_technical_role_with_service(db): |
|
420 |
roles = [{ |
|
421 |
'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] |
|
422 |
res = import_site({'roles': roles}, ImportContext()) |
|
423 |
assert res.roles == {'created': [], 'updated': []} |
|
424 | ||
425 | ||
426 |
def test_import_role_handle_manager_role_parenting(db): |
|
427 |
parent_role_dict = { |
|
428 |
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), |
|
429 |
'ou': None, 'service': None} |
|
430 |
parent_role_manager_dict = { |
|
431 |
'name': 'Administrateur du role grand parent role', |
|
432 |
'slug': '_a2-managers-of-role-grand-parent-role', 'uuid': get_hex_uuid(), |
|
433 |
'ou': None, 'service': None} |
|
434 |
child_role_dict = { |
|
435 |
'name': 'child role', 'slug': 'child-role', |
|
436 |
'parents': [parent_role_dict, parent_role_manager_dict], |
|
437 |
'uuid': get_hex_uuid(), 'ou': None, 'service': None} |
|
438 |
import_site({'roles': [child_role_dict, parent_role_dict]}, ImportContext()) |
|
439 |
child = Role.objects.get(slug='child-role') |
|
440 |
manager = Role.objects.get(slug='_a2-managers-of-role-grand-parent-role') |
|
441 |
RoleParenting.objects.get(child=child, parent=manager, direct=True) |
|
442 | ||
443 | ||
444 |
def test_import_roles_role_delete_orphans(db): |
|
445 |
roles = [{ |
|
446 |
'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] |
|
447 |
with pytest.raises(DataImportError): |
|
448 |
import_site({'roles': roles}, ImportContext(role_delete_orphans=True)) |
|
449 | ||
450 | ||
451 |
def test_import_ou(db): |
|
452 |
uuid = get_hex_uuid() |
|
453 |
name = 'ou name' |
|
454 |
ous = [{'uuid': uuid, 'slug': 'ou-slug', 'name': name}] |
|
455 |
res = import_site({'ous': ous}, ImportContext()) |
|
456 |
assert len(res.ous['created']) == 1 |
|
457 |
ou = res.ous['created'][0] |
|
458 |
assert ou.uuid == uuid |
|
459 |
assert ou.name == name |
|
460 |
Role.objects.get(slug='_a2-managers-of-ou-slug') |
|
461 | ||
462 | ||
463 |
def test_import_ou_already_existing(db): |
|
464 |
uuid = get_hex_uuid() |
|
465 |
ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'} |
|
466 |
ou = OU.objects.create(**ou_d) |
|
467 |
num_ous = OU.objects.count() |
|
468 |
res = import_site({'ous': [ou_d]}, ImportContext()) |
|
469 |
assert len(res.ous['created']) == 0 |
|
470 |
assert num_ous == OU.objects.count() |
|
471 |
assert ou == OU.objects.get(uuid=uuid) |
tests/test_import_export_site_cmd.py | ||
---|---|---|
1 |
import __builtin__ |
|
2 |
import json |
|
3 | ||
4 |
from django.core import management |
|
5 |
import pytest |
|
6 | ||
7 |
from django_rbac.utils import get_role_model |
|
8 | ||
9 | ||
10 |
def dummy_export_site(*args): |
|
11 |
return {'roles': [{'name': 'role1'}]} |
|
12 | ||
13 | ||
14 |
def test_export_role_cmd_stdout(db, capsys, monkeypatch): |
|
15 |
import authentic2.management.commands.export_site |
|
16 |
monkeypatch.setattr( |
|
17 |
authentic2.management.commands.export_site, 'export_site', dummy_export_site) |
|
18 |
management.call_command('export_site') |
|
19 |
out, err = capsys.readouterr() |
|
20 |
assert json.loads(out) == dummy_export_site() |
|
21 | ||
22 | ||
23 |
def test_export_role_cmd_to_file(db, monkeypatch, tmpdir): |
|
24 |
import authentic2.management.commands.export_site |
|
25 |
monkeypatch.setattr( |
|
26 |
authentic2.management.commands.export_site, 'export_site', dummy_export_site) |
|
27 |
outfile = tmpdir.join('export.json') |
|
28 |
management.call_command('export_site', '--output', outfile.strpath) |
|
29 |
with outfile.open('r') as f: |
|
30 |
assert json.loads(f.read()) == dummy_export_site() |
|
31 | ||
32 | ||
33 |
def test_import_site_cmd(db, tmpdir, monkeypatch): |
|
34 |
export_file = tmpdir.join('roles-export.json') |
|
35 |
with export_file.open('w'): |
|
36 |
export_file.write(json.dumps({'roles': []})) |
|
37 |
management.call_command('import_site', export_file.strpath) |
|
38 | ||
39 | ||
40 |
def test_import_site_cmd_infos_on_stdout(db, tmpdir, monkeypatch, capsys): |
|
41 |
export_file = tmpdir.join('roles-export.json') |
|
42 |
with export_file.open('w'): |
|
43 |
export_file.write(json.dumps( |
|
44 |
{'roles': [{ |
|
45 |
'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', |
|
46 |
'ou': None, 'service': None}]})) |
|
47 | ||
48 |
management.call_command('import_site', export_file.strpath) |
|
49 | ||
50 |
out, err = capsys.readouterr() |
|
51 |
assert "Real run" in out |
|
52 |
assert "1 roles created" in out |
|
53 |
assert "0 roles updated" in out |
|
54 | ||
55 | ||
56 |
def test_import_site_transaction_rollback_on_error(db, tmpdir, monkeypatch, capsys): |
|
57 |
export_file = tmpdir.join('roles-export.json') |
|
58 |
with export_file.open('w'): |
|
59 |
export_file.write(json.dumps({'roles': []})) |
|
60 | ||
61 |
Role = get_role_model() |
|
62 | ||
63 |
def exception_import_site(*args): |
|
64 |
Role.objects.create(slug='role-slug') |
|
65 |
raise Exception() |
|
66 | ||
67 |
import authentic2.management.commands.import_site |
|
68 |
monkeypatch.setattr( |
|
69 |
authentic2.management.commands.import_site, 'import_site', exception_import_site) |
|
70 | ||
71 |
with pytest.raises(Exception): |
|
72 |
management.call_command('import_site', export_file.strpath) |
|
73 | ||
74 |
with pytest.raises(Role.DoesNotExist): |
|
75 |
Role.objects.get(slug='role-slug') |
|
76 | ||
77 | ||
78 |
def test_import_site_transaction_rollback_on_dry_run(db, tmpdir, monkeypatch, capsys): |
|
79 |
export_file = tmpdir.join('roles-export.json') |
|
80 |
with export_file.open('w'): |
|
81 |
export_file.write(json.dumps( |
|
82 |
{'roles': [{ |
|
83 |
'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', |
|
84 |
'ou': None, 'service': None}]})) |
|
85 | ||
86 |
Role = get_role_model() |
|
87 | ||
88 |
management.call_command('import_site', '--dry-run', export_file.strpath) |
|
89 | ||
90 |
with pytest.raises(Role.DoesNotExist): |
|
91 |
Role.objects.get(slug='role-slug') |
|
92 | ||
93 | ||
94 |
def test_import_site_cmd_unhandled_context_option(db, tmpdir, monkeypatch, capsys): |
|
95 |
from authentic2.data_transfer import DataImportError |
|
96 | ||
97 |
export_file = tmpdir.join('roles-export.json') |
|
98 |
with export_file.open('w'): |
|
99 |
export_file.write(json.dumps( |
|
100 |
{'roles': [{ |
|
101 |
'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', |
|
102 |
'ou': None, 'service': None}]})) |
|
103 | ||
104 |
get_role_model().objects.create(uuid='dqfewrvesvews2532', slug='role-slug', name='role-name') |
|
105 | ||
106 |
with pytest.raises(DataImportError): |
|
107 |
management.call_command( |
|
108 |
'import_site', '-o', 'role-delete-orphans', export_file.strpath) |
|
109 | ||
110 | ||
111 |
def test_import_site_cmd_unknown_context_option(db, tmpdir, monkeypatch, capsys): |
|
112 |
from django.core.management.base import CommandError |
|
113 |
export_file = tmpdir.join('roles-export.json') |
|
114 |
with pytest.raises(CommandError): |
|
115 |
management.call_command('import_site', '-o', 'unknown-option', export_file.strpath) |
|
116 | ||
117 | ||
118 |
def test_import_site_confirm_prompt_yes(db, tmpdir, monkeypatch): |
|
119 |
export_file = tmpdir.join('roles-export.json') |
|
120 |
with export_file.open('w'): |
|
121 |
export_file.write(json.dumps( |
|
122 |
{'roles': [{ |
|
123 |
'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', |
|
124 |
'ou': None, 'service': None}]})) |
|
125 | ||
126 |
def yes_raw_input(*args, **kwargs): |
|
127 |
return 'yes' |
|
128 | ||
129 |
monkeypatch.setattr(__builtin__, 'raw_input', yes_raw_input) |
|
130 | ||
131 |
management.call_command('import_site', export_file.strpath, stdin='yes') |
|
132 |
assert get_role_model().objects.get(uuid='dqfewrvesvews2532') |
|
0 |
- |