0001-agent-redeploy-roles-when-authentic-get-a-new-servic.patch
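--- a/hobo/agent/authentic2/management/commands/hobo_deploy.py
+++ b/hobo/agent/authentic2/management/commands/hobo_deploy.py
@@ -15,16 +15,17 @@
 from django.core import serializers
 
 from django_rbac.utils import get_role_model, get_ou_model
 from django.conf import settings
 
 from tenant_schemas.utils import tenant_context
 
 from hobo.agent.common.management.commands import hobo_deploy
+from hobo.agent.authentic2.provisionning import Provisionning
 
 User = get_user_model()
 
 
 class Command(hobo_deploy.Command):
 help = 'Deploy multitenant authentic service from hobo'
 
 def __init__(self, *args, **kwargs):
@@ -200,16 +201,24 @@
 content_type=provider_type)
 # load skeleton if service is new
 if service.get('template_name'):
 # if there are more of the same servie, we will create an
 # ou
 self.load_skeleton(provider, service['service-id'],
 service['template_name'])
 
+if service_created:
+# mass provision roles
+engine = Provisionning()
+roles = get_role_model().objects.all()
+ous = {provider.ou.id: provider.ou}
+engine.notify_roles(ous, roles, full=True)
+
+
 def load_skeleton(self, provider, service_id, template_name,
 create_ou=False):
 if not getattr(settings, 'HOBO_SKELETONS_DIR', None):
 self.logger.debug('no skeleton: no HOBO_SKELETONS_DIR setting')
 return
 # ex.: /var/lib/authentic2-multitenant/skeletons/communes/wcs/
 skeleton_dir = os.path.join(settings.HOBO_SKELETONS_DIR, template_name,
 service_id)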
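Review bot: In the new `if service_created:` block the role set and the OU map are scoped differently: `roles = get_role_model().objects.all()` takes every role in the tenant, while `ous = {provider.ou.id: provider.ou}` holds only the new provider's OU. `notify_roles` is not visible on this page, so the effect needs confirming, but if roles are OU-scoped this either pushes roles from unrelated OUs to the new service (more role data than it needs) or hands `notify_roles` roles whose OU is missing from the map. Consider narrowing the queryset to the provider's OU, for example `roles = get_role_model().objects.filter(ou=provider.ou)` if the role model exposes that field, or building `ous` from every OU the selected roles reference. `provider.ou.id` also assumes the provider always has an OU; if that field can be unset it raises `AttributeError` in the middle of a deploy, so a guard such as `if service_created and provider.ou is not None:` would be safer. The enclosing method starts outside the hunk and the page has lost indentation, so the nesting of this block relative to the `template_name` check cannot be confirmed from here.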
tests_authentic/test_hobo_deploy.py | ||
---|---|---|
48 | 48 |
'model': 'a2_rbac.role', |
49 | 49 |
'fields': { |
50 | 50 |
'name': u'Service état-civil', |
51 | 51 |
'slug': u'service-etat-civil', |
52 | 52 |
}, |
53 | 53 |
}, |
54 | 54 |
], roles_json) |
55 | 55 | |
56 |
# As a user is created, notify_agents is called, as celery is not running
|
|
56 |
# notify_agents is called on service creation, as celery is not running
|
|
57 | 57 |
# we just block it |
58 |
mocker.patch('hobo.agent.authentic2.provisionning.notify_agents') |
|
58 |
mock_notify = mocker.patch('hobo.agent.authentic2.provisionning.notify_agents')
|
|
59 | 59 |
requests_get = mocker.patch('requests.get') |
60 | 60 |
meta1 = '''<?xml version="1.0"?> |
61 | 61 |
<EntityDescriptor xmlns="urn:oasis:names:tc:SAML:2.0:metadata" |
62 | 62 |
xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" |
63 | 63 |
xmlns:ds="http://www.w3.org/2000/09/xmldsig#" |
64 | 64 |
entityID="http://eservices.example.net/saml/metadata"> |
65 | 65 |
<SPSSODescriptor |
66 | 66 |
AuthnRequestsSigned="true" WantAssertionsSigned="true" |
... | ... | |
302 | 302 |
}, |
303 | 303 |
] |
304 | 304 |
} |
305 | 305 |
hobo_json_content = json.dumps(env) |
306 | 306 |
hobo_json = tempfile.NamedTemporaryFile() |
307 | 307 |
hobo_json.write(hobo_json_content) |
308 | 308 |
hobo_json.flush() |
309 | 309 |
call_command('hobo_deploy', 'http://sso.example.net', hobo_json.name) |
310 |
assert mock_notify.call_count == len(env['services']) |
|
310 | 311 | |
311 | 312 |
from hobo.multitenant.middleware import TenantMiddleware |
312 | 313 |
tenants = list(TenantMiddleware.get_tenants()) |
313 | 314 |
assert len(tenants) == 1 |
314 | 315 |
tenant = tenants[0] |
315 | 316 |
assert tenant.domain_url == 'sso.example.net' |
316 | 317 |
assert tenant.schema_name == 'sso_example_net' |
317 | 318 |
tenant_directory = tenant.get_directory() |
318 |
- |