Projet

Général

Profil

0001-applications-handle-roles-in-app-bundles-68061.patch

Frédéric Péters, 12 août 2022 12:09

Télécharger (8,55 ko)

Voir les différences:

Subject: [PATCH] applications: handle roles in app bundles (#68061)

 hobo/applications/models.py |  35 +++++++++++++
 hobo/applications/utils.py  |   3 +-
 tests/test_application.py   | 101 +++++++++++++++++++++++++++++++++++-
 3 files changed, 137 insertions(+), 2 deletions(-)
hobo/applications/models.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import io
18
import json
19
import tarfile
17 20
import urllib.parse
18 21

  
19 22
from django.conf import settings
......
88 91

  
89 92
    def deploy(self):
90 93
        bundle_content = self.bundle.read()
94
        self.deploy_roles(bundle_content)
91 95
        for service_id, services in getattr(settings, 'KNOWN_SERVICES', {}).items():
92 96
            if service_id not in Application.SUPPORTED_MODULES:
93 97
                continue
......
102 106
                    continue
103 107
                # TODO: look at response content for afterjob URLs to display a progress bar
104 108
                pass
109

  
110
    def get_authentic_service(self):
        """Return the first service declared under 'authentic', or None.

        The services are read from ``settings.KNOWN_SERVICES``; None is
        returned when that setting is absent, has no 'authentic' entry, or
        the entry holds no service.
        """
        known_services = getattr(settings, 'KNOWN_SERVICES', {})
        if 'authentic' not in known_services:
            return None
        # only the first declared authentic service is considered
        return next(iter(known_services['authentic'].values()), None)
116

  
117
    def deploy_roles(self, bundle):
        """Create or update the roles shipped in an application bundle.

        ``bundle`` is the raw content (bytes) of a tar archive holding a
        ``manifest.json`` member.  For every manifest element whose type is
        ``roles``, the matching ``roles/<slug>`` member is loaded as JSON and
        posted to the roles API of the authentic service; on success the
        returned role uuid is posted to the provision API.

        Nothing is done when no authentic service is configured.  Failed
        HTTP calls are skipped without being reported (see TODOs).
        """
        service = self.get_authentic_service()
        if not service:
            return
        roles_api_url = urllib.parse.urljoin(service['url'], 'api/roles/?update_or_create=slug')
        provision_api_url = urllib.parse.urljoin(service['url'], 'api/provision/')
        with tarfile.open(fileobj=io.BytesIO(bundle)) as tar:
            manifest = json.loads(tar.extractfile('manifest.json').read().decode())
            # a manifest with a missing or null "elements" entry has nothing
            # to deploy; iterating over None would raise a TypeError
            for element in manifest.get('elements') or []:
                if element.get('type') != 'roles':
                    continue
                role_info = json.loads(tar.extractfile('%s/%s' % (element['type'], element['slug'])).read())
                # create or update
                response = requests.post(roles_api_url, json=role_info)
                if not response.ok:
                    # TODO: report failures
                    continue
                # then force provisioning
                response = requests.post(provision_api_url, json={'role_uuid': response.json()['uuid']})
                if not response.ok:
                    # TODO: report failures
                    continue
hobo/applications/utils.py
55 55
        scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url)
56 56
        url = urllib.parse.urlunparse(('', '', path, params, query, fragment))
57 57

  
58
        query_params = {'orig': remote_service.get('orig')}
58
        query_params = dict(urllib.parse.parse_qsl(query))
59
        query_params['orig'] = remote_service.get('orig')
59 60

  
60 61
        remote_service_base_url = remote_service.get('url')
61 62
        scheme, netloc, dummy, params, old_query, fragment = urllib.parse.urlparse(remote_service_base_url)
tests/test_application.py
2 2
import json
3 3
import tarfile
4 4

  
5
import httmock
5 6
import pytest
6 7
from httmock import HTTMock
7 8
from test_manager import login
8 9
from webtest import Upload
9 10

  
10 11
from hobo.applications.models import Application
11
from hobo.environment.models import Wcs
12
from hobo.environment.models import Authentic, Wcs
12 13

  
13 14
pytestmark = pytest.mark.django_db
14 15

  
......
103 104

  
104 105
def mocked_http(url, request):
105 106
    assert '&signature=' in url.query
107

  
108
    if url.netloc == 'idp.example.invalid':
109
        if url.path == '/api/roles/':
110
            return {
111
                'content': json.dumps({'name': 'test', 'uuid': '123', 'slug': 'test'}),
112
                'status_code': 200,
113
            }
114
        if url.path == '/api/provision/':
115
            return {
116
                'content': json.dumps({'name': 'test', 'uuid': '123', 'slug': 'test'}),
117
                'status_code': 200,
118
            }
119
        return {'content': '{}', 'status_code': 500}
120

  
106 121
    if url.netloc == 'wcs.example.invalid' and url.path == '/api/export-import/':
107 122
        return {'content': json.dumps(WCS_AVAILABLE_OBJECTS), 'status_code': 200}
108 123

  
......
260 275
        assert Application.objects.count() == 1
261 276
        assert Application.objects.get(slug='test').name == 'Test'
262 277
        assert Application.objects.get(slug='test').elements.count() == 3
278

  
279

  
280
@pytest.fixture
def app_bundle_roles():
    """Return the bytes of a tar bundle declaring one form and two roles.

    The archive holds ``manifest.json`` followed by one JSON member per
    role (``roles/test-role`` and ``roles/test-role2``).
    """
    members = [
        (
            'manifest.json',
            {
                'application': 'Test',
                'slug': 'test',
                'description': '',
                'elements': [
                    {'type': 'forms', 'slug': 'test', 'name': 'test', 'auto-dependency': False},
                    {'type': 'roles', 'slug': 'test-role', 'name': 'test', 'auto-dependency': True},
                    {'type': 'roles', 'slug': 'test-role2', 'name': 'test2', 'auto-dependency': True},
                ],
            },
        ),
        (
            'roles/test-role',
            {'name': 'Test', 'slug': 'test-role', 'uuid': '061e5de7023946c79a2f7f1273afc5a2'},
        ),
        (
            'roles/test-role2',
            {'name': 'Test', 'slug': 'test-role2', 'uuid': '061e5de7023946c79a2f7f1273afc5a3'},
        ),
    ]

    archive_io = io.BytesIO()
    with tarfile.open(mode='w', fileobj=archive_io) as archive:
        for member_name, payload in members:
            serialized = json.dumps(payload, indent=2).encode()
            member_info = tarfile.TarInfo(member_name)
            # the size must be set explicitly, addfile() reads exactly that many bytes
            member_info.size = len(serialized)
            archive.addfile(member_info, fileobj=io.BytesIO(serialized))

    return archive_io.getvalue()
312

  
313

  
314
def test_deploy_application_roles(app, admin_user, settings, app_bundle_roles):
    """Installing a bundle with roles calls the IdP before the other services.

    The bundle is installed twice; each time both roles must be created (or
    updated) and provisioned on the authentic service, in order, before the
    single request sent to w.c.s.
    """

    def known_service(url):
        # every declared service shares the same title, orig and secret
        return {'title': 'Foobar', 'url': url, 'orig': 'example.org', 'secret': 'xxx'}

    Application.objects.all().delete()
    Authentic.objects.create(base_url='https://idp.example.invalid', slug='idp', title='Foobar')
    Wcs.objects.create(base_url='https://wcs.example.invalid', slug='foobar', title='Foobar')

    settings.KNOWN_SERVICES = {
        'authentic': {'idp': known_service('https://idp.example.invalid/')},
        'wcs': {'foobar': known_service('https://wcs.example.invalid/')},
    }

    login(app)

    # one create-or-update call followed by one provisioning call, per role
    expected_role_calls = [
        'https://idp.example.invalid/api/roles/?update',
        'https://idp.example.invalid/api/provision/',
        'https://idp.example.invalid/api/roles/?update',
        'https://idp.example.invalid/api/provision/',
    ]

    resp = app.get('/applications/')
    for _ in range(2):
        resp = resp.click('Install')
        resp.form['bundle'] = Upload('app.tar', app_bundle_roles, 'application/x-tar')
        with HTTMock(httmock.remember_called(mocked_http)):
            resp = resp.form.submit().follow()
            sent_requests = mocked_http.call['requests']
            # roles
            for position, url_prefix in enumerate(expected_role_calls):
                assert sent_requests[position].url.startswith(url_prefix)
            # then form
            assert 'wcs.example.invalid' in sent_requests[4].url
            assert mocked_http.call['count'] == 5
263
-