Projet

Général

Profil

0002-misc-more-logs-for-some-cron-jobs-57604.patch

Lauréline Guérin, 21 octobre 2021 09:55

Télécharger (12,3 ko)

Voir les différences:

Subject: [PATCH 2/2] misc: more logs for some cron jobs (#57604)

 wcs/formdef.py              | 16 +++---
 wcs/publisher.py            |  7 +--
 wcs/wf/aggregation_email.py | 97 +++++++++++++++++++------------------
 wcs/wf/jump.py              | 74 +++++++++++++++-------------
 4 files changed, 104 insertions(+), 90 deletions(-)
wcs/formdef.py
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17 17
import base64
18
import contextlib
18 19
import copy
19 20
import datetime
20 21
import glob
......
1767 1768
Substitutions.register('form_name', category=_('Form'), comment=_('Form Name'))
1768 1769

  
1769 1770

  
1770
def clean_drafts(publisher):
1771
def clean_drafts(publisher, job=None):
1771 1772
    import wcs.qommon.storage as st
1772 1773
    from wcs.carddef import CardDef
1773 1774

  
1774 1775
    for formdef in FormDef.select() + CardDef.select():
1775
        removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan())
1776
        for formdata in formdef.data_class().select(
1777
            [st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())]
1778
        ):
1779
            formdata.remove_self()
1776
        with job.log_long_job(
1777
            '%s %s' % (formdef.xml_root_node, formdef.url_name)
1778
        ) if job else contextlib.ExitStack():
1779
            removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan())
1780
            for formdata in formdef.data_class().select(
1781
                [st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())]
1782
            ):
1783
                formdata.remove_self()
1780 1784

  
1781 1785

  
1782 1786
def clean_unused_files(publisher):
wcs/publisher.py
22 22
import sys
23 23
import traceback
24 24
import zipfile
25
from contextlib import contextmanager
25
from contextlib import ExitStack, contextmanager
26 26

  
27 27
from django.utils.encoding import force_text
28 28

  
......
427 427
            # Could also happen on file descriptor exhaustion.
428 428
            pass
429 429

  
430
    def apply_global_action_timeouts(self):
430
    def apply_global_action_timeouts(self, job=None):
431 431
        from wcs.workflows import Workflow, WorkflowGlobalActionTimeoutTrigger
432 432

  
433 433
        for workflow in Workflow.select():
434
            WorkflowGlobalActionTimeoutTrigger.apply(workflow)
434
            with job.log_long_job('workflow %s' % workflow.url_name) if job else ExitStack():
435
                WorkflowGlobalActionTimeoutTrigger.apply(workflow)
435 436

  
436 437
    def migrate_sql(self):
437 438
        from . import sql
wcs/wf/aggregation_email.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import contextlib
18

  
17 19
from quixote import get_publisher
18 20

  
19 21
from wcs.workflows import WorkflowStatusItem, register_item_class
......
98 100
        return 10000
99 101

  
100 102

  
101
def send_aggregation_emails(publisher):
103
def send_aggregation_emails(publisher, job=None):
102 104
    from wcs.formdef import FormDef
103 105

  
104 106
    publisher.reload_cfg()
......
106 108

  
107 109
    cache = {}
108 110
    for aggregate_id in AggregationEmail.keys():
109
        aggregate = AggregationEmail.get(aggregate_id)
110
        aggregate.remove_self()
111

  
112
        try:
113
            role = get_publisher().role_class.get(aggregate_id)
114
        except KeyError:
115
            continue
116
        if not role.get_emails():
117
            continue
118
        if not aggregate.items:
119
            continue
120

  
121
        last_formdef = None
122
        body = []
123
        for item in sorted(aggregate.items, key=lambda x: (lax_int(x['formdef']), lax_int(x['formdata']))):
124
            formdef_id = item.get('formdef')
125
            if formdef_id in cache:
126
                formdef, formdata, workflow = cache[formdef_id]
127
            else:
128
                try:
129
                    formdef = FormDef.get(formdef_id)
130
                except KeyError:
131
                    # formdef has been deleted after AggregationEmail creation
132
                    continue
133
                formdata = formdef.data_class()
134
                workflow = formdef.workflow
135
                cache[formdef_id] = (formdef, formdata, workflow)
111
        with job.log_long_job('aggregation email %s' % aggregate_id) if job else contextlib.ExitStack():
112
            aggregate = AggregationEmail.get(aggregate_id)
113
            aggregate.remove_self()
136 114

  
137 115
            try:
138
                data = formdata.get(item.get('formdata'))
116
                role = get_publisher().role_class.get(aggregate_id)
139 117
            except KeyError:
140 118
                continue
141
            status = data.get_status()
142
            url = item.get('formurl')
119
            if not role.get_emails():
120
                continue
121
            if not aggregate.items:
122
                continue
143 123

  
144
            if last_formdef != formdef:
145
                if last_formdef is not None:
146
                    body.append('')  # blank line
147
                last_formdef = formdef
148
                body.append(formdef.name)
149
                body.append('-' * len(formdef.name))
150
                body.append('')
124
            last_formdef = None
125
            body = []
126
            for item in sorted(
127
                aggregate.items, key=lambda x: (lax_int(x['formdef']), lax_int(x['formdata']))
128
            ):
129
                formdef_id = item.get('formdef')
130
                if formdef_id in cache:
131
                    formdef, formdata, workflow = cache[formdef_id]
132
                else:
133
                    try:
134
                        formdef = FormDef.get(formdef_id)
135
                    except KeyError:
136
                        # formdef has been deleted after AggregationEmail creation
137
                        continue
138
                    formdata = formdef.data_class()
139
                    workflow = formdef.workflow
140
                    cache[formdef_id] = (formdef, formdata, workflow)
151 141

  
152
            body.append('- %sstatus (%s)' % (url, status.name))
142
                try:
143
                    data = formdata.get(item.get('formdata'))
144
                except KeyError:
145
                    continue
146
                status = data.get_status()
147
                url = item.get('formurl')
153 148

  
154
        if not body:
155
            continue
149
                if last_formdef != formdef:
150
                    if last_formdef is not None:
151
                        body.append('')  # blank line
152
                    last_formdef = formdef
153
                    body.append(formdef.name)
154
                    body.append('-' * len(formdef.name))
155
                    body.append('')
156

  
157
                body.append('- %sstatus (%s)' % (url, status.name))
158

  
159
            if not body:
160
                continue
156 161

  
157
        body = '\n'.join(body)
162
            body = '\n'.join(body)
158 163

  
159
        mail_subject = _('New arrivals')
160
        if site_name:
161
            mail_subject += ' (%s)' % site_name
164
            mail_subject = _('New arrivals')
165
            if site_name:
166
                mail_subject += ' (%s)' % site_name
162 167

  
163
        emails.email(mail_subject, body, email_rcpt=role.get_emails())
168
            emails.email(mail_subject, body, email_rcpt=role.get_emails())
164 169

  
165 170

  
166 171
def register_cronjob():
wcs/wf/jump.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import contextlib
17 18
import datetime
18 19
import itertools
19 20
import json
......
313 314
    return wfs_status
314 315

  
315 316

  
316
def _apply_timeouts(publisher):
317
def _apply_timeouts(publisher, job=None):
317 318
    '''Traverse all filled forms and apply expired timeout jumps if needed'''
318 319
    from ..carddef import CardDef
319 320
    from ..formdef import FormDef
......
324 325
        status_ids = wfs_status.get(str(formdef.workflow_id))
325 326
        if not status_ids:
326 327
            continue
327
        formdata_class = formdef.data_class()
328
        for status_id in status_ids:
329
            if publisher.is_using_postgresql():
330
                # get minimum delay for jumps in this status
331
                delay = math.inf
332
                for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
333
                    if Template.is_template_string(jump_action.timeout):
334
                        delay = 0
335
                        break
336
                    delay = min(delay, int(jump_action.timeout))
337
                # limit delay to minimal delay
338
                if delay < JUMP_TIMEOUT_INTERVAL * 60:
339
                    delay = JUMP_TIMEOUT_INTERVAL * 60
340

  
341
                criterias = [
342
                    Equal('status', status_id),
343
                    LessOrEqual(
344
                        'last_update_time',
345
                        (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
346
                    ),
347
                ]
348
                formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
349
            else:
350
                formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
351

  
352
            for formdata in formdatas:
353
                for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]:
354
                    get_publisher().substitutions.reset()
355
                    get_publisher().substitutions.feed(get_publisher())
356
                    get_publisher().substitutions.feed(formdef)
357
                    get_publisher().substitutions.feed(formdata)
358
                    if jump_action.must_jump(formdata):
359
                        jump_and_perform(formdata, jump_action, event=('timeout-jump', jump_action.id))
360
                        break
328
        with job.log_long_job(
329
            '%s %s' % (formdef.xml_root_node, formdef.url_name)
330
        ) if job else contextlib.ExitStack():
331
            formdata_class = formdef.data_class()
332
            for status_id in status_ids:
333
                if publisher.is_using_postgresql():
334
                    # get minimum delay for jumps in this status
335
                    delay = math.inf
336
                    for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
337
                        if Template.is_template_string(jump_action.timeout):
338
                            delay = 0
339
                            break
340
                        delay = min(delay, int(jump_action.timeout))
341
                    # limit delay to minimal delay
342
                    if delay < JUMP_TIMEOUT_INTERVAL * 60:
343
                        delay = JUMP_TIMEOUT_INTERVAL * 60
344

  
345
                    criterias = [
346
                        Equal('status', status_id),
347
                        LessOrEqual(
348
                            'last_update_time',
349
                            (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
350
                        ),
351
                    ]
352
                    formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
353
                else:
354
                    formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
355

  
356
                for formdata in formdatas:
357
                    for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]:
358
                        get_publisher().substitutions.reset()
359
                        get_publisher().substitutions.feed(get_publisher())
360
                        get_publisher().substitutions.feed(formdef)
361
                        get_publisher().substitutions.feed(formdata)
362
                        if jump_action.must_jump(formdata):
363
                            jump_and_perform(formdata, jump_action, event=('timeout-jump', jump_action.id))
364
                            break
361 365

  
362 366

  
363 367
def register_cronjob():
364
-