Projet

Général

Profil

0001-add-fault-tolerance-on-convert_value_from_anything-u.patch

Thomas Noël, 27 mars 2018 14:19

Télécharger (11,5 ko)

Voir les différences:

Subject: [PATCH] add fault tolerance on convert_value_from_anything usage
 (#22793)

... and make None a legitimate value.
 tests/test_hobo_notify.py   | 33 +++++++++++++++++++++++++
 tests/test_workflows.py     | 59 ++++++++++++++++++++++++++++++++++++---------
 wcs/ctl/hobo_notify.py      |  8 ++++--
 wcs/fields.py               |  7 +++---
 wcs/qommon/saml2.py         |  8 ++++--
 wcs/wf/backoffice_fields.py |  6 ++---
 wcs/wf/profile.py           | 10 ++++++--
 7 files changed, 108 insertions(+), 23 deletions(-)
tests/test_hobo_notify.py
544 544
    assert user.is_admin is True
545 545
    assert set(user.roles) == set([old_role.id])
546 546

  
547
    for birthdate in ('baddate', '', None):
548
        notification = {
549
            u'@type': u'provision',
550
            u'issuer': 'http://idp.example.net/idp/saml/metadata',
551
            u'audience': [u'test'],
552
            u'objects': {
553
                u'@type': 'user',
554
                u'data': [
555
                    {
556
                        u'uuid': u'a' * 32,
557
                        u'first_name': u'John',
558
                        u'last_name': u'Doe',
559
                        u'email': u'john.doe@example.net',
560
                        u'zipcode': u'13600',
561
                        u'birthdate': birthdate,
562
                        u'is_superuser': True,
563
                        u'roles': [
564
                            {
565
                                u'uuid': u'xyz',
566
                                u'name': u'Service état civil',
567
                                u'description': u'etc.',
568
                            },
569
                        ],
570
                    }
571
                ]
572
            }
573
        }
574
        CmdHoboNotify.process_notification(notification)
575
        assert User.count() == 1
576
        if birthdate is not None:  # wrong value : do nothing
577
            assert User.select()[0].form_data['_birthdate'].tm_year == 2000
578
        else:  # empty value : empty field
579
            assert User.select()[0].form_data['_birthdate'] is None
547 580

  
548 581
def notify_of_exception(exc_info, context):
549 582
    raise Exception(exc_info)
tests/test_workflows.py
2893 2893
            assert http_patch_request.call_count == 1
2894 2894
            assert http_patch_request.call_args[0][1] == '{"bar": "2018-03-20"}'
2895 2895

  
2896
    user.form_data['4'] = datetime.datetime.now().timetuple()
2897
    user.store()
2898
    year = User.get(user.id).form_data.get('4').tm_year
2899
    for date_value in ('baddate', '', {}, [], None):
2900
        item.fields = [{'field_id': 'bar', 'value': date_value}]
2901
        item.perform(formdata)
2902
        if date_value is not None:  # bad value : do nothing
2903
            assert User.get(user.id).form_data.get('4').tm_year == year
2904
        else:  # empty value : empty field
2905
            assert User.get(user.id).form_data.get('4') == None
2906

  
2907
        with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request:
2908
            http_patch_request.return_value = (None, 200, '', None)
2909
            get_response().process_after_jobs()
2910
            assert http_patch_request.call_count == 1
2911
            if date_value is not None:  # bad value : do nothing
2912
                assert http_patch_request.call_args[0][1] == '{}'
2913
            else:  # empty value : null field
2914
                assert http_patch_request.call_args[0][1] == '{"bar": null}'
2915

  
2896 2916
def test_set_backoffice_field(http_requests, two_pubs):
2897 2917
    Workflow.wipe()
2898 2918
    FormDef.wipe()
......
3054 3074
    assert formdata.data['bo1'].base_filename == 'hello.txt'
3055 3075
    assert formdata.data['bo1'].get_content() == 'HELLO WORLD'
3056 3076

  
3057
    # check wrong value
3058
    del formdata.data['bo1']
3059
    formdata.store()
3060
    assert not 'bo1' in formdata.data
3077
    hello_world = formdata.data['bo1']
3078
    # check wrong value, or None (no file)
3079
    for value in ('="HELLO"', '', 'BAD', '={}', '=[]', None):
3080
        formdata.data['bo1'] = hello_world
3081
        formdata.store()
3061 3082

  
3062
    item = SetBackofficeFieldsWorkflowStatusItem()
3063
    item.parent = st1
3064
    item.fields = [{'field_id': 'bo1', 'value': '="HELLO"'}]
3065
    item.perform(formdata)
3083
        item = SetBackofficeFieldsWorkflowStatusItem()
3084
        item.parent = st1
3085
        item.fields = [{'field_id': 'bo1', 'value': value}]
3086
        item.perform(formdata)
3066 3087

  
3067
    formdata = formdef.data_class().get(formdata.id)
3068
    assert formdata.data.get('bo1') is None
3088
        formdata = formdef.data_class().get(formdata.id)
3089
        if value is not None:  # wrong value : do nothing
3090
            assert formdata.data['bo1'].base_filename == 'hello.txt'
3091
            assert formdata.data['bo1'].get_content() == 'HELLO WORLD'
3092
        else:  # empty value : remove field
3093
            assert formdata.data['bo1'] is None
3069 3094

  
3070 3095
    # check wrong field
3071 3096
    item = SetBackofficeFieldsWorkflowStatusItem()
......
3230 3255
    formdata = formdef.data_class().get(formdata.id)
3231 3256
    assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)
3232 3257

  
3258
    # invalid values => do nothing
3259
    for value in ('plop', '', {}, [], '{{ blah }}'):
3260
        item = SetBackofficeFieldsWorkflowStatusItem()
3261
        item.parent = st1
3262
        item.fields = [{'field_id': 'bo1', 'value': value}]
3263

  
3264
        item.perform(formdata)
3265
        formdata = formdef.data_class().get(formdata.id)
3266
        assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23)
3267

  
3268
    # None : empty date
3233 3269
    item = SetBackofficeFieldsWorkflowStatusItem()
3234 3270
    item.parent = st1
3235
    item.fields = [{'field_id': 'bo1', 'value': "plop"}]
3271
    item.fields = [{'field_id': 'bo1', 'value': None}]
3236 3272
    item.perform(formdata)
3237 3273

  
3238 3274
    formdata = formdef.data_class().get(formdata.id)
3239 3275
    assert formdata.data['bo1'] is None
3240 3276

  
3277

  
3241 3278
def test_set_backoffice_field_immediate_use(http_requests, two_pubs):
3242 3279
    Workflow.wipe()
3243 3280
    FormDef.wipe()
wcs/ctl/hobo_notify.py
189 189
                        if not field.id.startswith('_'):
190 190
                            continue
191 191
                        field_value = o.get(field.id[1:])
192
                        if field_value and field.convert_value_from_anything:
193
                            field_value = field.convert_value_from_anything(field_value)
192
                        if field.convert_value_from_anything:
193
                            try:
194
                                field_value = field.convert_value_from_anything(field_value)
195
                            except ValueError:
196
                                publisher.notify_of_exception(sys.exc_info(), context='[PROVISIONNING]')
197
                                continue
194 198
                        user.form_data[field.id] = field_value
195 199
                    user.name_identifiers = [uuid]
196 200
                    role_uuids = [role['uuid'] for role in o['roles']]
wcs/fields.py
788 788

  
789 789
    @classmethod
790 790
    def convert_value_from_anything(cls, value):
791
        if value is None:
792
            return None
791 793
        if hasattr(value, 'base_filename'):
792 794
            upload = PicklableUpload(value.base_filename,
793 795
                                     value.content_type or 'application/octet-stream')
......
972 974

  
973 975
    @classmethod
974 976
    def convert_value_from_anything(cls, value):
975
        try:
976
            date_value = evalutils.make_date(value).timetuple()
977
        except ValueError:
977
        if value is None:
978 978
            return None
979
        date_value = evalutils.make_date(value).timetuple()  # could raise ValueError
979 980
        return date_value
980 981

  
981 982
    def convert_value_from_str(self, value):
wcs/qommon/saml2.py
493 493
                continue
494 494
            field_value = d[key]
495 495
            field = dict_fields.get(field_id)
496
            if field and field_value and field.convert_value_from_anything:
497
                field_value = field.convert_value_from_anything(field_value)
496
            if field and field.convert_value_from_anything:
497
                try:
498
                    field_value = field.convert_value_from_anything(field_value)
499
                except ValueError:
500
                    publisher.notify_of_exception(sys.exc_info(), context='[SAML]')
501
                    continue
498 502
            if user.form_data.get(field_id) != field_value:
499 503
                user.form_data[field_id] = field_value
500 504
                logger.info('setting field %s of user %s to value %r', field_id, user.id, field_value)
wcs/wf/backoffice_fields.py
97 97
            try:
98 98
                new_value = self.compute(field['value'], raises=True)
99 99
            except:
100
                get_publisher().notify_of_exception(sys.exc_info())
100
                get_publisher().notify_of_exception(sys.exc_info(), context='[BO_FIELDS]')
101 101
                continue
102 102

  
103
            if new_value and formdef_field.convert_value_from_anything:
103
            if formdef_field.convert_value_from_anything:
104 104
                try:
105 105
                    new_value = formdef_field.convert_value_from_anything(new_value)
106 106
                except ValueError:
107
                    get_publisher().notify_of_exception(sys.exc_info())
107
                    get_publisher().notify_of_exception(sys.exc_info(), context='[BO_FIELDS]')
108 108
                    continue
109 109

  
110 110
            formdata.data['%s' % field['field_id']] = new_value
wcs/wf/profile.py
156 156
        for field in user_formdef.fields:
157 157
            if field.varname in new_data:
158 158
                field_value = new_data.get(field.varname)
159
                if field and field_value and field.convert_value_from_anything:
160
                    field_value = field.convert_value_from_anything(field_value)
159
                if field and field.convert_value_from_anything:
160
                    try:
161
                        field_value = field.convert_value_from_anything(field_value)
162
                    except ValueError:
163
                        get_publisher().notify_of_exception(sys.exc_info(), context='[PROFILE]')
164
                        # invalid attribute, do not update it
165
                        del new_data[field.varname]
166
                        continue
161 167
                new_user_data[field.id] = field_value
162 168
                # also change initial value to the converted one, as the
163 169
                # initial dictionary is used when sending the profile changes
164
-