0001-create-import_site-and-export_site-commands-16514.patch
src/authentic2/a2_rbac/models.py | ||
---|---|---|
22 | 22 | |
23 | 23 |
from . import managers, fields |
24 | 24 | |
25 |
SIMPLE_SERIALIZABLE_FIELDS = (models.TextField, models.CharField, models.SlugField, |
|
26 |
models.URLField, models.BooleanField, fields.UniqueBooleanField, |
|
27 |
models.IntegerField, models.CommaSeparatedIntegerField, |
|
28 |
models.EmailField, models.IntegerField, models.PositiveIntegerField) |
|
29 | ||
25 | 30 | |
26 | 31 |
class OrganizationalUnit(OrganizationalUnitAbstractBase): |
27 | 32 |
username_is_unique = models.BooleanField( |
... | ... | |
92 | 97 |
def cached(cls): |
93 | 98 |
return cls.objects.all() |
94 | 99 | |
100 |
def export_json(self): |
|
101 |
d = {} |
|
102 |
concrete_fields = [f for f in self.__class__._meta.get_fields() |
|
103 |
if f.concrete and not f.is_relation] |
|
104 |
for field in concrete_fields: |
|
105 |
if field.name == 'id': |
|
106 |
continue |
|
107 |
value = getattr(self, field.attname) |
|
108 |
if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS): |
|
109 |
d[field.name] = value |
|
110 |
else: |
|
111 |
raise Exception('export_json: field %s of ressource class %s is unsupported' % ( |
|
112 |
field, self.__class__)) |
|
113 |
return d |
|
114 | ||
115 |
@classmethod |
|
116 |
def import_json(cls, d, to_update=None): |
|
117 |
if to_update is None: |
|
118 |
return cls.objects.create(**d) |
|
119 |
else: |
|
120 |
for attr, value in d.items(): |
|
121 |
setattr(to_update, attr, value) |
|
122 |
to_update.save() |
|
123 |
return to_update |
|
124 | ||
95 | 125 | |
96 | 126 |
class Permission(PermissionAbstractBase): |
97 | 127 |
class Meta: |
... | ... | |
207 | 237 |
'ou__slug': self.ou.slug if self.ou else None, |
208 | 238 |
} |
209 | 239 | |
240 |
def export_json(self, attributes=False, parents=False, permissions=False): |
|
241 |
d = {} |
|
242 |
concrete_fields = [f for f in self.__class__._meta.get_fields() |
|
243 |
if f.concrete and not f.is_relation] |
|
244 |
for field in concrete_fields: |
|
245 |
if field.name == 'id': |
|
246 |
continue |
|
247 |
value = getattr(self, field.attname) |
|
248 |
if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS): |
|
249 |
d[field.name] = value |
|
250 |
else: |
|
251 |
raise Exception('export_json: field %s of ressource class %s is unsupported' % ( |
|
252 |
field, self.__class__)) |
|
253 | ||
254 |
d['natural_key'] = self.natural_key() |
|
255 | ||
256 |
if attributes: |
|
257 |
for attribute in self.attributes.all(): |
|
258 |
d.setdefault('attributes', []).append(attribute.to_json()) |
|
259 | ||
260 |
if parents: |
|
261 |
RoleParenting = rbac_utils.get_role_parenting_model() |
|
262 |
for parenting in RoleParenting.objects.filter(child_id=self.id, direct=True): |
|
263 |
d.setdefault('parents', []).append(parenting.parent.export_json()) |
|
264 | ||
265 |
if permissions: |
|
266 |
for perm in self.permissions.all(): |
|
267 |
d.setdefault('permissions', []).append(perm.natural_key()) |
|
268 | ||
269 |
return d |
|
270 | ||
271 |
@classmethod |
|
272 |
def import_json(cls, d, to_update=None): |
|
273 |
kwargs = {} |
|
274 |
concrete_fields = [f for f in cls._meta.get_fields() |
|
275 |
if f.concrete and not f.is_relation] |
|
276 |
for field in concrete_fields: |
|
277 |
if field.name == 'id': |
|
278 |
continue |
|
279 |
if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS) and field.name in d: |
|
280 |
kwargs[field.name] = d[field.name] |
|
281 |
if to_update is None: |
|
282 |
return cls.objects.create(**kwargs) |
|
283 |
else: |
|
284 |
for attr, value in kwargs.items(): |
|
285 |
setattr(to_update, attr, value) |
|
286 |
to_update.save() |
|
287 |
return to_update |
|
288 | ||
210 | 289 | |
211 | 290 |
class RoleParenting(RoleParentingAbstractBase): |
212 | 291 |
class Meta(RoleParentingAbstractBase.Meta): |
... | ... | |
239 | 318 |
('role', 'name', 'kind', 'value'), |
240 | 319 |
) |
241 | 320 | |
321 |
def to_json(self): |
|
322 |
return {'name': self.name, 'kind': self.kind, 'value': self.value} |
|
323 | ||
324 | ||
242 | 325 |
GenericRelation(Permission, |
243 | 326 |
content_type_field='target_ct', |
244 | 327 |
object_id_field='target_id').contribute_to_class(ContentType, 'admin_perms') |
src/authentic2/data_transfer.py | ||
---|---|---|
1 |
from django.contrib.contenttypes.models import ContentType |
|
2 | ||
3 |
from django_rbac.models import Operation |
|
4 |
from django_rbac.utils import ( |
|
5 |
get_ou_model, get_role_model, get_role_parenting_model, get_permission_model) |
|
6 |
from authentic2.a2_rbac.models import RoleAttribute |
|
7 | ||
8 | ||
9 |
def export_site(): |
|
10 |
return { |
|
11 |
'roles': export_roles(get_role_model().objects.all()), |
|
12 |
'ous': export_ou(get_ou_model().objects.all()) |
|
13 |
} |
|
14 | ||
15 | ||
16 |
def export_ou(ou_query_set): |
|
17 |
return [ou.export_json() for ou in ou_query_set] |
|
18 | ||
19 | ||
20 |
def export_roles(role_queryset): |
|
21 |
""" Serialize roles in role_queryset |
|
22 |
""" |
|
23 |
return [role.export_json(attributes=True, parents=True) for role in role_queryset] |
|
24 | ||
25 | ||
26 |
def search_ou(ou_natural_key): |
|
27 |
""" ou_natural_key: ['ou-slug'] or None |
|
28 |
""" |
|
29 |
if ou_natural_key: |
|
30 |
try: |
|
31 |
OU = get_ou_model() |
|
32 |
return OU.objects.get_by_natural_key(*ou_natural_key) |
|
33 |
except OU.DoesNotExist: |
|
34 |
pass |
|
35 |
return None |
|
36 | ||
37 | ||
38 |
def search_role(role_d): |
|
39 |
Role = get_role_model() |
|
40 |
try: |
|
41 |
return Role.objects.get(uuid=role_d['uuid']) |
|
42 |
except Role.DoesNotExist: |
|
43 |
pass |
|
44 |
try: |
|
45 |
return Role.objects.get_by_natural_key(*role_d['natural_key']) |
|
46 |
except Role.DoesNotExist: |
|
47 |
return None |
|
48 | ||
49 | ||
50 |
class ImportContext(object): |
|
51 |
""" Holds information on how to perform the import. |
|
52 | ||
53 |
ou_delete_orphans: if True any existing ou that is not found in the export will |
|
54 |
be deleted |
|
55 | ||
56 |
role_delete_orphans: if True any existing role that is not found in the export will |
|
57 |
be deleted |
|
58 | ||
59 | ||
60 |
role_attributes_update: for each role in the import data, |
|
61 |
attributes will deleted and re-created |
|
62 | ||
63 | ||
64 |
role_parentings_update: for each role in the import data, |
|
65 |
parentings will deleted and re-created |
|
66 | ||
67 |
role_permissions_update: for each role in the import data, |
|
68 |
permissions will deleted and re-created |
|
69 |
""" |
|
70 | ||
71 |
def __init__( |
|
72 |
self, role_delete_orphans=False, role_parentings_update=True, |
|
73 |
role_permissions_update=True, role_attributes_update=True, |
|
74 |
ou_delete_orphans=False): |
|
75 |
self.role_delete_orphans = role_delete_orphans |
|
76 |
self.ou_delete_orphans = ou_delete_orphans |
|
77 |
self.role_parentings_update = role_parentings_update |
|
78 |
self.role_permissions_update = role_permissions_update |
|
79 |
self.role_attributes_update = role_attributes_update |
|
80 | ||
81 | ||
82 |
class DataImportError(Exception): |
|
83 |
pass |
|
84 | ||
85 | ||
86 |
class RoleDeserializer(object): |
|
87 | ||
88 |
def __init__(self, d, import_context): |
|
89 |
self._import_context = import_context |
|
90 |
self._obj = None |
|
91 |
self._ou = None |
|
92 |
self._parents = None |
|
93 |
self._attributes = None |
|
94 |
self._permissions = None |
|
95 | ||
96 |
self._role_d = dict() |
|
97 |
for key, value in d.items(): |
|
98 |
if key == 'parents': |
|
99 |
self._parents = value |
|
100 |
elif key == 'attributes': |
|
101 |
self._attributes = value |
|
102 |
elif key == 'permissions': |
|
103 |
self._permissions = value |
|
104 |
else: |
|
105 |
self._role_d[key] = value |
|
106 | ||
107 |
ou_natural_key = self._role_d['natural_key'][1] |
|
108 |
if ou_natural_key: |
|
109 |
self._ou = search_ou(ou_natural_key) |
|
110 |
if self._ou is None: |
|
111 |
raise DataImportError( |
|
112 |
"Can't import role because missing Organizational Unit : " |
|
113 |
"%s" % ou_natural_key[0]) |
|
114 | ||
115 |
def deserialize(self): |
|
116 |
kwargs = self._role_d.copy() |
|
117 |
obj = search_role(self._role_d) |
|
118 |
if obj: # Role already exist |
|
119 |
self._obj = obj |
|
120 |
status = 'updated' |
|
121 |
get_role_model().import_json(kwargs, self._obj) |
|
122 |
if self._ou and (self._obj.ou != self._ou): |
|
123 |
# Need to update ou |
|
124 |
self._obj.ou = self._ou |
|
125 |
self._obj.save() |
|
126 |
else: # Create role |
|
127 |
self._obj = get_role_model().import_json(kwargs) |
|
128 |
if self._ou: |
|
129 |
self._obj.ou = self._ou |
|
130 |
self._obj.save() |
|
131 |
status = 'created' |
|
132 | ||
133 |
# Ensure admin role is created |
|
134 |
self._obj.get_admin_role() |
|
135 |
return self._obj, status |
|
136 | ||
137 |
def attributes(self): |
|
138 |
""" Update attributes (delete everything then create) |
|
139 |
""" |
|
140 |
created, deleted = [], [] |
|
141 |
for attr in self._obj.attributes.all(): |
|
142 |
attr.delete() |
|
143 |
deleted.append(attr) |
|
144 |
# Create attributes |
|
145 |
if self._attributes: |
|
146 |
for attr_dict in self._attributes: |
|
147 |
attr_dict['role'] = self._obj |
|
148 |
created.append(RoleAttribute.objects.create(**attr_dict)) |
|
149 | ||
150 |
return created, deleted |
|
151 | ||
152 |
def parentings(self): |
|
153 |
""" Update parentings (delete everything then create) |
|
154 |
""" |
|
155 |
created, deleted = [], [] |
|
156 |
Parenting = get_role_parenting_model() |
|
157 |
for parenting in Parenting.objects.filter(child=self._obj, direct=True): |
|
158 |
parenting.delete() |
|
159 |
deleted.append(parenting) |
|
160 | ||
161 |
if self._parents: |
|
162 |
for parent_d in self._parents: |
|
163 |
parent = search_role(parent_d) |
|
164 |
if not parent: |
|
165 |
raise DataImportError("Could not find role : %s" % parent_d) |
|
166 |
created.append(Parenting.objects.create( |
|
167 |
child=self._obj, direct=True, parent=parent)) |
|
168 | ||
169 |
return created, deleted |
|
170 | ||
171 |
def permissions(self): |
|
172 |
""" Update permissions (delete everything then create) |
|
173 |
""" |
|
174 |
created, deleted = [], [] |
|
175 |
for perm in self._obj.permissions.all(): |
|
176 |
perm.delete() |
|
177 |
deleted.append(perm) |
|
178 |
self._obj.permissions.clear() |
|
179 |
if self._permissions: |
|
180 |
for perm in self._permissions: |
|
181 |
op = Operation.objects.get_by_natural_key(perm[0]) |
|
182 |
ou = get_ou_model().objects.get_by_natural_key(perm[1]) if perm[1] else None |
|
183 |
ct = ContentType.objects.get_by_natural_key(*perm[2]) |
|
184 |
target = ct.model_class().objects.get_by_natural_key(*perm[3]) |
|
185 |
perm = get_permission_model().objects.create( |
|
186 |
operation=op, ou=ou, target_ct=ct, target_id=target.pk) |
|
187 |
self._obj.permissions.add(perm) |
|
188 |
created.append(perm) |
|
189 | ||
190 |
return created, deleted |
|
191 | ||
192 | ||
193 |
class ImportResult(object): |
|
194 | ||
195 |
def __init__(self): |
|
196 |
self.roles = {'created': [], 'updated': []} |
|
197 |
self.ous = {'created': [], 'updated': []} |
|
198 |
self.attributes = {'created': [], 'deleted': []} |
|
199 |
self.parentings = {'created': [], 'deleted': []} |
|
200 |
self.permissions = {'created': [], 'deleted': []} |
|
201 | ||
202 |
def update_roles(self, role, d_status): |
|
203 |
self.roles[d_status].append(role) |
|
204 | ||
205 |
def update_ous(self, ou, status): |
|
206 |
self.ous[status].append(ou) |
|
207 | ||
208 |
def _bulk_update(self, attrname, created, deleted): |
|
209 |
attr = getattr(self, attrname) |
|
210 |
attr['created'].extend(created) |
|
211 |
attr['deleted'].extend(deleted) |
|
212 | ||
213 |
def update_attributes(self, created, deleted): |
|
214 |
self._bulk_update('attributes', created, deleted) |
|
215 | ||
216 |
def update_parentings(self, created, deleted): |
|
217 |
self._bulk_update('parentings', created, deleted) |
|
218 | ||
219 |
def update_permissions(self, created, deleted): |
|
220 |
self._bulk_update('permissions', created, deleted) |
|
221 | ||
222 |
def to_str(self, verbose=False): |
|
223 |
res = "" |
|
224 |
for attr in ('roles', 'ous', 'parentings', 'permissions', 'attributes'): |
|
225 |
data = getattr(self, attr) |
|
226 |
for status in ('created', 'updated', 'deleted'): |
|
227 |
if status in data: |
|
228 |
s_data = data[status] |
|
229 |
res += "%s %s %s\n" % (len(s_data), attr, status) |
|
230 |
return res |
|
231 | ||
232 | ||
233 |
def import_ou(ou_d): |
|
234 |
OU = get_ou_model() |
|
235 |
ou = search_ou([ou_d['slug']]) |
|
236 |
if ou is None: |
|
237 |
ou = OU.import_json(ou_d) |
|
238 |
status = 'created' |
|
239 |
else: |
|
240 |
OU.import_json(ou_d, ou) |
|
241 |
status = 'updated' |
|
242 |
# Ensure admin role is created |
|
243 |
ou.get_admin_role() |
|
244 |
return ou, status |
|
245 | ||
246 | ||
247 |
def import_site(json_d, import_context): |
|
248 |
result = ImportResult() |
|
249 | ||
250 |
for ou_d in json_d.get('ous', []): |
|
251 |
result.update_ous(*import_ou(ou_d)) |
|
252 | ||
253 |
roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', []) |
|
254 |
if not role_d['slug'].startswith('_')] |
|
255 | ||
256 |
for ds in roles_ds: |
|
257 |
result.update_roles(*ds.deserialize()) |
|
258 | ||
259 |
if import_context.role_attributes_update: |
|
260 |
for ds in roles_ds: |
|
261 |
result.update_attributes(*ds.attributes()) |
|
262 | ||
263 |
if import_context.role_parentings_update: |
|
264 |
for ds in roles_ds: |
|
265 |
result.update_parentings(*ds.parentings()) |
|
266 | ||
267 |
if import_context.role_permissions_update: |
|
268 |
for ds in roles_ds: |
|
269 |
result.update_permissions(*ds.permissions()) |
|
270 | ||
271 |
if import_context.ou_delete_orphans: |
|
272 |
raise DataImportError( |
|
273 |
"Unsupported context value for ou_delete_orphans : %s" % ( |
|
274 |
import_context.ou_delete_orphans)) |
|
275 | ||
276 |
if import_context.role_delete_orphans: |
|
277 |
# FIXME : delete each role that is in DB but not in the export |
|
278 |
raise DataImportError( |
|
279 |
"Unsupported context value for role_delete_orphans : %s" % ( |
|
280 |
import_context.role_delete_orphans)) |
|
281 | ||
282 |
return result |
src/authentic2/management/commands/export_site.py | ||
---|---|---|
1 |
import json |
|
2 |
import sys |
|
3 | ||
4 |
from django.core.management.base import BaseCommand |
|
5 | ||
6 |
from authentic2.data_transfer import export_site |
|
7 |
from django_rbac.utils import get_role_model |
|
8 | ||
9 | ||
10 |
class Command(BaseCommand): |
|
11 |
help = 'Export site' |
|
12 | ||
13 |
def add_arguments(self, parser): |
|
14 |
parser.add_argument('--output', metavar='FILE', default=None, |
|
15 |
help='name of a file to write output to') |
|
16 | ||
17 |
def handle(self, *args, **options): |
|
18 |
if options['output']: |
|
19 |
output, close = open(options['output'], 'w'), True |
|
20 |
else: |
|
21 |
output, close = sys.stdout, False |
|
22 |
json.dump(export_site(), output, indent=4) |
|
23 |
if close: |
|
24 |
output.close() |
src/authentic2/management/commands/import_site.py | ||
---|---|---|
1 |
import contextlib |
|
2 |
import json |
|
3 |
import sys |
|
4 | ||
5 |
from django.conf import settings |
|
6 |
from django.core.management.base import BaseCommand |
|
7 |
from django.db import transaction |
|
8 |
from django.utils import translation |
|
9 | ||
10 |
from authentic2.data_transfer import import_site, ImportContext |
|
11 | ||
12 | ||
13 |
class DryRunException(Exception): |
|
14 |
pass |
|
15 | ||
16 | ||
17 |
def create_context_args(options): |
|
18 |
kwargs = {} |
|
19 |
if options['option']: |
|
20 |
for context_op in options['option']: |
|
21 |
context_op = context_op.replace('-', '_') |
|
22 |
if context_op.startswith('no_'): |
|
23 |
kwargs[context_op[3:]] = False |
|
24 |
else: |
|
25 |
kwargs[context_op] = True |
|
26 |
return kwargs |
|
27 | ||
28 | ||
29 |
# Borrowed from https://bugs.python.org/issue10049#msg118599 |
|
30 |
@contextlib.contextmanager |
|
31 |
def provision_contextm(dry_run): |
|
32 |
multitenant = False |
|
33 |
try: |
|
34 |
import hobo.agent.authentic2 |
|
35 |
multitenant = True |
|
36 |
except ImportError: |
|
37 |
pass |
|
38 |
if dry_run and multitenant: |
|
39 |
with hobo.agent.authentic2.provisionning.Provisionning(): |
|
40 |
yield |
|
41 |
else: |
|
42 |
yield |
|
43 | ||
44 | ||
45 |
class Command(BaseCommand): |
|
46 |
help = 'Import site' |
|
47 | ||
48 |
def add_arguments(self, parser): |
|
49 |
parser.add_argument( |
|
50 |
'filename', metavar='FILENAME', type=str, help='name of file to import') |
|
51 |
parser.add_argument( |
|
52 |
'--no-dry-run', action='store_false', dest='dry_run', help='Really perform the import') |
|
53 |
parser.add_argument( |
|
54 |
'-o', '--option', action='append', help='Import context options', |
|
55 |
choices=[ |
|
56 |
'role-delete-orphans', 'ou-delete-orphans', 'no-role-permissions-update', |
|
57 |
'no-role-attributes-update', 'no-role-parentings-update']) |
|
58 | ||
59 |
def handle(self, filename, **options): |
|
60 |
translation.activate(settings.LANGUAGE_CODE) |
|
61 |
dry_run = options['dry_run'] |
|
62 |
msg = "Dry run\n" if dry_run else "Real run\n" |
|
63 |
c_kwargs = create_context_args(options) |
|
64 |
try: |
|
65 |
with open(filename, 'r') as f: |
|
66 |
with provision_contextm(dry_run): |
|
67 |
with transaction.atomic(): |
|
68 |
sys.stdout.write(msg) |
|
69 |
result = import_site(json.load(f), ImportContext(**c_kwargs)) |
|
70 |
if dry_run: |
|
71 |
raise DryRunException() |
|
72 |
except DryRunException: |
|
73 |
pass |
|
74 |
sys.stdout.write(result.to_str()) |
|
75 |
sys.stdout.write("Success\n") |
|
76 |
translation.deactivate() |
tests/test_a2_rbac.py | ||
---|---|---|
1 | 1 |
import pytest |
2 | 2 | |
3 |
from django.contrib.contenttypes.models import ContentType |
|
4 |
from django_rbac.utils import get_permission_model |
|
5 |
from django_rbac.models import Operation |
|
6 |
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU, RoleAttribute |
|
3 | 7 |
from authentic2.models import Service |
4 |
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU
|
|
8 |
from authentic2.utils import get_hex_uuid
|
|
5 | 9 | |
6 | 10 | |
7 | 11 |
def test_role_natural_key(db): |
... | ... | |
24 | 28 |
Role.objects.get_by_natural_key(*r2.natural_key()) |
25 | 29 |
with pytest.raises(Role.DoesNotExist): |
26 | 30 |
Role.objects.get_by_natural_key(*r4.natural_key()) |
31 | ||
32 | ||
33 |
def test_basic_role_export_json(db): |
|
34 |
role = Role.objects.create( |
|
35 |
name='basic role', slug='basic-role', description='basic role description') |
|
36 |
role_dict = role.export_json() |
|
37 |
assert role_dict['name'] == role.name |
|
38 |
assert role_dict['slug'] == role.slug |
|
39 |
assert role_dict['uuid'] == role.uuid |
|
40 |
assert role_dict['description'] == role.description |
|
41 |
assert role_dict['admin_scope_id'] == role.admin_scope_id |
|
42 |
assert role_dict['external_id'] == role.external_id |
|
43 |
assert role_dict['natural_key'] == ['basic-role', None, None] |
|
44 | ||
45 | ||
46 |
def test_role_with_ou_export_json(db): |
|
47 |
ou = OU.objects.create(name='ou', slug='ou') |
|
48 |
role = Role.objects.create(name='some role', ou=ou) |
|
49 |
role_dict = role.export_json() |
|
50 |
assert role_dict['natural_key'] == ['some-role', ['ou'], None] |
|
51 | ||
52 | ||
53 |
def test_role_with_service_export_json(db): |
|
54 |
service = Service.objects.create(name='service name', slug='service-name') |
|
55 |
role = Role.objects.create(name='some role', service=service) |
|
56 |
role_dict = role.export_json() |
|
57 |
assert role_dict['natural_key'] == [u'some-role', None, [None, 'service-name']] |
|
58 | ||
59 | ||
60 |
def test_role_with_attributes_export_json(db): |
|
61 |
role = Role.objects.create(name='some role') |
|
62 |
attr1 = RoleAttribute.objects.create( |
|
63 |
role=role, name='attr1_name', kind='string', value='attr1_value') |
|
64 |
attr2 = RoleAttribute.objects.create( |
|
65 |
role=role, name='attr2_name', kind='string', value='attr2_value') |
|
66 | ||
67 |
role_dict = role.export_json(attributes=True) |
|
68 |
attributes = role_dict['attributes'] |
|
69 |
assert len(attributes) == 2 |
|
70 | ||
71 |
expected_attr_names = set([attr1.name, attr2.name]) |
|
72 |
for attr_dict in attributes: |
|
73 |
assert attr_dict['name'] in expected_attr_names |
|
74 |
expected_attr_names.remove(attr_dict['name']) |
|
75 |
target_attr = RoleAttribute.objects.filter(name=attr_dict['name']).first() |
|
76 |
assert attr_dict['kind'] == target_attr.kind |
|
77 |
assert attr_dict['value'] == target_attr.value |
|
78 | ||
79 | ||
80 |
def test_role_with_parents_export_json(db): |
|
81 |
grand_parent_role = Role.objects.create( |
|
82 |
name='test grand parent role', slug='test-grand-parent-role') |
|
83 |
parent_1_role = Role.objects.create( |
|
84 |
name='test parent 1 role', slug='test-parent-1-role') |
|
85 |
parent_1_role.add_parent(grand_parent_role) |
|
86 |
parent_2_role = Role.objects.create( |
|
87 |
name='test parent 2 role', slug='test-parent-2-role') |
|
88 |
parent_2_role.add_parent(grand_parent_role) |
|
89 |
child_role = Role.objects.create( |
|
90 |
name='test child role', slug='test-child-role') |
|
91 |
child_role.add_parent(parent_1_role) |
|
92 |
child_role.add_parent(parent_2_role) |
|
93 | ||
94 |
child_role_dict = child_role.export_json(parents=True) |
|
95 |
assert child_role_dict['slug'] == child_role.slug |
|
96 |
parents = child_role_dict['parents'] |
|
97 |
assert len(parents) == 2 |
|
98 |
expected_slugs = set([parent_1_role.slug, parent_2_role.slug]) |
|
99 |
for parent in parents: |
|
100 |
assert parent['slug'] in expected_slugs |
|
101 |
expected_slugs.remove(parent['slug']) |
|
102 | ||
103 |
grand_parent_role_dict = grand_parent_role.export_json(parents=True) |
|
104 |
assert grand_parent_role_dict['slug'] == grand_parent_role.slug |
|
105 |
assert 'parents' not in grand_parent_role_dict |
|
106 | ||
107 |
parent_1_role_dict = parent_1_role.export_json(parents=True) |
|
108 |
assert parent_1_role_dict['slug'] == parent_1_role.slug |
|
109 |
parents = parent_1_role_dict['parents'] |
|
110 |
assert len(parents) == 1 |
|
111 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
112 | ||
113 |
parent_2_role_dict = parent_2_role.export_json(parents=True) |
|
114 |
assert parent_2_role_dict['slug'] == parent_2_role.slug |
|
115 |
parents = parent_2_role_dict['parents'] |
|
116 |
assert len(parents) == 1 |
|
117 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
118 | ||
119 | ||
120 |
def test_role_with_permission_export_json(db): |
|
121 |
some_ou = OU.objects.create(name='some ou', slug='some-ou') |
|
122 |
role = Role.objects.create(name='role name', slug='role-slug') |
|
123 |
other_role = Role.objects.create( |
|
124 |
name='other role name', slug='other-role-slug', uuid=get_hex_uuid(), ou=some_ou) |
|
125 |
ou = OU.objects.create(name='basic ou', slug='basic-ou', description='basic ou description') |
|
126 |
Permission = get_permission_model() |
|
127 |
op = Operation.objects.first() |
|
128 |
perm_saml = Permission.objects.create( |
|
129 |
operation=op, ou=ou, |
|
130 |
target_ct=ContentType.objects.get_for_model(ContentType), |
|
131 |
target_id=ContentType.objects.get(app_label="saml", model="libertyprovider").pk) |
|
132 |
role.permissions.add(perm_saml) |
|
133 |
perm_role = Permission.objects.create( |
|
134 |
operation=op, ou=None, |
|
135 |
target_ct=ContentType.objects.get_for_model(Role), |
|
136 |
target_id=other_role.pk) |
|
137 |
role.permissions.add(perm_role) |
|
138 | ||
139 |
export = role.export_json(permissions=True) |
|
140 |
permissions = export['permissions'] |
|
141 |
assert len(permissions) == 2 |
|
142 |
assert permissions[0] == [ |
|
143 |
u'add', [u'basic-ou'], (u'contenttypes', u'contenttype'), (u'saml', u'libertyprovider')] |
|
144 |
assert permissions[1] == [ |
|
145 |
u'add', None, (u'a2_rbac', u'role'), [u'other-role-slug', ['some-ou'], None]] |
|
146 | ||
147 | ||
148 |
def test_basic_role_import_json(db): |
|
149 |
role_dict = dict( |
|
150 |
name='basic role', slug='basic-role', description='basic role description', |
|
151 |
uuid=get_hex_uuid(), admin_scope_id=1, external_id='some id') |
|
152 |
role = Role.import_json(role_dict) |
|
153 |
assert role_dict['name'] == role.name |
|
154 |
assert role_dict['slug'] == role.slug |
|
155 |
assert role_dict['uuid'] == role.uuid |
|
156 |
assert role_dict['description'] == role.description |
|
157 |
assert role_dict['admin_scope_id'] == role.admin_scope_id |
|
158 |
assert role_dict['external_id'] == role.external_id |
|
159 | ||
160 | ||
161 |
def test_ou_export_json(db): |
|
162 |
ou = OU.objects.create( |
|
163 |
name='basic ou', slug='basic-ou', description='basic ou description', |
|
164 |
username_is_unique=True, email_is_unique=True, default=False, validate_emails=True) |
|
165 |
ou_dict = ou.export_json() |
|
166 |
assert ou_dict['name'] == ou.name |
|
167 |
assert ou_dict['slug'] == ou.slug |
|
168 |
assert ou_dict['uuid'] == ou.uuid |
|
169 |
assert ou_dict['description'] == ou.description |
|
170 |
assert ou_dict['username_is_unique'] == ou.username_is_unique |
|
171 |
assert ou_dict['email_is_unique'] == ou.email_is_unique |
|
172 |
assert ou_dict['default'] == ou.default |
|
173 |
assert ou_dict['validate_emails'] == ou.validate_emails |
|
174 | ||
175 | ||
176 |
def test_ou_import_json(db): |
|
177 |
ou_d = dict(name='basic ou', slug='basic-ou', description='basic ou description', |
|
178 |
username_is_unique=True, email_is_unique=True, default=False, validate_emails=True) |
|
179 |
ou = OU.import_json(ou_d) |
|
180 |
for field, value in ou_d.items(): |
|
181 |
assert getattr(ou, field) == value |
tests/test_data_transfer.py | ||
---|---|---|
1 |
from django_rbac.utils import get_role_model, get_ou_model |
|
2 |
import pytest |
|
3 | ||
4 |
from authentic2.a2_rbac.models import RoleParenting |
|
5 |
from authentic2.data_transfer import ( |
|
6 |
DataImportError, export_roles, import_site, export_ou, ImportContext, |
|
7 |
RoleDeserializer, search_role, import_ou) |
|
8 |
from authentic2.utils import get_hex_uuid |
|
9 | ||
10 | ||
11 |
Role = get_role_model() |
|
12 |
OU = get_ou_model() |
|
13 | ||
14 | ||
15 |
def test_export_basic_role(db): |
|
16 |
role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid()) |
|
17 |
query_set = Role.objects.filter(uuid=role.uuid) |
|
18 |
roles = export_roles(query_set) |
|
19 |
assert len(roles) == 1 |
|
20 |
role_dict = roles[0] |
|
21 |
for key, value in role.export_json().items(): |
|
22 |
assert role_dict[key] == value |
|
23 | ||
24 | ||
25 |
def test_export_role_with_parents(db): |
|
26 |
grand_parent_role = Role.objects.create( |
|
27 |
name='test grand parent role', slug='test-grand-parent-role', uuid=get_hex_uuid()) |
|
28 |
parent_1_role = Role.objects.create( |
|
29 |
name='test parent 1 role', slug='test-parent-1-role', uuid=get_hex_uuid()) |
|
30 |
parent_1_role.add_parent(grand_parent_role) |
|
31 |
parent_2_role = Role.objects.create( |
|
32 |
name='test parent 2 role', slug='test-parent-2-role', uuid=get_hex_uuid()) |
|
33 |
parent_2_role.add_parent(grand_parent_role) |
|
34 |
child_role = Role.objects.create( |
|
35 |
name='test child role', slug='test-child-role', uuid=get_hex_uuid()) |
|
36 |
child_role.add_parent(parent_1_role) |
|
37 |
child_role.add_parent(parent_2_role) |
|
38 | ||
39 |
query_set = Role.objects.filter(slug__startswith='test').order_by('slug') |
|
40 |
roles = export_roles(query_set) |
|
41 |
assert len(roles) == 4 |
|
42 | ||
43 |
child_role_dict = roles[0] |
|
44 |
assert child_role_dict['slug'] == child_role.slug |
|
45 |
parents = child_role_dict['parents'] |
|
46 |
assert len(parents) == 2 |
|
47 |
expected_slugs = set([parent_1_role.slug, parent_2_role.slug]) |
|
48 |
for parent in parents: |
|
49 |
assert parent['slug'] in expected_slugs |
|
50 |
expected_slugs.remove(parent['slug']) |
|
51 | ||
52 |
grand_parent_role_dict = roles[1] |
|
53 |
assert grand_parent_role_dict['slug'] == grand_parent_role.slug |
|
54 | ||
55 |
parent_1_role_dict = roles[2] |
|
56 |
assert parent_1_role_dict['slug'] == parent_1_role.slug |
|
57 |
parents = parent_1_role_dict['parents'] |
|
58 |
assert len(parents) == 1 |
|
59 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
60 | ||
61 |
parent_2_role_dict = roles[3] |
|
62 |
assert parent_2_role_dict['slug'] == parent_2_role.slug |
|
63 |
parents = parent_2_role_dict['parents'] |
|
64 |
assert len(parents) == 1 |
|
65 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
66 | ||
67 | ||
68 |
def test_export_ou(db): |
|
69 |
ou = OU.objects.create(name='ou name', slug='ou-slug') |
|
70 |
ous = export_ou(OU.objects.filter(name='ou name')) |
|
71 |
assert len(ous) == 1 |
|
72 |
ou_d = ous[0] |
|
73 |
assert ou_d['name'] == ou.name |
|
74 |
assert ou_d['slug'] == ou.slug |
|
75 | ||
76 | ||
77 |
def test_search_role_by_uuid(db): |
|
78 |
uuid = get_hex_uuid() |
|
79 |
role_d = {'uuid': uuid, 'slug': 'role-slug'} |
|
80 |
role = Role.objects.create(**role_d) |
|
81 |
assert role == search_role({'uuid': uuid, 'slug': 'other-role-slug'}) |
|
82 | ||
83 | ||
84 |
def test_search_role_by_slug(db): |
|
85 |
role_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug'} |
|
86 |
role = Role.objects.create(**role_d) |
|
87 |
assert role == search_role({ |
|
88 |
'uuid': get_hex_uuid(), 'slug': 'role-slug', 'natural_key': ['role-slug', None, None]}) |
|
89 | ||
90 | ||
91 |
def test_search_role_not_found(db): |
|
92 |
assert search_role( |
|
93 |
{ |
|
94 |
'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name', |
|
95 |
'natural_key': ['role-slug', None, None]}) is None |
|
96 | ||
97 | ||
98 |
def test_search_role_slug_not_unique(db): |
|
99 |
role1_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'} |
|
100 |
role2_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'} |
|
101 |
ou = OU.objects.create(name='some ou', slug='some-ou') |
|
102 |
role1 = Role.objects.create(ou=ou, **role1_d) |
|
103 |
Role.objects.create(**role2_d) |
|
104 |
assert role1 == search_role(role1.export_json()) |
|
105 | ||
106 | ||
107 |
def test_role_deserializer(db): |
|
108 |
rd = RoleDeserializer({ |
|
109 |
'name': 'some role', 'description': 'some role description', 'slug': 'some-role', |
|
110 |
'uuid': get_hex_uuid(), 'natural_key': ['some-role', None, None]}, ImportContext()) |
|
111 |
assert rd._ou is None |
|
112 |
assert rd._parents is None |
|
113 |
assert rd._attributes is None |
|
114 |
assert rd._obj is None |
|
115 |
role, status = rd.deserialize() |
|
116 |
assert status == 'created' |
|
117 |
assert role.name == 'some role' |
|
118 |
assert role.description == 'some role description' |
|
119 |
assert role.slug == 'some-role' |
|
120 |
assert rd._obj == role |
|
121 | ||
122 | ||
123 |
def test_role_deserializer_with_ou(db): |
|
124 |
ou = OU.objects.create(name='some ou', slug='some-ou') |
|
125 |
rd = RoleDeserializer({ |
|
126 |
'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description', |
|
127 |
'slug': 'some-role', 'natural_key': ['some-role', ['some-ou'], None]}, ImportContext()) |
|
128 |
role, status = rd.deserialize() |
|
129 |
assert role.ou == ou |
|
130 | ||
131 | ||
132 |
def test_role_deserializer_missing_ou(db): |
|
133 |
with pytest.raises(DataImportError): |
|
134 |
RoleDeserializer({ |
|
135 |
'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'role description', |
|
136 |
'slug': 'some-role', 'natural_key': ['role-slug', ['some-ou'], None]}, |
|
137 |
ImportContext()) |
|
138 | ||
139 | ||
140 |
def test_role_deserializer_update_ou(db): |
|
141 |
ou1 = OU.objects.create(name='ou 1', slug='ou-1') |
|
142 |
ou2 = OU.objects.create(name='ou 2', slug='ou-2') |
|
143 |
uuid = get_hex_uuid() |
|
144 |
existing_role = Role.objects.create(uuid=uuid, slug='some-role', ou=ou1) |
|
145 |
rd = RoleDeserializer({ |
|
146 |
'uuid': uuid, 'name': 'some-role', 'slug': 'some-role', |
|
147 |
'natural_key': ['some-role', ['ou-2'], None]}, ImportContext()) |
|
148 |
role, status = rd.deserialize() |
|
149 |
assert role == existing_role |
|
150 |
assert role.ou == ou2 |
|
151 | ||
152 | ||
153 |
def test_role_deserializer_update_fields(db): |
|
154 |
uuid = get_hex_uuid() |
|
155 |
existing_role = Role.objects.create(uuid=uuid, slug='some-role', name='some role') |
|
156 |
rd = RoleDeserializer({ |
|
157 |
'uuid': uuid, 'slug': 'some-role', 'name': 'some role changed', |
|
158 |
'natural_key': ['some-role', None, None]}, ImportContext()) |
|
159 |
role, status = rd.deserialize() |
|
160 |
assert role == existing_role |
|
161 |
assert role.name == 'some role changed' |
|
162 | ||
163 | ||
164 |
def test_role_deserializer_with_attributes(db): |
|
165 | ||
166 |
attributes_data = { |
|
167 |
'attr1_name': dict(name='attr1_name', kind='string', value='attr1_value'), |
|
168 |
'attr2_name': dict(name='attr2_name', kind='string', value='attr2_value') |
|
169 |
} |
|
170 |
rd = RoleDeserializer({ |
|
171 |
'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description', |
|
172 |
'slug': 'some-role', 'attributes': list(attributes_data.values()), |
|
173 |
'natural_key': ['some-role', None, None]}, ImportContext()) |
|
174 |
role, status = rd.deserialize() |
|
175 |
created, deleted = rd.attributes() |
|
176 |
assert role.attributes.count() == 2 |
|
177 |
assert len(created) == 2 |
|
178 | ||
179 |
for attr in created: |
|
180 |
attr_dict = attributes_data[attr.name] |
|
181 |
assert attr_dict['name'] == attr.name |
|
182 |
assert attr_dict['kind'] == attr.kind |
|
183 |
assert attr_dict['value'] == attr.value |
|
184 |
del attributes_data[attr.name] |
|
185 | ||
186 | ||
187 |
def test_role_deserializer_creates_admin_role(db): |
|
188 |
role_dict = { |
|
189 |
'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(), |
|
190 |
'natural_key': ['some-role', None, None]} |
|
191 |
rd = RoleDeserializer(role_dict, ImportContext()) |
|
192 |
rd.deserialize() |
|
193 |
Role.objects.get(slug='_a2-managers-of-role-some-role') |
|
194 | ||
195 | ||
196 |
def test_role_deserializer_parenting_existing_parent(db): |
|
197 |
parent_role_dict = { |
|
198 |
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), |
|
199 |
'natural_key': ['grand-parent-role', None, None]} |
|
200 |
parent_role = Role.import_json(parent_role_dict) |
|
201 |
child_role_dict = { |
|
202 |
'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict], |
|
203 |
'uuid': get_hex_uuid(), 'natural_key': ['child-role', None, None]} |
|
204 | ||
205 |
rd = RoleDeserializer(child_role_dict, ImportContext()) |
|
206 |
child_role, status = rd.deserialize() |
|
207 |
created, deleted = rd.parentings() |
|
208 | ||
209 |
assert len(created) == 1 |
|
210 |
parenting = created[0] |
|
211 |
assert parenting.direct is True |
|
212 |
assert parenting.parent == parent_role |
|
213 |
assert parenting.child == child_role |
|
214 | ||
215 | ||
216 |
def test_role_deserializer_parenting_non_existing_parent(db): |
|
217 |
parent_role_dict = { |
|
218 |
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), |
|
219 |
'natural_key': ['grand-parent-role', None, None]} |
|
220 |
child_role_dict = { |
|
221 |
'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict], |
|
222 |
'uuid': get_hex_uuid(), 'natural_key': ['child-role', None, None]} |
|
223 |
rd = RoleDeserializer(child_role_dict, ImportContext()) |
|
224 |
rd.deserialize() |
|
225 |
with pytest.raises(DataImportError) as excinfo: |
|
226 |
rd.parentings() |
|
227 | ||
228 |
assert "Could not find role" in str(excinfo.value) |
|
229 | ||
230 | ||
231 |
def test_role_deserializer_permissions(db): |
|
232 |
ou = OU.objects.create(slug='some-ou') |
|
233 |
other_role_dict = { |
|
234 |
'name': 'other role', 'slug': 'other-role-slug', 'uuid': get_hex_uuid(), 'ou': ou} |
|
235 |
other_role = Role.objects.create(**other_role_dict) |
|
236 |
some_role_dict = { |
|
237 |
'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(), |
|
238 |
'natural_key': ['some-role', None, None]} |
|
239 |
some_role_dict['permissions'] = [ |
|
240 |
[u'add', None, (u'a2_rbac', u'role'), [u'other-role-slug', ['some-ou'], None]] |
|
241 |
] |
|
242 | ||
243 |
import_context = ImportContext() |
|
244 |
rd = RoleDeserializer(some_role_dict, import_context) |
|
245 |
rd.deserialize() |
|
246 |
perm_created, perm_deleted = rd.permissions() |
|
247 | ||
248 |
assert len(perm_created) == 1 |
|
249 |
assert len(perm_deleted) == 0 |
|
250 |
del some_role_dict['permissions'] |
|
251 |
role = Role.objects.get(slug=some_role_dict['slug']) |
|
252 |
assert role.permissions.count() == 1 |
|
253 |
perm = role.permissions.first() |
|
254 |
assert perm.operation.slug == 'add' |
|
255 |
assert not perm.ou |
|
256 |
assert perm.target == other_role |
|
257 | ||
258 |
# that one should delete permissions |
|
259 |
rd = RoleDeserializer(some_role_dict, import_context) |
|
260 |
role, _ = rd.deserialize() |
|
261 |
perm_created, perm_deleted = rd.permissions() |
|
262 |
assert role.permissions.count() == 0 |
|
263 |
assert len(perm_created) == 0 |
|
264 |
assert len(perm_deleted) == 1 |
|
265 | ||
266 | ||
267 |
def import_ou_created(db): |
|
268 |
uuid = get_hex_uuid() |
|
269 |
ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'} |
|
270 |
ou, status = import_ou(ou_d) |
|
271 |
assert status == 'created' |
|
272 |
assert ou.uuid == ou_d['uuid'] |
|
273 |
assert ou.slug == ou_d['slug'] |
|
274 |
assert ou.name == ou_d['name'] |
|
275 | ||
276 | ||
277 |
def import_ou_updated(db): |
|
278 |
ou = OU.objects.create(slug='some-ou', name='ou name') |
|
279 |
ou_d = {'uuid': ou.uuid, 'slug': ou.slug, 'name': 'new name'} |
|
280 |
ou_updated, status = import_ou(ou_d) |
|
281 |
assert status == 'updated' |
|
282 |
assert ou == ou_updated |
|
283 |
assert ou.name == 'new name' |
|
284 | ||
285 | ||
286 |
def testi_import_site_empty(): |
|
287 |
res = import_site({}, ImportContext()) |
|
288 |
assert res.roles == {'created': [], 'updated': []} |
|
289 |
assert res.ous == {'created': [], 'updated': []} |
|
290 |
assert res.parentings == {'created': [], 'deleted': []} |
|
291 | ||
292 | ||
293 |
def test_import_site_roles(db): |
|
294 |
parent_role_dict = { |
|
295 |
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), |
|
296 |
'natural_key': ['grand-parent-role', None, None]} |
|
297 |
child_role_dict = { |
|
298 |
'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict], |
|
299 |
'uuid': get_hex_uuid(), 'natural_key': ['child-role', None, None]} |
|
300 |
roles = [ |
|
301 |
parent_role_dict, |
|
302 |
child_role_dict |
|
303 |
] |
|
304 |
res = import_site({'roles': roles}, ImportContext()) |
|
305 |
created_roles = res.roles['created'] |
|
306 |
assert len(created_roles) == 2 |
|
307 |
del parent_role_dict['natural_key'] |
|
308 |
parent_role = Role.objects.get(**parent_role_dict) |
|
309 |
del child_role_dict['parents'] |
|
310 |
del child_role_dict['natural_key'] |
|
311 |
child_role = Role.objects.get(**child_role_dict) |
|
312 |
assert created_roles[0] == parent_role |
|
313 |
assert created_roles[1] == child_role |
|
314 | ||
315 |
assert len(res.parentings['created']) == 1 |
|
316 |
assert res.parentings['created'][0] == RoleParenting.objects.get( |
|
317 |
child=child_role, parent=parent_role, direct=True) |
|
318 | ||
319 | ||
320 |
def test_roles_import_ignore_technical_role(db): |
|
321 |
roles = [{ |
|
322 |
'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] |
|
323 |
res = import_site({'roles': roles}, ImportContext()) |
|
324 |
assert res.roles == {'created': [], 'updated': []} |
|
325 | ||
326 | ||
327 |
def test_roles_import_ignore_technical_role_with_service(db): |
|
328 |
roles = [{ |
|
329 |
'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] |
|
330 |
res = import_site({'roles': roles}, ImportContext()) |
|
331 |
assert res.roles == {'created': [], 'updated': []} |
|
332 | ||
333 | ||
334 |
def test_import_role_handle_manager_role_parenting(db): |
|
335 |
parent_role_dict = { |
|
336 |
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), |
|
337 |
'natural_key': ['grand-parent-role', None, None]} |
|
338 |
parent_role_manager_dict = { |
|
339 |
'name': 'Administrateur du role grand parent role', |
|
340 |
'slug': '_a2-managers-of-role-grand-parent-role', 'uuid': get_hex_uuid(), |
|
341 |
'natural_key': ['_a2-managers-of-role-grand-parent-role', None, None]} |
|
342 |
child_role_dict = { |
|
343 |
'name': 'child role', 'slug': 'child-role', |
|
344 |
'parents': [parent_role_dict, parent_role_manager_dict], |
|
345 |
'uuid': get_hex_uuid(), 'natural_key': ['child-role', None, None]} |
|
346 |
import_site({'roles': [child_role_dict, parent_role_dict]}, ImportContext()) |
|
347 |
child = Role.objects.get(slug='child-role') |
|
348 |
manager = Role.objects.get(slug='_a2-managers-of-role-grand-parent-role') |
|
349 |
RoleParenting.objects.get(child=child, parent=manager, direct=True) |
|
350 | ||
351 | ||
352 |
def test_import_roles_role_delete_orphans(db): |
|
353 |
roles = [{ |
|
354 |
'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] |
|
355 |
with pytest.raises(DataImportError): |
|
356 |
import_site({'roles': roles}, ImportContext(role_delete_orphans=True)) |
|
357 | ||
358 | ||
359 |
def test_import_ou(db): |
|
360 |
uuid = get_hex_uuid() |
|
361 |
name = 'ou name' |
|
362 |
ous = [{'uuid': uuid, 'slug': 'ou-slug', 'name': name}] |
|
363 |
res = import_site({'ous': ous}, ImportContext()) |
|
364 |
assert len(res.ous['created']) == 1 |
|
365 |
ou = res.ous['created'][0] |
|
366 |
assert ou.uuid == uuid |
|
367 |
assert ou.name == name |
|
368 |
Role.objects.get(slug='_a2-managers-of-ou-slug') |
|
369 | ||
370 | ||
371 |
def test_import_ou_already_existing(db): |
|
372 |
uuid = get_hex_uuid() |
|
373 |
ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'} |
|
374 |
ou = OU.objects.create(**ou_d) |
|
375 |
num_ous = OU.objects.count() |
|
376 |
res = import_site({'ous': [ou_d]}, ImportContext()) |
|
377 |
assert len(res.ous['created']) == 0 |
|
378 |
assert num_ous == OU.objects.count() |
|
379 |
assert ou == OU.objects.get(uuid=uuid) |
tests/test_import_export_site_cmd.py | ||
---|---|---|
1 |
import json |
|
2 | ||
3 |
from django.core import management |
|
4 |
import pytest |
|
5 | ||
6 |
from django_rbac.utils import get_role_model |
|
7 | ||
8 | ||
9 |
def dummy_export_site(*args): |
|
10 |
return {'roles': [{'name': 'role1'}]} |
|
11 | ||
12 | ||
13 |
def test_export_role_cmd_stdout(db, capsys, monkeypatch): |
|
14 |
import authentic2.management.commands.export_site |
|
15 |
monkeypatch.setattr( |
|
16 |
authentic2.management.commands.export_site, 'export_site', dummy_export_site) |
|
17 |
management.call_command('export_site') |
|
18 |
out, err = capsys.readouterr() |
|
19 |
assert json.loads(out) == dummy_export_site() |
|
20 | ||
21 | ||
22 |
def test_export_role_cmd_to_file(db, monkeypatch, tmpdir): |
|
23 |
import authentic2.management.commands.export_site |
|
24 |
monkeypatch.setattr( |
|
25 |
authentic2.management.commands.export_site, 'export_site', dummy_export_site) |
|
26 |
outfile = tmpdir.join('export.json') |
|
27 |
management.call_command('export_site', '--output', outfile.strpath) |
|
28 |
with outfile.open('r') as f: |
|
29 |
assert json.loads(f.read()) == dummy_export_site() |
|
30 | ||
31 | ||
32 |
def test_import_site_cmd(db, tmpdir, monkeypatch): |
|
33 |
export_file = tmpdir.join('roles-export.json') |
|
34 |
with export_file.open('w'): |
|
35 |
export_file.write(json.dumps({'roles': []})) |
|
36 |
management.call_command('import_site', export_file.strpath) |
|
37 | ||
38 | ||
39 |
def test_import_site_cmd_infos_on_stdout(db, tmpdir, monkeypatch, capsys): |
|
40 |
export_file = tmpdir.join('roles-export.json') |
|
41 |
with export_file.open('w'): |
|
42 |
export_file.write(json.dumps( |
|
43 |
{'roles': [{ |
|
44 |
'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', |
|
45 |
'natural_key': ['role-slug', None, None]}]})) |
|
46 | ||
47 |
management.call_command('import_site', export_file.strpath) |
|
48 | ||
49 |
out, err = capsys.readouterr() |
|
50 |
assert "Dry run" in out |
|
51 |
assert "1 roles created" in out |
|
52 |
assert "0 roles updated" in out |
|
53 | ||
54 | ||
55 |
def test_import_site_transaction_rollback_on_error(db, tmpdir, monkeypatch, capsys): |
|
56 |
export_file = tmpdir.join('roles-export.json') |
|
57 |
with export_file.open('w'): |
|
58 |
export_file.write(json.dumps({'roles': []})) |
|
59 | ||
60 |
Role = get_role_model() |
|
61 | ||
62 |
def exception_import_site(*args): |
|
63 |
Role.objects.create(slug='role-slug') |
|
64 |
raise Exception() |
|
65 | ||
66 |
import authentic2.management.commands.import_site |
|
67 |
monkeypatch.setattr( |
|
68 |
authentic2.management.commands.import_site, 'import_site', exception_import_site) |
|
69 | ||
70 |
with pytest.raises(Exception): |
|
71 |
management.call_command('import_site', '--no-dry-run', export_file.strpath) |
|
72 | ||
73 |
with pytest.raises(Role.DoesNotExist): |
|
74 |
Role.objects.get(slug='role-slug') |
|
75 | ||
76 | ||
77 |
def test_import_site_transaction_rollback_on_dry_run(db, tmpdir, monkeypatch, capsys): |
|
78 |
export_file = tmpdir.join('roles-export.json') |
|
79 |
with export_file.open('w'): |
|
80 |
export_file.write(json.dumps( |
|
81 |
{'roles': [{ |
|
82 |
'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', |
|
83 |
'natural_key': ['role-slug', None, None]}]})) |
|
84 | ||
85 |
Role = get_role_model() |
|
86 | ||
87 |
management.call_command('import_site', export_file.strpath) |
|
88 | ||
89 |
with pytest.raises(Role.DoesNotExist): |
|
90 |
Role.objects.get(slug='role-slug') |
|
91 | ||
92 | ||
93 |
def test_import_site_cmd_unhandled_context_option(db, tmpdir, monkeypatch, capsys): |
|
94 |
from authentic2.data_transfer import DataImportError |
|
95 | ||
96 |
export_file = tmpdir.join('roles-export.json') |
|
97 |
with export_file.open('w'): |
|
98 |
export_file.write(json.dumps( |
|
99 |
{'roles': [{ |
|
100 |
'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', |
|
101 |
'natural_key': ['role-slug', None, None]}]})) |
|
102 | ||
103 |
get_role_model().objects.create(uuid='dqfewrvesvews2532', slug='role-slug', name='role-name') |
|
104 | ||
105 |
with pytest.raises(DataImportError): |
|
106 |
management.call_command('import_site', '-o', 'role-delete-orphans', export_file.strpath) |
|
107 | ||
108 | ||
109 |
def test_import_site_cmd_unknown_context_option(db, tmpdir, monkeypatch, capsys): |
|
110 |
from django.core.management.base import CommandError |
|
111 |
export_file = tmpdir.join('roles-export.json') |
|
112 |
with pytest.raises(CommandError): |
|
113 |
management.call_command('import_site', '-o', 'unknown-option', export_file.strpath) |
|
0 |
- |