Projet

Général

Profil

0001-jobs-use-uwsgi-spooler-to-run-jobs-50017.patch

Lauréline Guérin, 15 janvier 2021 09:33

Télécharger (14 ko)

Voir les différences:

Subject: [PATCH] jobs: use uwsgi spooler to run jobs (#50017)

 debian/control                                |  1 +
 debian/debian_config.py                       |  1 +
 debian/passerelle.cron.d                      |  1 -
 debian/passerelle.dirs                        |  1 +
 debian/passerelle.init                        |  1 +
 debian/passerelle.postinst                    |  1 +
 debian/passerelle.service                     |  3 +-
 debian/uwsgi.ini                              |  3 ++
 passerelle/base/management/commands/runjob.py | 33 ++++++++++++++
 passerelle/base/models.py                     | 39 +++++++++++-----
 passerelle/settings.py                        |  4 ++
 passerelle/utils/spooler.py                   | 44 +++++++++++++++++++
 tests/test_jobs.py                            | 17 +++++++
 tests/test_mdel_ddpacs.py                     |  2 +
 tests/test_sms.py                             |  6 ++-
 15 files changed, 142 insertions(+), 15 deletions(-)
 create mode 100644 passerelle/base/management/commands/runjob.py
 create mode 100644 passerelle/utils/spooler.py
debian/control
19 19
    python3-gadjo,
20 20
    python3-django-model-utils,
21 21
    python3-requests,
22
    python3-uwsgidecorators,
22 23
    python3-setuptools,
23 24
    python3-suds,
24 25
    python3-cmislib,
debian/debian_config.py
6 6
DEBUG = False
7 7

  
8 8
PROJECT_NAME = 'passerelle'
9
PASSERELLE_MANAGE_COMMAND = '/usr/bin/passerelle-manage'
9 10

  
10 11
#
11 12
# hobotization (multitenant)
debian/passerelle.cron.d
1 1
MAILTO=root
2 2

  
3 3
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants availability
4
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants jobs
5 4
17  * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants hourly
6 5
25  1 * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants daily
7 6
47  2 * * 7 passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants weekly
debian/passerelle.dirs
1 1
/etc/passerelle
2 2
/usr/lib/passerelle
3 3
/var/lib/passerelle/collectstatic
4
/var/lib/passerelle/spooler
4 5
/var/lib/passerelle/tenants
5 6
/var/log/passerelle
debian/passerelle.init
38 38
DAEMON_ARGS=${DAEMON_ARGS:-"--pidfile=$PIDFILE
39 39
--uid $USER --gid $GROUP
40 40
--ini /etc/$NAME/uwsgi.ini
41
--spooler /var/lib/$NAME/spooler/
41 42
--daemonize /var/log/uwsgi.$NAME.log"}
42 43

  
43 44
# Load the VERBOSE setting and other rcS variables
debian/passerelle.postinst
20 20
    # ensure dirs ownership
21 21
    chown $USER:$GROUP /var/log/$NAME
22 22
    chown $USER:$GROUP /var/lib/$NAME/collectstatic
23
    chown $USER:$GROUP /var/lib/$NAME/spooler
23 24
    chown $USER:$GROUP /var/lib/$NAME/tenants
24 25
    # create a secret file
25 26
    SECRET_FILE=$CONFIG_DIR/secret
debian/passerelle.service
12 12
Group=%p
13 13
ExecStartPre=/usr/bin/passerelle-manage migrate_schemas --noinput --verbosity 1
14 14
ExecStartPre=/usr/bin/passerelle-manage collectstatic --noinput
15
ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini
15
ExecStartPre=/bin/mkdir -p /var/lib/passerelle/spooler/%m/
16
ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini --spooler /var/lib/passerelle/spooler/%m/
16 17
ExecReload=/bin/kill -HUP $MAINPID
17 18
KillSignal=SIGQUIT
18 19
TimeoutStartSec=0
debian/uwsgi.ini
12 12
chmod-socket = 666
13 13
vacuum = true
14 14

  
15
spooler-processes = 3
16
spooler-python-import = passerelle.utils.spooler
17

  
15 18
master = true
16 19
enable-threads = true
17 20
harakiri = 120
passerelle/base/management/commands/runjob.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2021  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from django.core.management.base import BaseCommand, CommandError
18

  
19
from passerelle.base.models import Job
20

  
21

  
22
class Command(BaseCommand):
    '''Run a single job, looked up by primary key (internal command).

    The job to execute is designated with the mandatory ``--job-id``
    option; a CommandError is raised when no such job exists.
    '''

    def add_arguments(self, parser):
        # the job identifier is the only input of this command
        parser.add_argument('--job-id', action='store', required=True)

    def handle(self, *args, **options):
        job_id = options['job_id']
        try:
            job = Job.objects.get(pk=job_id)
        except Job.DoesNotExist:
            raise CommandError('missing job')
        else:
            # the job exists: execute it synchronously in this process
            job.run()
passerelle/base/models.py
574 574
        skipped_jobs = []
575 575
        while True:
576 576
            with transaction.atomic():
577
                # lock an immediately runnable job
578 577
                job = self.jobs_set().exclude(
579 578
                        pk__in=skipped_jobs
580 579
                        ).filter(
......
587 586
                job.status = 'running'
588 587
                job.save()
589 588
                # release lock
590
            try:
591
                getattr(self, job.method_name)(**job.parameters)
592
            except SkipJob as e:
593
                job.status = 'registered'
594
                job.set_after_timestamp(e.after_timestamp)
589
            result = job.run()
590
            if result == 'skipped':
595 591
                skipped_jobs.append(job.id)
596
            except Exception as e:
597
                self.handle_job_error(job, sys.exc_info())
598
            else:
599
                job.status = 'completed'
600
                job.done_timestamp = timezone.now()
601
            job.save()
602 592

  
603 593
    def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs):
604 594
        resource_type = ContentType.objects.get_for_model(self)
......
609 599
                  parameters=kwargs)
610 600
        job.set_after_timestamp(after_timestamp)
611 601
        job.save()
602
        transaction.on_commit(lambda: job.run(spool=True))
612 603
        return job
613 604

  
614 605
    def handle_job_error(self, job, exc_info):
......
796 787
        self.status_details.update({'new_job_pk': new_job.pk})
797 788
        self.save()
798 789

  
790
    def run(self, spool=False):
        '''Execute this job, or hand it over to the uwsgi spooler.

        With ``spool`` set and a saved job (``self.pk`` is truthy), the job
        is never executed here: it is pushed to the spooler when the
        ``uwsgi`` module is loaded and ``settings.PASSERELLE_MANAGE_COMMAND``
        is set, and the method returns None in every case.

        Otherwise the job is marked as running, the resource method named
        by ``self.method_name`` is called with ``self.parameters`` and the
        job is saved with its new state.

        Returns 'skipped' when the method raised SkipJob (the job goes back
        to the 'registered' status with a new after timestamp), None in all
        other cases.
        '''
        if spool and self.pk:
            uwsgi_loaded = 'uwsgi' in sys.modules
            if uwsgi_loaded and settings.PASSERELLE_MANAGE_COMMAND:
                # imported here, only once it is known uwsgi is loaded
                from passerelle.utils.spooler import run_job
                current_tenant = getattr(connection, 'tenant', None)
                domain = getattr(current_tenant, 'domain_url', None)
                run_job.spool(job_id=str(self.pk), domain=domain)
            return

        self.status = 'running'
        self.save()
        try:
            method = getattr(self.resource, self.method_name)
            method(**self.parameters)
        except SkipJob as skip:
            # the method asked to be called again later
            self.status = 'registered'
            self.set_after_timestamp(skip.after_timestamp)
            self.save()
            return 'skipped'
        except Exception:
            # any other failure is delegated to the resource
            self.resource.handle_job_error(self, sys.exc_info())
        else:
            self.done_timestamp = timezone.now()
            self.status = 'completed'
        self.save()
813

  
799 814

  
800 815
@six.python_2_unicode_compatible
801 816
class ResourceLog(models.Model):
passerelle/settings.py
202 202

  
203 203
MELLON_IDENTITY_PROVIDERS = []
204 204

  
205
# management command, used to run afterjobs in uwsgi mode,
206
# usually /usr/bin/passerelle-manage.
207
PASSERELLE_MANAGE_COMMAND = None
208

  
205 209
# REQUESTS_PROXIES that can be used by requests methods
206 210
# see http://docs.python-requests.org/en/latest/user/advanced/#proxies
207 211
REQUESTS_PROXIES = None
passerelle/utils/spooler.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2021  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import subprocess
18

  
19
from uwsgidecorators import spool
20

  
21

  
22
@spool
def run_job(args):
    '''Run a job in a dedicated management command process.

    ``args`` is the mapping received from the spooler. It must contain
    ``job_id`` and may contain ``domain``; when ``domain`` is set the
    ``runjob`` command is wrapped in ``tenant_command`` and given a
    ``--domain`` option (multitenant installation).

    A non-zero exit status of the command is logged as an error instead of
    being silently discarded.
    '''
    import logging

    from django.conf import settings

    domain = args.get('domain')

    cmd_args = [settings.PASSERELLE_MANAGE_COMMAND]
    if domain:
        # multitenant installation
        cmd_args.append('tenant_command')
    cmd_args += ['runjob', '--job-id', args['job_id']]
    if domain:
        # multitenant installation, the tenant is given after the command
        cmd_args += ['--domain', domain]

    completed = subprocess.run(cmd_args)
    if completed.returncode != 0:
        logging.getLogger(__name__).error(
            'runjob command failed for job %s (exit status %s)',
            args['job_id'], completed.returncode)
tests/test_jobs.py
3 3
import datetime
4 4
import os
5 5

  
6
from django.core.management import call_command
7

  
6 8
import mock
7 9
import pytest
8 10

  
......
142 144

  
143 145
        base_adresse.jobs()
144 146
        assert Job.objects.get(id=job.id).status == 'registered'
147

  
148

  
149
@mock.patch('passerelle.utils.Request.get')
def test_runjob(mocked_get, app, base_adresse, freezer):
    '''A registered job is completed by the runjob management command.'''
    data_dir = os.path.join(os.path.dirname(__file__), 'data')
    with open(os.path.join(data_dir, 'update_streets_test.gz'), 'rb') as ban_file:
        ban_content = ban_file.read()
    # every mocked HTTP GET is answered with the fixture archive
    mocked_get.return_value = utils.FakedResponse(content=ban_content, status_code=200)

    freezer.move_to('2019-01-01 00:00:00')
    job = base_adresse.add_job('update_streets_data')
    assert job.status == 'registered'

    call_command('runjob', '--job-id=%s' % job.pk)
    # read the job again from the database to see what the command stored
    stored_job = Job.objects.get(id=job.id)
    assert stored_job.status == 'completed'
    assert StreetModel.objects.count() == 3
tests/test_mdel_ddpacs.py
97 97
        resource.outgoing_sftp = sftp.SFTP(
98 98
            'sftp://john:doe@{server.host}:{server.port}/output/'.format(
99 99
                server=sftpserver))
100
        resource.save()
100 101
        resource.jobs()
101 102
        assert not content['output']
102 103
        # Jump over the 6 hour wait time for retry
......
123 124
        resource.incoming_sftp = sftp.SFTP(
124 125
            'sftp://john:doe@{server.host}:{server.port}/input/'.format(
125 126
                server=sftpserver))
127
        resource.save()
126 128

  
127 129
        response_name, response_content = build_response_zip(
128 130
            reference='A-1-1',
tests/test_sms.py
182 182
        'to': ['+33688888888'],
183 183
    }
184 184
    for path in (base_path, base_path + '?nostop=1', base_path + '?nostop=foo', base_path + '?nostop'):
185
        with mock.patch.object(connector, 'send_msg') as send_function:
185
        send_patch = mock.patch(
186
            'passerelle.apps.%s.models.%s.send_msg'
187
            % (connector.__class__._meta.app_label, connector.__class__.__name__))
188
        with send_patch as send_function:
186 189
            send_function.return_value = {}
187 190
            result = app.post_json(base_path, params=payload)
188 191
            connector.jobs()
......
284 287
    ovh_url = connector.API_URL % {'serviceName': 'sms-test42'}
285 288
    with utils.mock_url(ovh_url, resp, 200) as mocked:
286 289
        connector.jobs()
290
    connector.refresh_from_db()
287 291
    assert connector.credit_left == 123
288 292

  
289 293
    resp = app.get(manager_url)
290
-