Projet

Général

Profil

0001-general-add-new-grep-command-53994.patch

Frédéric Péters, 15 novembre 2022 18:21

Télécharger (11,7 ko)

Voir les différences:

Subject: [PATCH] general: add new grep command (#53994)

 tests/test_ctl.py                   |  83 ++++++++++++++++
 wcs/ctl/management/commands/grep.py | 144 ++++++++++++++++++++++++++++
 wcs/formdef.py                      |  13 +--
 3 files changed, 234 insertions(+), 6 deletions(-)
 create mode 100644 wcs/ctl/management/commands/grep.py
tests/test_ctl.py
16 16
from wcs.ctl.wipe_data import CmdWipeData
17 17
from wcs.fields import EmailField, StringField
18 18
from wcs.formdef import FormDef
19
from wcs.mail_templates import MailTemplate
19 20
from wcs.qommon.afterjobs import AfterJob
20 21
from wcs.qommon.management.commands.collectstatic import Command as CmdCollectStatic
21 22
from wcs.qommon.management.commands.migrate import Command as CmdMigrate
22 23
from wcs.qommon.management.commands.migrate_schemas import Command as CmdMigrateSchemas
23 24
from wcs.sql import cleanup_connection, get_connection_and_cursor
24 25
from wcs.workflows import Workflow, WorkflowStatusItem
26
from wcs.wscalls import NamedWsCall
25 27

  
26 28
from .utilities import clean_temporary_pub, create_temporary_pub
27 29

  
......
530 532
    # just make sure it loads correctly
531 533
    with pytest.raises(SystemExit):
532 534
        call_command('makemessages', '--help')
535

  
536

  
537
def test_grep(pub):
538
    FormDef.wipe()
539
    Workflow.wipe()
540
    NamedWsCall.wipe()
541
    MailTemplate.wipe()
542

  
543
    formdef = FormDef()
544
    formdef.name = 'test'
545
    formdef.fields = [StringField(id='1', label='Your Name'), EmailField(id='2', label='Email')]
546
    formdef.options = {'x': 'Name'}
547
    formdef.store()
548

  
549
    workflow = Workflow()
550
    workflow.name = 'test'
551
    st = workflow.add_status('status')
552
    st.add_action('aggregationemail')
553
    workflow.store()
554

  
555
    wscall = NamedWsCall()
556
    wscall.name = 'Hello'
557
    wscall.request = {'url': 'http://example.org/api/test', 'qs_data': {'a': 'b'}}
558
    wscall.store()
559

  
560
    mail_template = MailTemplate(name='test mail template')
561
    mail_template.subject = 'test subject'
562
    mail_template.body = 'test body'
563
    mail_template.attachments = ['form_var_file1_raw']
564
    mail_template.store()
565

  
566
    with pytest.raises(CommandError):
567
        call_command('grep')
568

  
569
    with pytest.raises(CommandError):
570
        call_command('grep', 'xxx')
571

  
572
    with pytest.raises(CommandError):
573
        call_command('grep', '--all-tenants', '--domain', 'example.net', 'xxx')
574

  
575
    with pytest.raises(CommandError):
576
        call_command('grep', '--domain', 'example.net', '--type', 'foo', 'xxx')
577

  
578
    with mock.patch('wcs.ctl.management.commands.grep.Command.print_hit') as print_hit:
579
        call_command('grep', '--domain', 'example.net', '--type', 'action-types', 'email')
580
        assert print_hit.call_args[0] == ('http://example.net/backoffice/workflows/1/status/1/items/1/',)
581
        print_hit.reset_mock()
582

  
583
        call_command('grep', '--domain', 'example.net', '--type', 'field-types', 'email')
584
        assert print_hit.call_args[0] == ('http://example.net/backoffice/forms/1/fields/2/',)
585
        print_hit.reset_mock()
586

  
587
        call_command('grep', '--domain', 'example.net', 'Name')
588
        assert print_hit.call_count == 2
589
        assert print_hit.call_args_list[0].args == (
590
            'http://example.net/backoffice/forms/1/fields/1/',
591
            'Your Name',
592
        )
593
        assert print_hit.call_args_list[1].args == (
594
            'http://example.net/backoffice/forms/1/workflow-variables',
595
            'Name',
596
        )
597
        print_hit.reset_mock()
598

  
599
        call_command('grep', '--domain', 'example.net', '/api/test')
600
        assert print_hit.call_args[0] == (
601
            'http://example.net/backoffice/settings/wscalls/hello/',
602
            'http://example.org/api/test',
603
        )
604
        print_hit.reset_mock()
605

  
606
        call_command('grep', '--domain', 'example.net', 'form_var_file1_raw')
607
        assert print_hit.call_args[0] == (
608
            'http://example.net/backoffice/workflows/mail-templates/1/',
609
            'form_var_file1_raw',
610
        )
611
        print_hit.reset_mock()
612

  
613
        call_command('grep', '--domain', 'example.net', 'xxx')
614
        assert print_hit.call_count == 0
615
        print_hit.reset_mock()
wcs/ctl/management/commands/grep.py
1
# w.c.s. - web application for online forms
2
# Copyright (C) 2005-2022  Entr'ouvert
3
#
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU General Public License for more details.
13
#
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16

  
17
import itertools
18

  
19
from django.core.management import CommandError
20

  
21
from wcs.data_sources import NamedDataSource
22
from wcs.formdef import get_formdefs_of_all_kinds
23
from wcs.mail_templates import MailTemplate
24
from wcs.qommon.publisher import get_publisher_class
25
from wcs.workflows import Workflow
26
from wcs.wscalls import NamedWsCall
27

  
28
from . import TenantCommand
29

  
30

  
31
class Command(TenantCommand):
32
    help = '''Grep tenant(s) for a given pattern'''
33

  
34
    def add_arguments(self, parser):
35
        parser.add_argument('-d', '--domain', '--vhost', metavar='DOMAIN')
36
        parser.add_argument('--all-tenants', action='store_true')
37
        parser.add_argument(
38
            '--type',
39
            metavar='TYPE',
40
            default='strings',
41
            choices=['strings', 'action-types', 'field-types'],
42
            help='strings (default), action-types, field-types',
43
        )
44
        parser.add_argument('--urls', action='store_true', help='display URLs to hits')
45
        parser.add_argument('pattern', nargs='+')
46

  
47
    def handle(self, *args, **options):
48
        domain = options.get('domain')
49
        all_tenants = options.get('all_tenants')
50
        if domain and all_tenants:
51
            raise CommandError('--domain and --all-tenants are exclusive')
52
        if not (domain or all_tenants):
53
            raise CommandError('--domain or --all-tenants are required')
54
        if domain:
55
            domains = [domain]
56
        else:
57
            domains = [x.hostname for x in get_publisher_class().get_tenants()]
58

  
59
        self.pattern = ' '.join(options.get('pattern'))
60
        self.urls = options.get('urls')
61
        try:
62
            method = getattr(self, 'grep_' + options.get('type').replace('-', '_'))
63
        except AttributeError:
64
            raise CommandError('unknown grep type')
65
        for domain in domains:
66
            self.init_tenant_publisher(domain, register_tld_names=False)
67
            method()
68

  
69
    def grep_strings(self):
70
        for formdef in get_formdefs_of_all_kinds(order_by='id'):
71
            url = formdef.get_admin_url()
72
            for attr in formdef.TEXT_ATTRIBUTES:
73
                if self.check_string(getattr(formdef, attr, None), url=url):
74
                    break
75

  
76
            for field in formdef.fields or []:
77
                url = formdef.get_field_admin_url(field)
78
                for attr in field.get_admin_attributes():
79
                    if self.check_string(getattr(field, attr, None), url=url):
80
                        break
81
            if hasattr(formdef, 'options'):
82
                url = formdef.get_admin_url() + 'workflow-variables'
83
                for option_value in (formdef.options or {}).values():
84
                    if self.check_string(option_value, url=url):
85
                        break
86

  
87
        select_kwargs = {'ignore_errors': True, 'ignore_migration': True, 'order_by': 'id'}
88
        for workflow in Workflow.select(**select_kwargs):
89
            for action in workflow.get_all_items():
90
                url = action.get_admin_url()
91
                for parameter in action.get_parameters():
92
                    if self.check_string(getattr(action, parameter, None), url=url):
93
                        break
94

  
95
        for obj in itertools.chain(
96
            NamedDataSource.select(**select_kwargs),
97
            NamedWsCall.select(**select_kwargs),
98
            MailTemplate.select(**select_kwargs),
99
        ):
100
            url = obj.get_admin_url()
101
            for attr, attr_type in obj.XML_NODES:
102
                attr_value = getattr(obj, attr, None)
103
                if attr_type == 'str':
104
                    if self.check_string(attr_value, url=url):
105
                        break
106
                elif attr_type == 'str_list':
107
                    for str_item in attr_value or []:
108
                        if self.check_string(str_item, url=url):
109
                            break
110
                    else:
111
                        continue
112
                    break
113
                elif isinstance(attr_value, dict):
114
                    for str_item in attr_value.values():
115
                        if self.check_string(str_item, url=url):
116
                            break
117
                    else:
118
                        continue
119
                    break
120

  
121
    def check_string(self, value, url):
122
        if isinstance(value, str) and self.pattern in value:
123
            self.print_hit(url, value)
124
            return True
125

  
126
    def grep_action_types(self):
127
        for workflow in Workflow.select(ignore_errors=True, ignore_migration=True, order_by='id'):
128
            for action in workflow.get_all_items():
129
                url = action.get_admin_url()
130
                if self.pattern in action.key:
131
                    self.print_hit(url)
132

  
133
    def grep_field_types(self):
134
        for formdef in get_formdefs_of_all_kinds(order_by='id'):
135
            for field in formdef.fields or []:
136
                url = formdef.get_field_admin_url(field)
137
                if self.pattern in field.key:
138
                    self.print_hit(url)
139

  
140
    def print_hit(self, url, *args):
141
        if self.urls:
142
            print(url)
143
        else:
144
            print(url, *args)
wcs/formdef.py
2183 2183
            pass
2184 2184

  
2185 2185

  
2186
def get_formdefs_of_all_kinds():
2186
def get_formdefs_of_all_kinds(**kwargs):
2187 2187
    from wcs.admin.settings import UserFieldsFormDef
2188 2188
    from wcs.blocks import BlockDef
2189 2189
    from wcs.carddef import CardDef
2190 2190
    from wcs.wf.form import FormWorkflowStatusItem
2191 2191
    from wcs.workflows import Workflow
2192 2192

  
2193
    kwargs = {
2193
    select_kwargs = {
2194 2194
        'ignore_errors': True,
2195 2195
        'ignore_migration': True,
2196 2196
    }
2197
    select_kwargs.update(kwargs)
2197 2198
    formdefs = [UserFieldsFormDef()]
2198
    formdefs += FormDef.select(**kwargs)
2199
    formdefs += BlockDef.select(**kwargs)
2200
    formdefs += CardDef.select(**kwargs)
2201
    for workflow in Workflow.select(**kwargs):
2199
    formdefs += FormDef.select(**select_kwargs)
2200
    formdefs += BlockDef.select(**select_kwargs)
2201
    formdefs += CardDef.select(**select_kwargs)
2202
    for workflow in Workflow.select(**select_kwargs):
2202 2203
        for status in workflow.possible_status:
2203 2204
            for item in status.items:
2204 2205
                if isinstance(item, FormWorkflowStatusItem) and item.formdef:
2205
-