Projet

Général

Profil

0001-jobs-use-uwsgi-spooler-to-run-jobs-50017.patch

Lauréline Guérin, 14 janvier 2021 15:01

Télécharger (11,4 ko)

Voir les différences:

Subject: [PATCH] jobs: use uwsgi spooler to run jobs (#50017)

 debian/control                                |  1 +
 debian/debian_config.py                       |  1 +
 debian/passerelle.cron.d                      |  1 -
 debian/passerelle.dirs                        |  1 +
 debian/passerelle.init                        |  1 +
 debian/passerelle.postinst                    |  1 +
 debian/passerelle.service                     |  3 +-
 debian/uwsgi.ini                              |  3 ++
 passerelle/base/management/commands/runjob.py | 33 ++++++++++++++++
 passerelle/base/models.py                     | 38 +++++++++++++------
 passerelle/settings.py                        |  4 ++
 passerelle/utils/spooler.py                   | 29 ++++++++++++++
 tests/test_jobs.py                            | 17 +++++++++
 13 files changed, 119 insertions(+), 14 deletions(-)
 create mode 100644 passerelle/base/management/commands/runjob.py
 create mode 100644 passerelle/utils/spooler.py
debian/control
19 19
    python3-gadjo,
20 20
    python3-django-model-utils,
21 21
    python3-requests,
22
    python3-uwsgidecorators,
22 23
    python3-setuptools,
23 24
    python3-suds,
24 25
    python3-cmislib,
debian/debian_config.py
6 6
DEBUG = False
7 7

  
8 8
PROJECT_NAME = 'passerelle'
9
PASSERELLE_MANAGE_COMMAND = '/usr/bin/passerelle-manage'
9 10

  
10 11
#
11 12
# hobotization (multitenant)
debian/passerelle.cron.d
1 1
MAILTO=root
2 2

  
3 3
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants availability
4
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants jobs
5 4
17  * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants hourly
6 5
25  1 * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants daily
7 6
47  2 * * 7 passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants weekly
debian/passerelle.dirs
1 1
/etc/passerelle
2 2
/usr/lib/passerelle
3 3
/var/lib/passerelle/collectstatic
4
/var/lib/passerelle/spooler
4 5
/var/lib/passerelle/tenants
5 6
/var/log/passerelle
debian/passerelle.init
38 38
DAEMON_ARGS=${DAEMON_ARGS:-"--pidfile=$PIDFILE
39 39
--uid $USER --gid $GROUP
40 40
--ini /etc/$NAME/uwsgi.ini
41
--spooler /var/lib/$NAME/spooler/
41 42
--daemonize /var/log/uwsgi.$NAME.log"}
42 43

  
43 44
# Load the VERBOSE setting and other rcS variables
debian/passerelle.postinst
20 20
    # ensure dirs ownership
21 21
    chown $USER:$GROUP /var/log/$NAME
22 22
    chown $USER:$GROUP /var/lib/$NAME/collectstatic
23
    chown $USER:$GROUP /var/lib/$NAME/spooler
23 24
    chown $USER:$GROUP /var/lib/$NAME/tenants
24 25
    # create a secret file
25 26
    SECRET_FILE=$CONFIG_DIR/secret
debian/passerelle.service
12 12
Group=%p
13 13
ExecStartPre=/usr/bin/passerelle-manage migrate_schemas --noinput --verbosity 1
14 14
ExecStartPre=/usr/bin/passerelle-manage collectstatic --noinput
15
ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini
15
ExecStartPre=/bin/mkdir -p /var/lib/passerelle/spooler/%m/
16
ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini --spooler /var/lib/passerelle/spooler/%m/
16 17
ExecReload=/bin/kill -HUP $MAINPID
17 18
KillSignal=SIGQUIT
18 19
TimeoutStartSec=0
debian/uwsgi.ini
12 12
chmod-socket = 666
13 13
vacuum = true
14 14

  
15
spooler-processes = 3
16
import = passerelle.utils.spooler
17

  
15 18
master = true
16 19
enable-threads = true
17 20
harakiri = 120
passerelle/base/management/commands/runjob.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2021  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from django.core.management.base import BaseCommand, CommandError
18

  
19
from passerelle.base.models import Job
20

  
21

  
22
class Command(BaseCommand):
23
    '''Run a job (internal command)'''
24

  
25
    def add_arguments(self, parser):
26
        parser.add_argument('--job-id', action='store', required=True)
27

  
28
    def handle(self, *args, **options):
29
        try:
30
            job = Job.objects.get(pk=options['job_id'])
31
        except Job.DoesNotExist:
32
            raise CommandError('missing job')
33
        job.run()
passerelle/base/models.py
574 574
        skipped_jobs = []
575 575
        while True:
576 576
            with transaction.atomic():
577
                # lock an immediately runnable job
578 577
                job = self.jobs_set().exclude(
579 578
                        pk__in=skipped_jobs
580 579
                        ).filter(
......
587 586
                job.status = 'running'
588 587
                job.save()
589 588
                # release lock
590
            try:
591
                getattr(self, job.method_name)(**job.parameters)
592
            except SkipJob as e:
593
                job.status = 'registered'
594
                job.set_after_timestamp(e.after_timestamp)
589
            result = job.run()
590
            if result == 'skipped':
595 591
                skipped_jobs.append(job.id)
596
            except Exception as e:
597
                self.handle_job_error(job, sys.exc_info())
598
            else:
599
                job.status = 'completed'
600
                job.done_timestamp = timezone.now()
601
            job.save()
602 592

  
603 593
    def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs):
604 594
        resource_type = ContentType.objects.get_for_model(self)
......
609 599
                  parameters=kwargs)
610 600
        job.set_after_timestamp(after_timestamp)
611 601
        job.save()
602
        job.run(spool=True)
612 603
        return job
613 604

  
614 605
    def handle_job_error(self, job, exc_info):
......
796 787
        self.status_details.update({'new_job_pk': new_job.pk})
797 788
        self.save()
798 789

  
790
    def run(self, spool=False):
791
        if spool and self.pk:
792
            if 'uwsgi' in sys.modules and settings.PASSERELLE_MANAGE_COMMAND:
793
                from passerelle.utils.spooler import run_after_job
794
                run_after_job.spool(job_id=self.pk)
795
            return
796

  
797
        self.status = 'running'
798
        self.save()
799
        try:
800
            getattr(self.resource, self.method_name)(**self.parameters)
801
        except SkipJob as e:
802
            self.status = 'registered'
803
            self.set_after_timestamp(e.after_timestamp)
804
            self.save()
805
            return 'skipped'
806
        except Exception:
807
            self.resource.handle_job_error(self, sys.exc_info())
808
        else:
809
            self.status = 'completed'
810
            self.done_timestamp = timezone.now()
811
        self.save()
812

  
799 813

  
800 814
@six.python_2_unicode_compatible
801 815
class ResourceLog(models.Model):
passerelle/settings.py
202 202

  
203 203
MELLON_IDENTITY_PROVIDERS = []
204 204

  
205
# management command, used to run afterjobs in uwsgi mode,
206
# usually /usr/bin/passerelle-manage.
207
PASSERELLE_MANAGE_COMMAND = None
208

  
205 209
# REQUESTS_PROXIES that can be used by requests methods
206 210
# see http://docs.python-requests.org/en/latest/user/advanced/#proxies
207 211
REQUESTS_PROXIES = None
passerelle/utils/spooler.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2021  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import subprocess
18

  
19
from uwsgidecorators import spool
20

  
21

  
22
@spool
23
def run_after_job(args):
24
    from django.conf import settings
25
    subprocess.run([
26
        settings.PASSERELLE_MANAGE_COMMAND,
27
        'runjob',
28
        '--job-id', args['job_id']
29
    ])
tests/test_jobs.py
3 3
import datetime
4 4
import os
5 5

  
6
from django.core.management import call_command
7

  
6 8
import mock
7 9
import pytest
8 10

  
......
142 144

  
143 145
        base_adresse.jobs()
144 146
        assert Job.objects.get(id=job.id).status == 'registered'
147

  
148

  
149
@mock.patch('passerelle.utils.Request.get')
150
def test_runjob(mocked_get, app, base_adresse, freezer):
151
    filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz')
152
    with open(filepath, 'rb') as ban_file:
153
        mocked_get.return_value = utils.FakedResponse(content=ban_file.read(), status_code=200)
154

  
155
    freezer.move_to('2019-01-01 00:00:00')
156
    job = base_adresse.add_job('update_streets_data')
157
    assert job.status == 'registered'
158

  
159
    call_command('runjob', '--job-id=%s' % job.pk)
160
    assert Job.objects.get(id=job.id).status == 'completed'
161
    assert StreetModel.objects.count() == 3
145
-