0001-jobs-use-uwsgi-spooler-to-run-jobs-50017.patch
debian/control | ||
---|---|---|
19 | 19 |
python3-gadjo, |
20 | 20 |
python3-django-model-utils, |
21 | 21 |
python3-requests, |
22 |
python3-uwsgidecorators, |
|
22 | 23 |
python3-setuptools, |
23 | 24 |
python3-suds, |
24 | 25 |
python3-cmislib, |
debian/debian_config.py | ||
---|---|---|
6 | 6 |
DEBUG = False |
7 | 7 | |
8 | 8 |
PROJECT_NAME = 'passerelle' |
9 |
PASSERELLE_MANAGE_COMMAND = '/usr/bin/passerelle-manage' |
|
9 | 10 | |
10 | 11 |
# |
11 | 12 |
# hobotization (multitenant) |
debian/passerelle.cron.d | ||
---|---|---|
1 | 1 |
MAILTO=root |
2 | 2 | |
3 | 3 |
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants availability |
4 |
*/5 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants jobs |
|
5 | 4 |
17 * * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants hourly |
6 | 5 |
25 1 * * * passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants daily |
7 | 6 |
47 2 * * 7 passerelle /usr/bin/passerelle-manage tenant_command cron --all-tenants weekly |
debian/passerelle.dirs | ||
---|---|---|
1 | 1 |
/etc/passerelle |
2 | 2 |
/usr/lib/passerelle |
3 | 3 |
/var/lib/passerelle/collectstatic |
4 |
/var/lib/passerelle/spooler |
|
4 | 5 |
/var/lib/passerelle/tenants |
5 | 6 |
/var/log/passerelle |
debian/passerelle.init | ||
---|---|---|
38 | 38 |
DAEMON_ARGS=${DAEMON_ARGS:-"--pidfile=$PIDFILE |
39 | 39 |
--uid $USER --gid $GROUP |
40 | 40 |
--ini /etc/$NAME/uwsgi.ini |
41 |
--spooler /var/lib/$NAME/spooler/ |
|
41 | 42 |
--daemonize /var/log/uwsgi.$NAME.log"} |
42 | 43 | |
43 | 44 |
# Load the VERBOSE setting and other rcS variables |
debian/passerelle.postinst | ||
---|---|---|
20 | 20 |
# ensure dirs ownership |
21 | 21 |
chown $USER:$GROUP /var/log/$NAME |
22 | 22 |
chown $USER:$GROUP /var/lib/$NAME/collectstatic |
23 |
chown $USER:$GROUP /var/lib/$NAME/spooler |
|
23 | 24 |
chown $USER:$GROUP /var/lib/$NAME/tenants |
24 | 25 |
# create a secret file |
25 | 26 |
SECRET_FILE=$CONFIG_DIR/secret |
debian/passerelle.service | ||
---|---|---|
12 | 12 |
Group=%p |
13 | 13 |
ExecStartPre=/usr/bin/passerelle-manage migrate_schemas --noinput --verbosity 1 |
14 | 14 |
ExecStartPre=/usr/bin/passerelle-manage collectstatic --noinput |
15 |
ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini |
|
15 |
ExecStartPre=/bin/mkdir -p /var/lib/passerelle/spooler/%m/ |
|
16 |
ExecStart=/usr/bin/uwsgi --ini /etc/%p/uwsgi.ini --spooler /var/lib/passerelle/spooler/%m/ |
|
16 | 17 |
ExecReload=/bin/kill -HUP $MAINPID |
17 | 18 |
KillSignal=SIGQUIT |
18 | 19 |
TimeoutStartSec=0 |
debian/uwsgi.ini | ||
---|---|---|
12 | 12 |
chmod-socket = 666 |
13 | 13 |
vacuum = true |
14 | 14 | |
15 |
spooler-processes = 3 |
|
16 |
spooler-python-import = passerelle.utils.spooler |
|
17 | ||
15 | 18 |
master = true |
16 | 19 |
enable-threads = true |
17 | 20 |
harakiri = 120 |
passerelle/base/management/commands/runjob.py | ||
---|---|---|
1 |
# passerelle - uniform access to multiple data sources and services |
|
2 |
# Copyright (C) 2021 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from django.core.management.base import BaseCommand, CommandError |
|
18 | ||
19 |
from passerelle.base.models import Job |
|
20 | ||
21 | ||
22 |
class Command(BaseCommand): |
|
23 |
'''Run a job (internal command)''' |
|
24 | ||
25 |
def add_arguments(self, parser): |
|
26 |
parser.add_argument('--job-id', action='store', required=True) |
|
27 | ||
28 |
def handle(self, *args, **options): |
|
29 |
try: |
|
30 |
job = Job.objects.get(pk=options['job_id']) |
|
31 |
except Job.DoesNotExist: |
|
32 |
raise CommandError('missing job') |
|
33 |
job.run() |
passerelle/base/models.py | ||
---|---|---|
574 | 574 |
skipped_jobs = [] |
575 | 575 |
while True: |
576 | 576 |
with transaction.atomic(): |
577 |
# lock an immediately runnable job |
|
578 | 577 |
job = self.jobs_set().exclude( |
579 | 578 |
pk__in=skipped_jobs |
580 | 579 |
).filter( |
... | ... | |
587 | 586 |
job.status = 'running' |
588 | 587 |
job.save() |
589 | 588 |
# release lock |
590 |
try: |
|
591 |
getattr(self, job.method_name)(**job.parameters) |
|
592 |
except SkipJob as e: |
|
593 |
job.status = 'registered' |
|
594 |
job.set_after_timestamp(e.after_timestamp) |
|
589 |
result = job.run() |
|
590 |
if result == 'skipped': |
|
595 | 591 |
skipped_jobs.append(job.id) |
596 |
except Exception as e: |
|
597 |
self.handle_job_error(job, sys.exc_info()) |
|
598 |
else: |
|
599 |
job.status = 'completed' |
|
600 |
job.done_timestamp = timezone.now() |
|
601 |
job.save() |
|
602 | 592 | |
603 | 593 |
def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs): |
604 | 594 |
resource_type = ContentType.objects.get_for_model(self) |
... | ... | |
609 | 599 |
parameters=kwargs) |
610 | 600 |
job.set_after_timestamp(after_timestamp) |
611 | 601 |
job.save() |
602 |
transaction.on_commit(lambda: job.run(spool=True)) |
|
612 | 603 |
return job |
613 | 604 | |
614 | 605 |
def handle_job_error(self, job, exc_info): |
... | ... | |
796 | 787 |
self.status_details.update({'new_job_pk': new_job.pk}) |
797 | 788 |
self.save() |
798 | 789 | |
790 |
def run(self, spool=False): |
|
791 |
if spool and self.pk: |
|
792 |
if 'uwsgi' in sys.modules and settings.PASSERELLE_MANAGE_COMMAND: |
|
793 |
from passerelle.utils.spooler import run_job |
|
794 |
tenant = getattr(connection, 'tenant', None) |
|
795 |
run_job.spool(job_id=str(self.pk), domain=getattr(tenant, 'domain_url', None)) |
|
796 |
return |
|
797 | ||
798 |
self.status = 'running' |
|
799 |
self.save() |
|
800 |
try: |
|
801 |
getattr(self.resource, self.method_name)(**self.parameters) |
|
802 |
except SkipJob as e: |
|
803 |
self.status = 'registered' |
|
804 |
self.set_after_timestamp(e.after_timestamp) |
|
805 |
self.save() |
|
806 |
return 'skipped' |
|
807 |
except Exception: |
|
808 |
self.resource.handle_job_error(self, sys.exc_info()) |
|
809 |
else: |
|
810 |
self.status = 'completed' |
|
811 |
self.done_timestamp = timezone.now() |
|
812 |
self.save() |
|
813 | ||
799 | 814 | |
800 | 815 |
@six.python_2_unicode_compatible |
801 | 816 |
class ResourceLog(models.Model): |
passerelle/settings.py | ||
---|---|---|
202 | 202 | |
203 | 203 |
MELLON_IDENTITY_PROVIDERS = [] |
204 | 204 | |
205 |
# management command, used to run afterjobs in uwsgi mode, |
|
206 |
# usually /usr/bin/passerelle-manage. |
|
207 |
PASSERELLE_MANAGE_COMMAND = None |
|
208 | ||
205 | 209 |
# REQUESTS_PROXIES that can be used by requests methods |
206 | 210 |
# see http://docs.python-requests.org/en/latest/user/advanced/#proxies |
207 | 211 |
REQUESTS_PROXIES = None |
passerelle/utils/spooler.py | ||
---|---|---|
1 |
# passerelle - uniform access to multiple data sources and services |
|
2 |
# Copyright (C) 2021 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import subprocess |
|
18 | ||
19 |
from uwsgidecorators import spool |
|
20 | ||
21 | ||
22 |
@spool |
|
23 |
def run_job(args): |
|
24 |
from django.conf import settings |
|
25 | ||
26 |
cmd_args = [ |
|
27 |
settings.PASSERELLE_MANAGE_COMMAND, |
|
28 |
] |
|
29 | ||
30 |
if args.get('domain'): |
|
31 |
# multitenant installation |
|
32 |
cmd_args.append('tenant_command') |
|
33 | ||
34 |
cmd_args += [ |
|
35 |
'runjob', |
|
36 |
'--job-id', args['job_id'] |
|
37 |
] |
|
38 | ||
39 |
if args.get('domain'): |
|
40 |
# multitenant installation |
|
41 |
cmd_args.append('--domain') |
|
42 |
cmd_args.append(args['domain']) |
|
43 | ||
44 |
subprocess.run(cmd_args) |
tests/test_jobs.py | ||
---|---|---|
3 | 3 |
import datetime |
4 | 4 |
import os |
5 | 5 | |
6 |
from django.core.management import call_command |
|
7 | ||
6 | 8 |
import mock |
7 | 9 |
import pytest |
8 | 10 | |
... | ... | |
142 | 144 | |
143 | 145 |
base_adresse.jobs() |
144 | 146 |
assert Job.objects.get(id=job.id).status == 'registered' |
147 | ||
148 | ||
149 |
@mock.patch('passerelle.utils.Request.get') |
|
150 |
def test_runjob(mocked_get, app, base_adresse, freezer): |
|
151 |
filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.gz') |
|
152 |
with open(filepath, 'rb') as ban_file: |
|
153 |
mocked_get.return_value = utils.FakedResponse(content=ban_file.read(), status_code=200) |
|
154 | ||
155 |
freezer.move_to('2019-01-01 00:00:00') |
|
156 |
job = base_adresse.add_job('update_streets_data') |
|
157 |
assert job.status == 'registered' |
|
158 | ||
159 |
call_command('runjob', '--job-id=%s' % job.pk) |
|
160 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
161 |
assert StreetModel.objects.count() == 3 |
tests/test_mdel_ddpacs.py | ||
---|---|---|
97 | 97 |
resource.outgoing_sftp = sftp.SFTP( |
98 | 98 |
'sftp://john:doe@{server.host}:{server.port}/output/'.format( |
99 | 99 |
server=sftpserver)) |
100 |
resource.save() |
|
100 | 101 |
resource.jobs() |
101 | 102 |
assert not content['output'] |
102 | 103 |
# Jump over the 6 hour wait time for retry |
... | ... | |
123 | 124 |
resource.incoming_sftp = sftp.SFTP( |
124 | 125 |
'sftp://john:doe@{server.host}:{server.port}/input/'.format( |
125 | 126 |
server=sftpserver)) |
127 |
resource.save() |
|
126 | 128 | |
127 | 129 |
response_name, response_content = build_response_zip( |
128 | 130 |
reference='A-1-1', |
tests/test_sms.py | ||
---|---|---|
182 | 182 |
'to': ['+33688888888'], |
183 | 183 |
} |
184 | 184 |
for path in (base_path, base_path + '?nostop=1', base_path + '?nostop=foo', base_path + '?nostop'): |
185 |
with mock.patch.object(connector, 'send_msg') as send_function: |
|
185 |
send_patch = mock.patch( |
|
186 |
'passerelle.apps.%s.models.%s.send_msg' |
|
187 |
% (connector.__class__._meta.app_label, connector.__class__.__name__)) |
|
188 |
with send_patch as send_function: |
|
186 | 189 |
send_function.return_value = {} |
187 | 190 |
result = app.post_json(base_path, params=payload) |
188 | 191 |
connector.jobs() |
... | ... | |
284 | 287 |
ovh_url = connector.API_URL % {'serviceName': 'sms-test42'} |
285 | 288 |
with utils.mock_url(ovh_url, resp, 200) as mocked: |
286 | 289 |
connector.jobs() |
290 |
connector.refresh_from_db() |
|
287 | 291 |
assert connector.credit_left == 123 |
288 | 292 | |
289 | 293 |
resp = app.get(manager_url) |
290 |
- |