0001-import_site-export_site-commands-16514.patch
src/authentic2/a2_rbac/models.py | ||
---|---|---|
22 | 22 | |
23 | 23 |
from . import managers, fields |
24 | 24 | |
25 |
SIMPLE_SERIALIZABLE_FIELDS = (models.TextField, models.CharField, models.SlugField, |
|
26 |
models.URLField, models.BooleanField, models.IntegerField, |
|
27 |
models.CommaSeparatedIntegerField, models.EmailField, |
|
28 |
models.IntegerField, models.PositiveIntegerField) |
|
29 | ||
25 | 30 | |
26 | 31 |
class OrganizationalUnit(OrganizationalUnitAbstractBase): |
27 | 32 |
username_is_unique = models.BooleanField( |
... | ... | |
207 | 212 |
'ou__slug': self.ou.slug if self.ou else None, |
208 | 213 |
} |
209 | 214 | |
215 |
def export_json(self, attributes=False, parents=False, relations=False): |
|
216 |
d = {} |
|
217 |
concrete_fields = [f for f in self.__class__._meta.get_fields() |
|
218 |
if f.concrete and not f.is_relation] |
|
219 |
for field in concrete_fields: |
|
220 |
if field.name == 'id': |
|
221 |
continue |
|
222 |
value = getattr(self, field.attname) |
|
223 |
if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS): |
|
224 |
d[field.name] = value |
|
225 |
else: |
|
226 |
raise Exception('export_json: field %s of ressource class %s is unsupported' % ( |
|
227 |
field, self.__class__)) |
|
228 | ||
229 |
if relations: |
|
230 |
for attr_name in ('ou', 'service'): |
|
231 |
related_obj = getattr(self, attr_name) |
|
232 |
if related_obj: |
|
233 |
for related_attr in ('slug', 'uuid', 'name'): |
|
234 |
if hasattr(related_obj, related_attr): |
|
235 |
d['%s_%s' % (attr_name, related_attr)] = getattr( |
|
236 |
related_obj, related_attr) |
|
237 | ||
238 |
if attributes: |
|
239 |
for attribute in self.attributes.all(): |
|
240 |
d.setdefault('attributes', []).append(attribute.to_json()) |
|
241 | ||
242 |
if parents: |
|
243 |
RoleParenting = rbac_utils.get_role_parenting_model() |
|
244 |
for parenting in RoleParenting.objects.filter(child_id=self.id, direct=True): |
|
245 |
d.setdefault('parents', []).append(parenting.parent.export_json()) |
|
246 | ||
247 |
return d |
|
248 | ||
249 |
@classmethod |
|
250 |
def import_json(cls, d): |
|
251 |
kwargs = {} |
|
252 |
concrete_fields = [f for f in cls._meta.get_fields() |
|
253 |
if f.concrete and not f.is_relation] |
|
254 |
for field in concrete_fields: |
|
255 |
if field.name == 'id': |
|
256 |
continue |
|
257 |
if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS) and field.name in d: |
|
258 |
kwargs[field.name] = d[field.name] |
|
259 | ||
260 |
return cls.objects.create(**kwargs) |
|
261 | ||
210 | 262 | |
211 | 263 |
class RoleParenting(RoleParentingAbstractBase): |
212 | 264 |
class Meta(RoleParentingAbstractBase.Meta): |
... | ... | |
239 | 291 |
('role', 'name', 'kind', 'value'), |
240 | 292 |
) |
241 | 293 | |
294 |
def to_json(self): |
|
295 |
return {'name': self.name, 'kind': self.kind, 'value': self.value} |
|
296 | ||
297 | ||
242 | 298 |
GenericRelation(Permission, |
243 | 299 |
content_type_field='target_ct', |
244 | 300 |
object_id_field='target_id').contribute_to_class(ContentType, 'admin_perms') |
src/authentic2/data_transfer.py | ||
---|---|---|
1 |
from django_rbac.utils import get_ou_model, get_role_model, get_role_parenting_model |
|
2 |
from authentic2.a2_rbac.models import RoleAttribute |
|
3 |
from authentic2.models import Service |
|
4 | ||
5 | ||
6 |
def export_roles(role_queryset): |
|
7 |
""" Serialize every role in queryset |
|
8 |
""" |
|
9 |
roles = [] |
|
10 |
for role in role_queryset: |
|
11 |
roles.append(role.export_json(attributes=True, parents=True, relations=True)) |
|
12 |
return {'roles': roles} |
|
13 | ||
14 | ||
15 |
class DBSearchStrategy(object): |
|
16 |
"""DB search strategy. |
|
17 |
model_cls objects will be searched using the specified search_attrs, |
|
18 |
one at a time. |
|
19 |
""" |
|
20 |
def __init__(self, model_cls, search_attrs): |
|
21 |
self._model_cls = model_cls |
|
22 |
self.search_attrs = search_attrs |
|
23 | ||
24 |
@property |
|
25 |
def model_name(self): |
|
26 |
return self._model_cls._meta.model_name |
|
27 | ||
28 |
def search(self, d): |
|
29 |
for attr in self.search_attrs: |
|
30 |
if attr in d: |
|
31 |
try: |
|
32 |
return self._model_cls.objects.get(**{attr: d[attr]}) |
|
33 |
except self._model_cls.DoesNotExist: |
|
34 |
pass |
|
35 |
return None |
|
36 | ||
37 | ||
38 |
class RoleRelatedObj(object): |
|
39 |
""" |
|
40 |
service or ou |
|
41 |
""" |
|
42 | ||
43 |
def __init__(self, data, searcher): |
|
44 |
self._data = data |
|
45 |
self._searcher = searcher |
|
46 |
self._obj = None |
|
47 |
self._checked = False |
|
48 | ||
49 |
def __eq__(self, other): |
|
50 |
return hash(self) == hash(other) |
|
51 | ||
52 |
def __hash__(self): |
|
53 |
to_hash = [self.model_name] |
|
54 |
for attr in self._searcher.search_attrs: |
|
55 |
to_hash.extend([attr, self._data[attr]]) |
|
56 |
return hash(tuple(to_hash)) |
|
57 | ||
58 |
def __str__(self): |
|
59 |
res = self.model_name |
|
60 |
for attr in self._searcher.search_attrs: |
|
61 |
res += " <%s: %s>" % (attr, self._data[attr]) |
|
62 |
return res |
|
63 | ||
64 |
@property |
|
65 |
def model_name(self): |
|
66 |
return self._searcher.model_name |
|
67 | ||
68 |
@property |
|
69 |
def obj(self): |
|
70 |
""" Return the matching db obj (None if there is no match) |
|
71 |
""" |
|
72 |
if self._checked: |
|
73 |
return self._obj |
|
74 |
self._obj = self._searcher.search(self._data) |
|
75 |
self._checked = True |
|
76 |
return self._obj |
|
77 | ||
78 | ||
79 |
class ImportContext(object): |
|
80 |
""" Holds information on how to perform the import. |
|
81 | ||
82 |
role_update : if True, an already existing role will be checked and potentially |
|
83 |
updated to match exactly what is described in the json export. |
|
84 |
Service and OU relation could be added, changed or deleted |
|
85 |
RoleAttributes could be added, changed our deleted |
|
86 |
RoleParenting could be added, changed our deleted |
|
87 | ||
88 |
role_delete_orphans: if True any existing role that is not found in the export will |
|
89 |
be deleted |
|
90 |
""" |
|
91 | ||
92 |
def __init__(self, role_update=False, role_delete_orphans=False): |
|
93 |
self.role_update = role_update |
|
94 |
self.role_delete_orphans = role_delete_orphans |
|
95 | ||
96 | ||
97 |
class DataImportError(Exception): |
|
98 |
pass |
|
99 | ||
100 | ||
101 |
class MissingRelated(DataImportError): |
|
102 |
pass |
|
103 | ||
104 | ||
105 |
class RoleDeserializer(object): |
|
106 | ||
107 |
def __init__(self, d, import_context): |
|
108 |
self._role_data = {} |
|
109 |
self._import_context = import_context |
|
110 |
self.ou = None |
|
111 |
self.service = None |
|
112 |
self.obj = None |
|
113 |
self._role_created = False |
|
114 |
self._role_updated = False |
|
115 |
self._role_noops = False |
|
116 |
self.parents = None |
|
117 |
self.attributes = None |
|
118 |
self._role_cls = get_role_model() |
|
119 |
self._searcher = DBSearchStrategy(self._role_cls, ('slug', 'uuid', 'name')) |
|
120 | ||
121 |
ou_d = {} |
|
122 |
service_d = {} |
|
123 |
for key, value in d.items(): |
|
124 |
if key.startswith('ou_'): |
|
125 |
ou_d[key[3:]] = value |
|
126 |
elif key.startswith('service_'): |
|
127 |
service_d[key[8:]] = value |
|
128 |
elif key == 'parents': |
|
129 |
self.parents = value |
|
130 |
elif key == 'attributes': |
|
131 |
self.attributes = value |
|
132 |
else: |
|
133 |
self._role_data[key] = value |
|
134 |
if ou_d: |
|
135 |
self.ou = RoleRelatedObj( |
|
136 |
ou_d, DBSearchStrategy(get_ou_model(), ('slug', 'uuid', 'name'))) |
|
137 |
if service_d: |
|
138 |
self.service = RoleRelatedObj(service_d, DBSearchStrategy(Service, ('slug', 'name'))) |
|
139 | ||
140 |
def _deserialize_status(self): |
|
141 |
if self._role_created: |
|
142 |
return 'created' |
|
143 |
if self._role_updated: |
|
144 |
return 'updated' |
|
145 |
if self._role_noops: |
|
146 |
return 'noops' |
|
147 | ||
148 |
def check_related(self): |
|
149 |
missing_related = [] |
|
150 |
for related in (self.ou, self.service): |
|
151 |
if related is not None and related.obj is None: |
|
152 |
missing_related.append(related) |
|
153 |
return missing_related |
|
154 | ||
155 |
def deserialize(self): |
|
156 |
obj = self._searcher.search(self._role_data) |
|
157 |
if obj: # Role already exist |
|
158 |
self._role_noops = True |
|
159 |
self.obj = obj |
|
160 |
if self._import_context.role_update: |
|
161 |
# FIXME : do the job instead of crashing |
|
162 |
# check ou and service |
|
163 |
# check role attributes |
|
164 |
self._role_updated = True |
|
165 |
raise DataImportError( |
|
166 |
"Unsupported context value for role_update : %s" % ( |
|
167 |
self._import_context.role_update)) |
|
168 |
else: # Create role |
|
169 |
kwargs = self._role_data.copy() |
|
170 |
self.obj = self._role_cls.import_json(kwargs) |
|
171 | ||
172 |
if self.ou or self.service: |
|
173 |
if self.ou: |
|
174 |
self.obj.ou = self.ou.obj |
|
175 |
if self.service: |
|
176 |
self.obj.service = self.service.obj |
|
177 |
self.obj.save() |
|
178 | ||
179 |
# Create attributes |
|
180 |
if self.attributes: |
|
181 |
for attr_dict in self.attributes: |
|
182 |
attr_dict['role'] = self.obj |
|
183 |
RoleAttribute.objects.create(**attr_dict) |
|
184 |
self._role_created = True |
|
185 | ||
186 |
return self.obj, self._deserialize_status() |
|
187 | ||
188 |
def parentings(self): |
|
189 |
created, updated, deleted = [], [], [] |
|
190 |
if self.parents: |
|
191 |
if self._role_created: |
|
192 |
# Create parenting |
|
193 |
for parent_d in self.parents: |
|
194 |
if not parent_d['slug'].startswith('_'): |
|
195 |
parent = self._searcher.search(parent_d) |
|
196 |
if not parent: |
|
197 |
raise DataImportError("Could not find role : %s" % parent_d) |
|
198 |
created.append(get_role_parenting_model().objects.create( |
|
199 |
child=self.obj, direct=True, parent=parent)) |
|
200 |
else: |
|
201 |
if self._import_context.role_update: |
|
202 |
# FIXME : update parenting instead of crashing |
|
203 |
raise DataImportError( |
|
204 |
"Unsupported context value for role_update : %s" % ( |
|
205 |
self._import_context.role_update)) |
|
206 | ||
207 |
return created, updated, deleted |
|
208 | ||
209 | ||
210 |
class ImportResult(object): |
|
211 | ||
212 |
def __init__(self): |
|
213 |
self.roles = {'created': [], 'updated': [], 'noops': []} |
|
214 |
self.parentings = {'created': [], 'updated': [], 'deleted': []} |
|
215 |
self.missing_related = {} |
|
216 | ||
217 |
def update_roles(self, role, d_status): |
|
218 |
self.roles[d_status].append(role) |
|
219 | ||
220 |
def update_parentings(self, created, updated, deleted): |
|
221 |
self.parentings['created'].extend(created) |
|
222 |
self.parentings['updated'].extend(updated) |
|
223 |
self.parentings['deleted'].extend(deleted) |
|
224 | ||
225 |
def update_missing_related(self, related_l): |
|
226 |
for related in related_l: |
|
227 |
self.missing_related.setdefault(related.model_name, set()).add(related) |
|
228 | ||
229 |
@property |
|
230 |
def success(self): |
|
231 |
return not self.missing_related |
|
232 | ||
233 | ||
234 |
def import_roles(roles_list, import_context): |
|
235 |
result = ImportResult() |
|
236 |
roles_ds = [RoleDeserializer(role_d, import_context) for role_d in roles_list |
|
237 |
if not role_d['slug'].startswith('_')] |
|
238 | ||
239 |
for ds in roles_ds: |
|
240 |
result.update_missing_related(ds.check_related()) |
|
241 | ||
242 |
if not result.success: # Some related objs are missing |
|
243 |
return result |
|
244 | ||
245 |
for ds in roles_ds: |
|
246 |
result.update_roles(*ds.deserialize()) |
|
247 | ||
248 |
for ds in roles_ds: |
|
249 |
result.update_parentings(*ds.parentings()) |
|
250 | ||
251 |
if import_context.role_delete_orphans: |
|
252 |
# FIXME : delete each role that is in DB but not in the export |
|
253 |
raise DataImportError( |
|
254 |
"Unsupported context value for role_delete_orphans : %s" % ( |
|
255 |
import_context.role_delete_orphans)) |
|
256 | ||
257 |
return result |
src/authentic2/management/commands/export_site.py | ||
---|---|---|
1 |
import json |
|
2 |
import sys |
|
3 | ||
4 |
from django.core.management.base import BaseCommand |
|
5 | ||
6 |
from authentic2.data_transfer import export_roles |
|
7 |
from django_rbac.utils import get_role_model |
|
8 | ||
9 | ||
10 |
class Command(BaseCommand): |
|
11 |
help = 'Export site' |
|
12 | ||
13 |
def add_arguments(self, parser): |
|
14 |
parser.add_argument('--output', metavar='FILE', default=None, |
|
15 |
help='name of a file to write output to') |
|
16 | ||
17 |
def handle(self, *args, **options): |
|
18 |
if options['output']: |
|
19 |
output, close = open(options['output'], 'w'), True |
|
20 |
else: |
|
21 |
output, close = sys.stdout, False |
|
22 |
json.dump(export_roles( |
|
23 |
get_role_model().objects.exclude(slug__startswith='_').all()), output, indent=4) |
|
24 |
if close: |
|
25 |
output.close() |
src/authentic2/management/commands/import_site.py | ||
---|---|---|
1 |
import json |
|
2 |
import sys |
|
3 | ||
4 |
from django.core.management.base import BaseCommand |
|
5 |
from django.db import transaction |
|
6 | ||
7 | ||
8 |
from authentic2.data_transfer import import_roles, ImportContext |
|
9 | ||
10 | ||
11 |
class Command(BaseCommand): |
|
12 |
help = 'Import site' |
|
13 | ||
14 |
def add_arguments(self, parser): |
|
15 |
parser.add_argument('filename', metavar='FILENAME', type=str, |
|
16 |
help='name of file to import') |
|
17 | ||
18 |
def handle(self, filename, **options): |
|
19 |
with open(filename, 'r') as f: |
|
20 |
with transaction.atomic(): |
|
21 |
result = import_roles(json.load(f)['roles'], ImportContext()) |
|
22 |
if not result.success: |
|
23 |
sys.stderr.write("Error : could not import roles due to missing related objects\n") |
|
24 |
for relation, objs in result.missing_related.items(): |
|
25 |
for obj in objs: |
|
26 |
sys.stderr.write("Missing %s \n" % obj) |
|
27 |
sys.exit(1) |
|
28 | ||
29 |
sys.stdout.write("Success\n") |
|
30 |
for ops, roles in result.roles.items(): |
|
31 |
sys.stdout.write("%s roles %s \n" % (len(roles), ops)) |
tests/test_a2_rbac.py | ||
---|---|---|
1 | 1 |
import pytest |
2 | 2 | |
3 |
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU, RoleAttribute |
|
3 | 4 |
from authentic2.models import Service |
4 |
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU
|
|
5 |
from authentic2.utils import get_hex_uuid
|
|
5 | 6 | |
6 | 7 | |
7 | 8 |
def test_role_natural_key(db): |
... | ... | |
24 | 25 |
Role.objects.get_by_natural_key(*r2.natural_key()) |
25 | 26 |
with pytest.raises(Role.DoesNotExist): |
26 | 27 |
Role.objects.get_by_natural_key(*r4.natural_key()) |
28 | ||
29 | ||
30 |
def test_basic_role_export_json(db): |
|
31 |
role = Role.objects.create( |
|
32 |
name='basic role', slug='basic-role', description='basic role description') |
|
33 |
role_dict = role.export_json() |
|
34 |
assert role_dict['name'] == role.name |
|
35 |
assert role_dict['slug'] == role.slug |
|
36 |
assert role_dict['uuid'] == role.uuid |
|
37 |
assert role_dict['description'] == role.description |
|
38 |
assert role_dict['admin_scope_id'] == role.admin_scope_id |
|
39 |
assert role_dict['external_id'] == role.external_id |
|
40 | ||
41 | ||
42 |
def test_role_with_ou_export_json(db): |
|
43 |
ou = OU.objects.create(name='ou', slug='ou') |
|
44 |
role = Role.objects.create(name='some role', ou=ou) |
|
45 |
role_dict = role.export_json(relations=True) |
|
46 |
assert role_dict['ou_uuid'] == ou.uuid |
|
47 |
assert role_dict['ou_name'] == ou.name |
|
48 |
assert role_dict['ou_slug'] == ou.slug |
|
49 | ||
50 | ||
51 |
def test_role_with_service_export_json(db): |
|
52 |
service = Service.objects.create(name='service name', slug='service-name') |
|
53 |
role = Role.objects.create(name='some role', service=service) |
|
54 |
role_dict = role.export_json(relations=True) |
|
55 |
assert role_dict['service_name'] == service.name |
|
56 |
assert role_dict['service_slug'] == service.slug |
|
57 | ||
58 | ||
59 |
def test_role_with_attributes_export_json(db): |
|
60 |
role = Role.objects.create(name='some role') |
|
61 |
attr1 = RoleAttribute.objects.create( |
|
62 |
role=role, name='attr1_name', kind='string', value='attr1_value') |
|
63 |
attr2 = RoleAttribute.objects.create( |
|
64 |
role=role, name='attr2_name', kind='string', value='attr2_value') |
|
65 | ||
66 |
role_dict = role.export_json(attributes=True) |
|
67 |
attributes = role_dict['attributes'] |
|
68 |
assert len(attributes) == 2 |
|
69 | ||
70 |
expected_attr_names = set([attr1.name, attr2.name]) |
|
71 |
for attr_dict in attributes: |
|
72 |
assert attr_dict['name'] in expected_attr_names |
|
73 |
expected_attr_names.remove(attr_dict['name']) |
|
74 |
target_attr = RoleAttribute.objects.filter(name=attr_dict['name']).first() |
|
75 |
assert attr_dict['kind'] == target_attr.kind |
|
76 |
assert attr_dict['value'] == target_attr.value |
|
77 | ||
78 | ||
79 |
def test_role_with_parents_export_json(db): |
|
80 |
grand_parent_role = Role.objects.create( |
|
81 |
name='test grand parent role', slug='test-grand-parent-role') |
|
82 |
parent_1_role = Role.objects.create( |
|
83 |
name='test parent 1 role', slug='test-parent-1-role') |
|
84 |
parent_1_role.add_parent(grand_parent_role) |
|
85 |
parent_2_role = Role.objects.create( |
|
86 |
name='test parent 2 role', slug='test-parent-2-role') |
|
87 |
parent_2_role.add_parent(grand_parent_role) |
|
88 |
child_role = Role.objects.create( |
|
89 |
name='test child role', slug='test-child-role') |
|
90 |
child_role.add_parent(parent_1_role) |
|
91 |
child_role.add_parent(parent_2_role) |
|
92 | ||
93 |
child_role_dict = child_role.export_json(parents=True) |
|
94 |
assert child_role_dict['slug'] == child_role.slug |
|
95 |
parents = child_role_dict['parents'] |
|
96 |
assert len(parents) == 2 |
|
97 |
expected_slugs = set([parent_1_role.slug, parent_2_role.slug]) |
|
98 |
for parent in parents: |
|
99 |
assert parent['slug'] in expected_slugs |
|
100 |
expected_slugs.remove(parent['slug']) |
|
101 | ||
102 |
grand_parent_role_dict = grand_parent_role.export_json(parents=True) |
|
103 |
assert grand_parent_role_dict['slug'] == grand_parent_role.slug |
|
104 |
assert 'parents' not in grand_parent_role_dict |
|
105 | ||
106 |
parent_1_role_dict = parent_1_role.export_json(parents=True) |
|
107 |
assert parent_1_role_dict['slug'] == parent_1_role.slug |
|
108 |
parents = parent_1_role_dict['parents'] |
|
109 |
assert len(parents) == 1 |
|
110 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
111 | ||
112 |
parent_2_role_dict = parent_2_role.export_json(parents=True) |
|
113 |
assert parent_2_role_dict['slug'] == parent_2_role.slug |
|
114 |
parents = parent_2_role_dict['parents'] |
|
115 |
assert len(parents) == 1 |
|
116 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
117 | ||
118 | ||
119 |
def test_basic_role_import_json(db): |
|
120 |
role_dict = dict( |
|
121 |
name='basic role', slug='basic-role', description='basic role description', |
|
122 |
uuid=get_hex_uuid(), admin_scope_id=1, external_id='some id') |
|
123 |
role = Role.import_json(role_dict) |
|
124 |
assert role_dict['name'] == role.name |
|
125 |
assert role_dict['slug'] == role.slug |
|
126 |
assert role_dict['uuid'] == role.uuid |
|
127 |
assert role_dict['description'] == role.description |
|
128 |
assert role_dict['admin_scope_id'] == role.admin_scope_id |
|
129 |
assert role_dict['external_id'] == role.external_id |
tests/test_data_transfer.py | ||
---|---|---|
1 |
from django_rbac.utils import get_role_model, get_ou_model |
|
2 |
import pytest |
|
3 | ||
4 |
from authentic2.a2_rbac.models import RoleParenting |
|
5 |
from authentic2.data_transfer import ( |
|
6 |
DataImportError, DBSearchStrategy, export_roles, import_roles, ImportContext, ImportResult, |
|
7 |
RoleDeserializer, RoleRelatedObj) |
|
8 |
from authentic2.models import Service |
|
9 |
from authentic2.utils import get_hex_uuid |
|
10 | ||
11 | ||
12 |
Role = get_role_model() |
|
13 |
OU = get_ou_model() |
|
14 | ||
15 | ||
16 |
def test_export_basic_role(db): |
|
17 |
role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid()) |
|
18 |
query_set = Role.objects.filter(uuid=role.uuid) |
|
19 |
export = export_roles(query_set) |
|
20 |
roles = export['roles'] |
|
21 |
assert len(roles) == 1 |
|
22 |
role_dict = roles[0] |
|
23 |
for key, value in role.export_json().items(): |
|
24 |
assert role_dict[key] == value |
|
25 | ||
26 | ||
27 |
def test_export_role_with_parents(db): |
|
28 |
grand_parent_role = Role.objects.create( |
|
29 |
name='test grand parent role', slug='test-grand-parent-role', uuid=get_hex_uuid()) |
|
30 |
parent_1_role = Role.objects.create( |
|
31 |
name='test parent 1 role', slug='test-parent-1-role', uuid=get_hex_uuid()) |
|
32 |
parent_1_role.add_parent(grand_parent_role) |
|
33 |
parent_2_role = Role.objects.create( |
|
34 |
name='test parent 2 role', slug='test-parent-2-role', uuid=get_hex_uuid()) |
|
35 |
parent_2_role.add_parent(grand_parent_role) |
|
36 |
child_role = Role.objects.create( |
|
37 |
name='test child role', slug='test-child-role', uuid=get_hex_uuid()) |
|
38 |
child_role.add_parent(parent_1_role) |
|
39 |
child_role.add_parent(parent_2_role) |
|
40 | ||
41 |
query_set = Role.objects.filter(slug__startswith='test').order_by('slug') |
|
42 |
export = export_roles(query_set) |
|
43 |
roles = export['roles'] |
|
44 |
assert len(roles) == 4 |
|
45 | ||
46 |
child_role_dict = roles[0] |
|
47 |
assert child_role_dict['slug'] == child_role.slug |
|
48 |
parents = child_role_dict['parents'] |
|
49 |
assert len(parents) == 2 |
|
50 |
expected_slugs = set([parent_1_role.slug, parent_2_role.slug]) |
|
51 |
for parent in parents: |
|
52 |
assert parent['slug'] in expected_slugs |
|
53 |
expected_slugs.remove(parent['slug']) |
|
54 | ||
55 |
grand_parent_role_dict = roles[1] |
|
56 |
assert grand_parent_role_dict['slug'] == grand_parent_role.slug |
|
57 | ||
58 |
parent_1_role_dict = roles[2] |
|
59 |
assert parent_1_role_dict['slug'] == parent_1_role.slug |
|
60 |
parents = parent_1_role_dict['parents'] |
|
61 |
assert len(parents) == 1 |
|
62 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
63 | ||
64 |
parent_2_role_dict = roles[3] |
|
65 |
assert parent_2_role_dict['slug'] == parent_2_role.slug |
|
66 |
parents = parent_2_role_dict['parents'] |
|
67 |
assert len(parents) == 1 |
|
68 |
assert parents[0]['slug'] == grand_parent_role.slug |
|
69 | ||
70 | ||
71 |
def test_db_search(db): |
|
72 |
role_search = DBSearchStrategy(Role, ('slug', 'uuid', 'name')) |
|
73 | ||
74 |
assert role_search.model_name == Role._meta.model_name |
|
75 | ||
76 |
role = Role.objects.create(slug='slug1') |
|
77 |
assert role == role_search.search({'slug': 'slug1'}) |
|
78 | ||
79 |
role = Role.objects.create(slug='slug2') |
|
80 |
assert role == role_search.search({'slug': 'non-matching-slug', 'uuid': role.uuid}) |
|
81 | ||
82 |
role = Role.objects.create(slug='slug3', name='some name') |
|
83 |
assert role == role_search.search( |
|
84 |
{'slug': 'non-matching-slug', 'uuid': get_hex_uuid(), 'name': 'some name'}) |
|
85 | ||
86 |
role = Role.objects.create(slug='slug4', name='some name') |
|
87 |
obj = role_search.search( |
|
88 |
{'slug': 'non-matching-slug', 'uuid': get_hex_uuid(), 'name': 'non matching name'}) |
|
89 |
assert obj is None |
|
90 | ||
91 | ||
92 |
def test_db_search_accept_missing_data(db): |
|
93 |
role_search = DBSearchStrategy(Role, ('slug', 'uuid', 'name')) |
|
94 |
Role.objects.create(slug='slug1') |
|
95 |
assert role_search.search({}) is None |
|
96 | ||
97 | ||
98 |
def test_role_related_obj(db): |
|
99 |
ou = OU.objects.create(slug='some-ou', name='some ou', uuid=get_hex_uuid()) |
|
100 |
ou_search = DBSearchStrategy(OU, ('slug', 'uuid', 'name')) |
|
101 |
rro = RoleRelatedObj({'slug': ou.slug, 'uuid': ou.uuid, 'name': ou.name}, ou_search) |
|
102 |
assert rro.model_name == OU._meta.model_name |
|
103 |
assert rro.obj == ou |
|
104 |
assert hash(rro) == hash(rro) |
|
105 |
assert str(rro) == "%s <slug: some-ou> <uuid: %s> <name: some ou>" % (rro.model_name, ou.uuid) |
|
106 | ||
107 |
rro = RoleRelatedObj( |
|
108 |
{'slug': 'unkown-slug', 'uuid': get_hex_uuid(), 'name': 'unkown name'}, ou_search) |
|
109 |
assert rro.obj is None |
|
110 | ||
111 | ||
112 |
def test_import_result(db): |
|
113 |
ou_search = DBSearchStrategy(OU, ('slug', 'uuid', 'name')) |
|
114 |
rro = RoleRelatedObj({'slug': 'ou-slug', 'uuid': 'asefdvsdvdsv', 'name': 'ou name'}, ou_search) |
|
115 |
imp_res = ImportResult() |
|
116 |
imp_res.update_missing_related([rro]) |
|
117 |
assert rro in imp_res.missing_related[rro.model_name] |
|
118 | ||
119 | ||
120 |
def test_role_deserializer(db): |
|
121 |
rd = RoleDeserializer({ |
|
122 |
'name': 'some role', 'description': 'some role description', 'slug': 'some-role', |
|
123 |
'uuid': get_hex_uuid()}, ImportContext()) |
|
124 |
assert rd.ou is None |
|
125 |
assert rd.service is None |
|
126 |
assert rd.parents is None |
|
127 |
assert rd.attributes is None |
|
128 |
assert rd.obj is None |
|
129 |
role, status = rd.deserialize() |
|
130 |
assert status == 'created' |
|
131 |
assert role.name == 'some role' |
|
132 |
assert role.description == 'some role description' |
|
133 |
assert role.slug == 'some-role' |
|
134 |
assert rd.obj == role |
|
135 | ||
136 | ||
137 |
def test_role_deserializer_with_ou(db): |
|
138 |
ou = get_ou_model().objects.create(name='some ou', slug='some-ou') |
|
139 |
rd = RoleDeserializer({ |
|
140 |
'name': 'some role', 'description': 'some role description', 'slug': 'some-role', |
|
141 |
'ou_name': 'some ou', 'ou_slug': 'some-ou'}, ImportContext()) |
|
142 |
role, status = rd.deserialize() |
|
143 |
assert role.ou == ou |
|
144 | ||
145 | ||
146 |
def test_role_deserializer_missing_ou(db): |
|
147 |
rd = RoleDeserializer({ |
|
148 |
'name': 'some role', 'description': 'role description', 'slug': 'some-role', |
|
149 |
'ou_name': 'some ou', 'ou_slug': 'some-ou'}, ImportContext()) |
|
150 |
missing_related = rd.check_related() |
|
151 |
assert len(missing_related) == 1 |
|
152 |
assert missing_related[0]._data == {'slug': 'some-ou', 'name': 'some ou'} |
|
153 | ||
154 | ||
155 |
def test_role_deserializer_with_service(db): |
|
156 |
service = Service.objects.create(name='some service', slug='some-service') |
|
157 |
rd = RoleDeserializer({ |
|
158 |
'name': 'some role', 'description': 'some role description', 'slug': 'some-role', |
|
159 |
'service_name': service.name, 'service_slug': service.slug}, ImportContext()) |
|
160 |
role, status = rd.deserialize() |
|
161 |
assert role.service == service |
|
162 | ||
163 | ||
164 |
def test_role_deserializer_missing_service(db): |
|
165 |
rd = RoleDeserializer({ |
|
166 |
'name': 'some role', 'description': 'some role description', 'slug': 'some-role', |
|
167 |
'service_name': 'some service', 'service_slug': 'some-service'}, ImportContext()) |
|
168 |
missing_related = rd.check_related() |
|
169 |
assert len(missing_related) == 1 |
|
170 |
assert missing_related[0]._data == {'slug': 'some-service', 'name': 'some service'} |
|
171 | ||
172 | ||
173 |
def test_role_deserializer_with_attributes(db): |
|
174 | ||
175 |
attributes_data = { |
|
176 |
'attr1_name': dict(name='attr1_name', kind='string', value='attr1_value'), |
|
177 |
'attr2_name': dict(name='attr2_name', kind='string', value='attr2_value') |
|
178 |
} |
|
179 |
rd = RoleDeserializer({ |
|
180 |
'name': 'some role', 'description': 'some role description', 'slug': 'some-role', |
|
181 |
'attributes': list(attributes_data.values())}, ImportContext()) |
|
182 |
role, status = rd.deserialize() |
|
183 |
attributes = role.attributes.all() |
|
184 |
assert len(attributes) == 2 |
|
185 | ||
186 |
for attr in attributes: |
|
187 |
attr_dict = attributes_data[attr.name] |
|
188 |
assert attr_dict['name'] == attr.name |
|
189 |
assert attr_dict['kind'] == attr.kind |
|
190 |
assert attr_dict['value'] == attr.value |
|
191 |
del attributes_data[attr.name] |
|
192 | ||
193 | ||
194 |
def test_role_deserializer_role_update(db): |
|
195 |
role_dict = {'name': 'some role', 'slug': 'some-role'} |
|
196 |
Role.objects.create(**role_dict) |
|
197 |
rd = RoleDeserializer(role_dict, ImportContext(role_update=True)) |
|
198 |
with pytest.raises(DataImportError): |
|
199 |
rd.deserialize() |
|
200 | ||
201 | ||
202 |
def test_role_deserializer_parenting_existing_parent(db): |
|
203 |
parent_role_dict = {'name': 'grand parent role', 'slug': 'grand-parent-role'} |
|
204 |
parent_role = Role.objects.create(**parent_role_dict) |
|
205 |
child_role_dict = {'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict]} |
|
206 | ||
207 |
rd = RoleDeserializer(child_role_dict, ImportContext()) |
|
208 |
child_role, status = rd.deserialize() |
|
209 |
created, updated, deleted = rd.parentings() |
|
210 | ||
211 |
assert len(created) == 1 |
|
212 |
parenting = created[0] |
|
213 |
assert parenting.direct is True |
|
214 |
assert parenting.parent == parent_role |
|
215 |
assert parenting.child == child_role |
|
216 | ||
217 | ||
218 |
def test_role_deserializer_parenting_non_existing_parent(db): |
|
219 |
parent_role_dict = {'name': 'grand parent role', 'slug': 'grand-parent-role'} |
|
220 |
child_role_dict = {'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict]} |
|
221 |
rd = RoleDeserializer(child_role_dict, ImportContext()) |
|
222 |
rd.deserialize() |
|
223 |
with pytest.raises(DataImportError) as excinfo: |
|
224 |
rd.parentings() |
|
225 | ||
226 |
assert "Could not find role" in str(excinfo.value) |
|
227 | ||
228 | ||
229 |
def test_role_deserializer_parenting_role_update(db): |
|
230 |
parent_role_dict = {'name': 'grand parent role', 'slug': 'grand-parent-role'} |
|
231 |
child_role_dict = {'name': 'child role', 'slug': 'child-role'} |
|
232 |
Role.objects.create(**child_role_dict) |
|
233 |
child_role_dict['parents'] = [parent_role_dict] |
|
234 | ||
235 |
import_context = ImportContext(role_update=False) |
|
236 |
rd = RoleDeserializer(child_role_dict, import_context) |
|
237 |
rd.deserialize() |
|
238 | ||
239 |
with pytest.raises(DataImportError) as excinfo: |
|
240 |
import_context.role_update = True |
|
241 |
rd.parentings() |
|
242 | ||
243 |
assert "Unsupported context value" in str(excinfo.value) |
|
244 | ||
245 | ||
246 |
def test_empty_roles_import(): |
|
247 |
res = import_roles([], ImportContext()) |
|
248 |
assert res.success |
|
249 |
assert res.roles == {'created': [], 'updated': [], 'noops': []} |
|
250 |
assert res.parentings == {'created': [], 'updated': [], 'deleted': []} |
|
251 |
assert res.missing_related == {} |
|
252 | ||
253 | ||
254 |
def test_import_roles(db): |
|
255 |
parent_role_dict = {'name': 'grand parent role', 'slug': 'grand-parent-role'} |
|
256 |
child_role_dict = {'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict]} |
|
257 |
roles = [ |
|
258 |
parent_role_dict, |
|
259 |
child_role_dict |
|
260 |
] |
|
261 | ||
262 |
res = import_roles(roles, ImportContext()) |
|
263 |
assert res.success is True |
|
264 |
created_roles = res.roles['created'] |
|
265 |
assert len(created_roles) == 2 |
|
266 |
parent_role = Role.objects.get(**parent_role_dict) |
|
267 |
del child_role_dict['parents'] |
|
268 |
child_role = Role.objects.get(**child_role_dict) |
|
269 |
assert created_roles[0] == parent_role |
|
270 |
assert created_roles[1] == child_role |
|
271 | ||
272 |
assert len(res.parentings['created']) == 1 |
|
273 |
assert res.parentings['created'][0] == RoleParenting.objects.get( |
|
274 |
child=child_role, parent=parent_role, direct=True) |
|
275 | ||
276 | ||
277 |
def test_roles_import_ignore_technical_role(db): |
|
278 |
roles = [{ |
|
279 |
'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] |
|
280 |
res = import_roles(roles, ImportContext()) |
|
281 |
assert res.success is True |
|
282 |
assert res.roles == {'created': [], 'updated': [], 'noops': []} |
|
283 | ||
284 | ||
285 |
def test_import_roles_role_delete_orphans(db, tmpdir, monkeypatch): |
|
286 |
roles = [{ |
|
287 |
'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] |
|
288 |
with pytest.raises(DataImportError): |
|
289 |
import_roles(roles, ImportContext(role_delete_orphans=True)) |
tests/test_import_export_site_cmd.py | ||
---|---|---|
1 |
import json |
|
2 |
from mock import Mock |
|
3 | ||
4 |
from django.core import management |
|
5 |
import pytest |
|
6 | ||
7 |
from django_rbac.utils import get_role_model |
|
8 | ||
9 | ||
10 |
def dummy_export_roles(*args): |
|
11 |
return {'roles': [{'name': 'role1'}]} |
|
12 | ||
13 | ||
14 |
class DummyImportResult(object): |
|
15 | ||
16 |
def __init__(self, success, missing_related, roles): |
|
17 |
self.success = success |
|
18 |
self.missing_related = missing_related |
|
19 |
self.roles = roles |
|
20 | ||
21 | ||
22 |
def test_export_role_cmd_stdout(db, capsys, monkeypatch): |
|
23 |
import authentic2.management.commands.export_site |
|
24 |
monkeypatch.setattr( |
|
25 |
authentic2.management.commands.export_site, 'export_roles', dummy_export_roles) |
|
26 |
management.call_command('export_site') |
|
27 |
out, err = capsys.readouterr() |
|
28 |
assert json.loads(out) == dummy_export_roles() |
|
29 | ||
30 | ||
31 |
def test_export_role_cmd_to_file(db, monkeypatch, tmpdir): |
|
32 |
import authentic2.management.commands.export_site |
|
33 |
monkeypatch.setattr( |
|
34 |
authentic2.management.commands.export_site, 'export_roles', dummy_export_roles) |
|
35 |
outfile = tmpdir.join('export.json') |
|
36 |
management.call_command('export_site', '--output', outfile.strpath) |
|
37 |
with outfile.open('r') as f: |
|
38 |
assert json.loads(f.read()) == dummy_export_roles() |
|
39 | ||
40 | ||
41 |
def test_import_role_cmd(db, tmpdir, monkeypatch): |
|
42 |
export_file = tmpdir.join('roles-export.json') |
|
43 |
with export_file.open('w'): |
|
44 |
export_file.write(json.dumps({'roles': []})) |
|
45 |
management.call_command('import_site', export_file.strpath) |
|
46 | ||
47 | ||
48 |
def test_import_role_cmd_error_on_stderr(db, tmpdir, monkeypatch, capsys): |
|
49 |
export_file = tmpdir.join('roles-export.json') |
|
50 |
with export_file.open('w'): |
|
51 |
export_file.write(json.dumps({'roles': []})) |
|
52 | ||
53 |
def error_import_roles(*args): |
|
54 |
return DummyImportResult( |
|
55 |
success=False, missing_related={'ou': ['ou1', 'ou2']}, |
|
56 |
roles={'created': [], 'updated': [], 'deleted': []}) |
|
57 | ||
58 |
import authentic2.management.commands.import_site |
|
59 |
monkeypatch.setattr( |
|
60 |
authentic2.management.commands.import_site, 'import_roles', error_import_roles) |
|
61 | ||
62 |
def sys_exit_side_effect(arg): |
|
63 |
return |
|
64 | ||
65 |
sys_exit = Mock(side_effect=sys_exit_side_effect) |
|
66 |
import sys |
|
67 |
monkeypatch.setattr(sys, 'exit', sys_exit) |
|
68 | ||
69 |
management.call_command('import_site', export_file.strpath) |
|
70 | ||
71 |
out, err = capsys.readouterr() |
|
72 |
assert "could not import roles due to missing" in err |
|
73 |
assert "Missing ou" in err |
|
74 |
assert "ou1" in err |
|
75 |
assert "ou2" in err |
|
76 |
sys_exit.assert_called_with(1) |
|
77 | ||
78 | ||
79 |
def test_import_role_cmd_infos_on_stdout(db, tmpdir, monkeypatch, capsys): |
|
80 |
export_file = tmpdir.join('roles-export.json') |
|
81 |
with export_file.open('w'): |
|
82 |
export_file.write(json.dumps({'roles': []})) |
|
83 | ||
84 |
def dummy_import_roles(*args): |
|
85 |
return DummyImportResult( |
|
86 |
success=True, missing_related={}, |
|
87 |
roles={'created': ['role1'], 'updated': [], 'deleted': []}) |
|
88 | ||
89 |
import authentic2.management.commands.import_site |
|
90 |
monkeypatch.setattr( |
|
91 |
authentic2.management.commands.import_site, 'import_roles', dummy_import_roles) |
|
92 | ||
93 |
management.call_command('import_site', export_file.strpath) |
|
94 | ||
95 |
out, err = capsys.readouterr() |
|
96 |
assert "Success" in out |
|
97 |
assert "1 roles created" in out |
|
98 | ||
99 | ||
100 |
def test_import_role_transaction_rollback(db, tmpdir, monkeypatch, capsys): |
|
101 |
export_file = tmpdir.join('roles-export.json') |
|
102 |
with export_file.open('w'): |
|
103 |
export_file.write(json.dumps({'roles': []})) |
|
104 | ||
105 |
Role = get_role_model() |
|
106 | ||
107 |
def exception_import_roles(*args): |
|
108 |
Role.objects.create(slug='role-slug') |
|
109 |
raise ValueError() |
|
110 | ||
111 |
import authentic2.management.commands.import_site |
|
112 |
monkeypatch.setattr( |
|
113 |
authentic2.management.commands.import_site, 'import_roles', exception_import_roles) |
|
114 | ||
115 |
with pytest.raises(ValueError): |
|
116 |
management.call_command('import_site', export_file.strpath) |
|
117 | ||
118 |
with pytest.raises(Role.DoesNotExist): |
|
119 |
Role.objects.get(slug='role-slug') |
|
0 |
- |