Projet

Général

Profil

0002-misc-more-logs-for-some-cron-jobs-57604.patch

Lauréline Guérin, 21 octobre 2021 15:09

Télécharger (15,4 ko)

Voir les différences:

Subject: [PATCH 2/2] misc: more logs for some cron jobs (#57604)

 wcs/formdef.py               | 19 ++++---
 wcs/publisher.py             |  8 +--
 wcs/qommon/ident/password.py |  4 +-
 wcs/qommon/publisher.py      | 12 ++---
 wcs/wf/aggregation_email.py  | 98 +++++++++++++++++++-----------------
 wcs/wf/jump.py               | 75 ++++++++++++++-------------
 6 files changed, 117 insertions(+), 99 deletions(-)
wcs/formdef.py
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17 17
import base64
18
import contextlib
18 19
import copy
19 20
import datetime
20 21
import glob
......
1767 1768
Substitutions.register('form_name', category=_('Form'), comment=_('Form Name'))
1768 1769

  
1769 1770

  
1770
def clean_drafts(publisher):
1771
def clean_drafts(publisher, **kwargs):
1771 1772
    import wcs.qommon.storage as st
1772 1773
    from wcs.carddef import CardDef
1773 1774

  
1775
    job = kwargs.pop('job', None)
1774 1776
    for formdef in FormDef.select() + CardDef.select():
1775
        removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan())
1776
        for formdata in formdef.data_class().select(
1777
            [st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())]
1778
        ):
1779
            formdata.remove_self()
1777
        with job.log_long_job(
1778
            '%s %s' % (formdef.xml_root_node, formdef.url_name)
1779
        ) if job else contextlib.ExitStack():
1780
            removal_date = datetime.date.today() - datetime.timedelta(days=formdef.get_drafts_lifespan())
1781
            for formdata in formdef.data_class().select(
1782
                [st.Equal('status', 'draft'), st.Less('receipt_time', removal_date.timetuple())]
1783
            ):
1784
                formdata.remove_self()
1780 1785

  
1781 1786

  
1782
def clean_unused_files(publisher):
1787
def clean_unused_files(publisher, **kwargs):
1783 1788
    unused_files_behaviour = publisher.get_site_option('unused-files-behaviour')
1784 1789
    if unused_files_behaviour not in ('move', 'remove'):
1785 1790
        return
wcs/publisher.py
22 22
import sys
23 23
import traceback
24 24
import zipfile
25
from contextlib import contextmanager
25
from contextlib import ExitStack, contextmanager
26 26

  
27 27
from django.utils.encoding import force_text
28 28

  
......
427 427
            # Could also could happen on file descriptor exhaustion.
428 428
            pass
429 429

  
430
    def apply_global_action_timeouts(self):
430
    def apply_global_action_timeouts(self, **kwargs):
431 431
        from wcs.workflows import Workflow, WorkflowGlobalActionTimeoutTrigger
432 432

  
433
        job = kwargs.pop('job', None)
433 434
        for workflow in Workflow.select():
434
            WorkflowGlobalActionTimeoutTrigger.apply(workflow)
435
            with job.log_long_job('workflow %s' % workflow.url_name) if job else ExitStack():
436
                WorkflowGlobalActionTimeoutTrigger.apply(workflow)
435 437

  
436 438
    def migrate_sql(self):
437 439
        from . import sql
wcs/qommon/ident/password.py
1373 1373
)
1374 1374

  
1375 1375

  
1376
def handle_unused_accounts(publisher):
1376
def handle_unused_accounts(publisher, **kwargs):
1377 1377
    if 'password' not in get_cfg('identification', {}).get('methods', []):
1378 1378
        return
1379 1379
    identities_cfg = get_cfg('identities', {})
......
1425 1425
            # XXX: notify admin too
1426 1426

  
1427 1427

  
1428
def handle_expired_tokens(publisher):
1428
def handle_expired_tokens(publisher, **kwargs):
1429 1429
    if 'password' not in get_cfg('identification', {}).get('methods', []):
1430 1430
        return
1431 1431
    now = time.time()
wcs/qommon/publisher.py
569 569
            return
570 570
        cls.cronjobs.append(cronjob)
571 571

  
572
    def clean_nonces(self, delta=60, now=None):
572
    def clean_nonces(self, delta=60, now=None, **kwargs):
573 573
        nonce_dir = os.path.join(get_publisher().app_dir, 'nonces')
574 574
        if not os.path.exists(nonce_dir):
575 575
            return
......
585 585
        except locket.LockError:
586 586
            pass
587 587

  
588
    def clean_sessions(self):
588
    def clean_sessions(self, **kwargs):
589 589
        cleaning_lock_file = os.path.join(self.app_dir, 'cleaning_sessions.lock')
590 590
        try:
591 591
            with locket.lock_file(cleaning_lock_file, timeout=0):
......
619 619
        except locket.LockError:
620 620
            pass
621 621

  
622
    def clean_afterjobs(self):
622
    def clean_afterjobs(self, **kwargs):
623 623
        now = time.time()
624 624
        for job_id in AfterJob.keys():
625 625
            job = AfterJob.get(job_id)
......
640 640
                except OSError:
641 641
                    pass
642 642

  
643
    def clean_tempfiles(self):
643
    def clean_tempfiles(self, **kwargs):
644 644
        now = time.time()
645 645
        one_month_ago = now - 30 * 86400
646 646
        self._clean_files(one_month_ago, os.path.join(self.app_dir, 'tempfiles'))
647 647

  
648
    def clean_thumbnails(self):
648
    def clean_thumbnails(self, **kwargs):
649 649
        now = time.time()
650 650
        one_month_ago = now - 30 * 86400
651 651
        self._clean_files(one_month_ago, os.path.join(self.app_dir, 'thumbs'))
652 652

  
653
    def clean_loggederrors(self):
653
    def clean_loggederrors(self, **kwargs):
654 654
        if not self.loggederror_class:
655 655
            return
656 656

  
wcs/wf/aggregation_email.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import contextlib
18

  
17 19
from quixote import get_publisher
18 20

  
19 21
from wcs.workflows import WorkflowStatusItem, register_item_class
......
98 100
        return 10000
99 101

  
100 102

  
101
def send_aggregation_emails(publisher):
103
def send_aggregation_emails(publisher, **kwargs):
102 104
    from wcs.formdef import FormDef
103 105

  
104 106
    publisher.reload_cfg()
105 107
    site_name = publisher.cfg.get('misc', {}).get('sitename', None)
108
    job = kwargs.pop('job', None)
106 109

  
107 110
    cache = {}
108 111
    for aggregate_id in AggregationEmail.keys():
109
        aggregate = AggregationEmail.get(aggregate_id)
110
        aggregate.remove_self()
111

  
112
        try:
113
            role = get_publisher().role_class.get(aggregate_id)
114
        except KeyError:
115
            continue
116
        if not role.get_emails():
117
            continue
118
        if not aggregate.items:
119
            continue
120

  
121
        last_formdef = None
122
        body = []
123
        for item in sorted(aggregate.items, key=lambda x: (lax_int(x['formdef']), lax_int(x['formdata']))):
124
            formdef_id = item.get('formdef')
125
            if formdef_id in cache:
126
                formdef, formdata, workflow = cache[formdef_id]
127
            else:
128
                try:
129
                    formdef = FormDef.get(formdef_id)
130
                except KeyError:
131
                    # formdef has been deleted after AggregationEmail creation
132
                    continue
133
                formdata = formdef.data_class()
134
                workflow = formdef.workflow
135
                cache[formdef_id] = (formdef, formdata, workflow)
112
        with job.log_long_job('aggregation email %s' % aggregate_id) if job else contextlib.ExitStack():
113
            aggregate = AggregationEmail.get(aggregate_id)
114
            aggregate.remove_self()
136 115

  
137 116
            try:
138
                data = formdata.get(item.get('formdata'))
117
                role = get_publisher().role_class.get(aggregate_id)
139 118
            except KeyError:
140 119
                continue
141
            status = data.get_status()
142
            url = item.get('formurl')
120
            if not role.get_emails():
121
                continue
122
            if not aggregate.items:
123
                continue
143 124

  
144
            if last_formdef != formdef:
145
                if last_formdef is not None:
146
                    body.append('')  # blank line
147
                last_formdef = formdef
148
                body.append(formdef.name)
149
                body.append('-' * len(formdef.name))
150
                body.append('')
125
            last_formdef = None
126
            body = []
127
            for item in sorted(
128
                aggregate.items, key=lambda x: (lax_int(x['formdef']), lax_int(x['formdata']))
129
            ):
130
                formdef_id = item.get('formdef')
131
                if formdef_id in cache:
132
                    formdef, formdata, workflow = cache[formdef_id]
133
                else:
134
                    try:
135
                        formdef = FormDef.get(formdef_id)
136
                    except KeyError:
137
                        # formdef has been deleted after AggregationEmail creation
138
                        continue
139
                    formdata = formdef.data_class()
140
                    workflow = formdef.workflow
141
                    cache[formdef_id] = (formdef, formdata, workflow)
151 142

  
152
            body.append('- %sstatus (%s)' % (url, status.name))
143
                try:
144
                    data = formdata.get(item.get('formdata'))
145
                except KeyError:
146
                    continue
147
                status = data.get_status()
148
                url = item.get('formurl')
153 149

  
154
        if not body:
155
            continue
150
                if last_formdef != formdef:
151
                    if last_formdef is not None:
152
                        body.append('')  # blank line
153
                    last_formdef = formdef
154
                    body.append(formdef.name)
155
                    body.append('-' * len(formdef.name))
156
                    body.append('')
157

  
158
                body.append('- %sstatus (%s)' % (url, status.name))
159

  
160
            if not body:
161
                continue
156 162

  
157
        body = '\n'.join(body)
163
            body = '\n'.join(body)
158 164

  
159
        mail_subject = _('New arrivals')
160
        if site_name:
161
            mail_subject += ' (%s)' % site_name
165
            mail_subject = _('New arrivals')
166
            if site_name:
167
                mail_subject += ' (%s)' % site_name
162 168

  
163
        emails.email(mail_subject, body, email_rcpt=role.get_emails())
169
            emails.email(mail_subject, body, email_rcpt=role.get_emails())
164 170

  
165 171

  
166 172
def register_cronjob():
wcs/wf/jump.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import contextlib
17 18
import datetime
18 19
import itertools
19 20
import json
......
313 314
    return wfs_status
314 315

  
315 316

  
316
def _apply_timeouts(publisher):
317
def _apply_timeouts(publisher, **kwargs):
317 318
    '''Traverse all filled form and apply expired timeout jumps if needed'''
318 319
    from ..carddef import CardDef
319 320
    from ..formdef import FormDef
320 321

  
321 322
    wfs_status = workflows_with_timeout()
323
    job = kwargs.pop('job', None)
322 324

  
323 325
    for formdef in itertools.chain(FormDef.select(ignore_errors=True), CardDef.select(ignore_errors=True)):
324 326
        status_ids = wfs_status.get(str(formdef.workflow_id))
325 327
        if not status_ids:
326 328
            continue
327
        formdata_class = formdef.data_class()
328
        for status_id in status_ids:
329
            if publisher.is_using_postgresql():
330
                # get minimum delay for jumps in this status
331
                delay = math.inf
332
                for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
333
                    if Template.is_template_string(jump_action.timeout):
334
                        delay = 0
335
                        break
336
                    delay = min(delay, int(jump_action.timeout))
337
                # limit delay to minimal delay
338
                if delay < JUMP_TIMEOUT_INTERVAL * 60:
339
                    delay = JUMP_TIMEOUT_INTERVAL * 60
340

  
341
                criterias = [
342
                    Equal('status', status_id),
343
                    LessOrEqual(
344
                        'last_update_time',
345
                        (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
346
                    ),
347
                ]
348
                formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
349
            else:
350
                formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
351

  
352
            for formdata in formdatas:
353
                for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]:
354
                    get_publisher().substitutions.reset()
355
                    get_publisher().substitutions.feed(get_publisher())
356
                    get_publisher().substitutions.feed(formdef)
357
                    get_publisher().substitutions.feed(formdata)
358
                    if jump_action.must_jump(formdata):
359
                        jump_and_perform(formdata, jump_action, event=('timeout-jump', jump_action.id))
360
                        break
329
        with job.log_long_job(
330
            '%s %s' % (formdef.xml_root_node, formdef.url_name)
331
        ) if job else contextlib.ExitStack():
332
            formdata_class = formdef.data_class()
333
            for status_id in status_ids:
334
                if publisher.is_using_postgresql():
335
                    # get minimum delay for jumps in this status
336
                    delay = math.inf
337
                    for jump_action in wfs_status[str(formdef.workflow_id)][status_id]:
338
                        if Template.is_template_string(jump_action.timeout):
339
                            delay = 0
340
                            break
341
                        delay = min(delay, int(jump_action.timeout))
342
                    # limit delay to minimal delay
343
                    if delay < JUMP_TIMEOUT_INTERVAL * 60:
344
                        delay = JUMP_TIMEOUT_INTERVAL * 60
345

  
346
                    criterias = [
347
                        Equal('status', status_id),
348
                        LessOrEqual(
349
                            'last_update_time',
350
                            (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
351
                        ),
352
                    ]
353
                    formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
354
                else:
355
                    formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
356

  
357
                for formdata in formdatas:
358
                    for jump_action in wfs_status[str(formdef.workflow_id)][formdata.status]:
359
                        get_publisher().substitutions.reset()
360
                        get_publisher().substitutions.feed(get_publisher())
361
                        get_publisher().substitutions.feed(formdef)
362
                        get_publisher().substitutions.feed(formdata)
363
                        if jump_action.must_jump(formdata):
364
                            jump_and_perform(formdata, jump_action, event=('timeout-jump', jump_action.id))
365
                            break
361 366

  
362 367

  
363 368
def register_cronjob():
364
-