0001-add-fault-tolerance-on-convert_value_from_anything-u.patch
tests/test_hobo_notify.py | ||
---|---|---|
544 | 544 |
assert user.is_admin is True |
545 | 545 |
assert set(user.roles) == set([old_role.id]) |
546 | 546 | |
547 |
for birthdate in ('baddate', '', None): |
|
548 |
notification = { |
|
549 |
u'@type': u'provision', |
|
550 |
u'issuer': 'http://idp.example.net/idp/saml/metadata', |
|
551 |
u'audience': [u'test'], |
|
552 |
u'objects': { |
|
553 |
u'@type': 'user', |
|
554 |
u'data': [ |
|
555 |
{ |
|
556 |
u'uuid': u'a' * 32, |
|
557 |
u'first_name': u'John', |
|
558 |
u'last_name': u'Doe', |
|
559 |
u'email': u'john.doe@example.net', |
|
560 |
u'zipcode': u'13600', |
|
561 |
u'birthdate': birthdate, |
|
562 |
u'is_superuser': True, |
|
563 |
u'roles': [ |
|
564 |
{ |
|
565 |
u'uuid': u'xyz', |
|
566 |
u'name': u'Service état civil', |
|
567 |
u'description': u'etc.', |
|
568 |
}, |
|
569 |
], |
|
570 |
} |
|
571 |
] |
|
572 |
} |
|
573 |
} |
|
574 |
CmdHoboNotify.process_notification(notification) |
|
575 |
assert User.count() == 1 |
|
576 |
if birthdate is not None: # wrong value : no nothing |
|
577 |
assert User.select()[0].form_data['_birthdate'].tm_year == 2000 |
|
578 |
else: # empty value : empty field |
|
579 |
assert User.select()[0].form_data['_birthdate'] is None |
|
547 | 580 | |
548 | 581 |
def notify_of_exception(exc_info, context): |
549 | 582 |
raise Exception(exc_info) |
tests/test_workflows.py | ||
---|---|---|
2893 | 2893 |
assert http_patch_request.call_count == 1 |
2894 | 2894 |
assert http_patch_request.call_args[0][1] == '{"bar": "2018-03-20"}' |
2895 | 2895 | |
2896 |
user.form_data['4'] = datetime.datetime.now().timetuple() |
|
2897 |
user.store() |
|
2898 |
year = User.get(user.id).form_data.get('4').tm_year |
|
2899 |
for date_value in ('baddate', '', {}, [], None): |
|
2900 |
item.fields = [{'field_id': 'bar', 'value': date_value}] |
|
2901 |
item.perform(formdata) |
|
2902 |
if date_value is not None: # bad value : do nothing |
|
2903 |
assert User.get(user.id).form_data.get('4').tm_year == year |
|
2904 |
else: # empty value : empty field |
|
2905 |
assert User.get(user.id).form_data.get('4') == None |
|
2906 | ||
2907 |
with mock.patch('wcs.wf.profile.http_patch_request') as http_patch_request: |
|
2908 |
http_patch_request.return_value = (None, 200, '', None) |
|
2909 |
get_response().process_after_jobs() |
|
2910 |
assert http_patch_request.call_count == 1 |
|
2911 |
if date_value is not None: # bad value : do nothing |
|
2912 |
assert http_patch_request.call_args[0][1] == '{}' |
|
2913 |
else: # empty value : null field |
|
2914 |
assert http_patch_request.call_args[0][1] == '{"bar": null}' |
|
2915 | ||
2896 | 2916 |
def test_set_backoffice_field(http_requests, two_pubs): |
2897 | 2917 |
Workflow.wipe() |
2898 | 2918 |
FormDef.wipe() |
... | ... | |
3054 | 3074 |
assert formdata.data['bo1'].base_filename == 'hello.txt' |
3055 | 3075 |
assert formdata.data['bo1'].get_content() == 'HELLO WORLD' |
3056 | 3076 | |
3057 |
# check wrong value |
|
3058 |
del formdata.data['bo1'] |
|
3059 |
formdata.store() |
|
3060 |
assert not 'bo1' in formdata.data |
|
3077 |
hello_world = formdata.data['bo1'] |
|
3078 |
# check wrong value, or None (no file) |
|
3079 |
for value in ('="HELLO"', '', 'BAD', '={}', '=[]', None): |
|
3080 |
formdata.data['bo1'] = hello_world |
|
3081 |
formdata.store() |
|
3061 | 3082 | |
3062 |
item = SetBackofficeFieldsWorkflowStatusItem() |
|
3063 |
item.parent = st1 |
|
3064 |
item.fields = [{'field_id': 'bo1', 'value': '="HELLO"'}]
|
|
3065 |
item.perform(formdata) |
|
3083 |
item = SetBackofficeFieldsWorkflowStatusItem()
|
|
3084 |
item.parent = st1
|
|
3085 |
item.fields = [{'field_id': 'bo1', 'value': value}]
|
|
3086 |
item.perform(formdata)
|
|
3066 | 3087 | |
3067 |
formdata = formdef.data_class().get(formdata.id) |
|
3068 |
assert formdata.data.get('bo1') is None |
|
3088 |
formdata = formdef.data_class().get(formdata.id) |
|
3089 |
if value is not None: # wrong value : do nothing |
|
3090 |
assert formdata.data['bo1'].base_filename == 'hello.txt' |
|
3091 |
assert formdata.data['bo1'].get_content() == 'HELLO WORLD' |
|
3092 |
else: # empty value : remove field |
|
3093 |
assert formdata.data['bo1'] is None |
|
3069 | 3094 | |
3070 | 3095 |
# check wrong field |
3071 | 3096 |
item = SetBackofficeFieldsWorkflowStatusItem() |
... | ... | |
3230 | 3255 |
formdata = formdef.data_class().get(formdata.id) |
3231 | 3256 |
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23) |
3232 | 3257 | |
3258 |
# invalid values => do nothing |
|
3259 |
for value in ('plop', '', {}, [], '{{ blah }}'): |
|
3260 |
item = SetBackofficeFieldsWorkflowStatusItem() |
|
3261 |
item.parent = st1 |
|
3262 |
item.fields = [{'field_id': 'bo1', 'value': value}] |
|
3263 | ||
3264 |
item.perform(formdata) |
|
3265 |
formdata = formdef.data_class().get(formdata.id) |
|
3266 |
assert datetime.date(*formdata.data['bo1'][:3]) == datetime.date(2017, 3, 23) |
|
3267 | ||
3268 |
# None : empty date |
|
3233 | 3269 |
item = SetBackofficeFieldsWorkflowStatusItem() |
3234 | 3270 |
item.parent = st1 |
3235 |
item.fields = [{'field_id': 'bo1', 'value': "plop"}]
|
|
3271 |
item.fields = [{'field_id': 'bo1', 'value': None}]
|
|
3236 | 3272 |
item.perform(formdata) |
3237 | 3273 | |
3238 | 3274 |
formdata = formdef.data_class().get(formdata.id) |
3239 | 3275 |
assert formdata.data['bo1'] is None |
3240 | 3276 | |
3277 | ||
3241 | 3278 |
def test_set_backoffice_field_immediate_use(http_requests, two_pubs): |
3242 | 3279 |
Workflow.wipe() |
3243 | 3280 |
FormDef.wipe() |
wcs/ctl/hobo_notify.py | ||
---|---|---|
189 | 189 |
if not field.id.startswith('_'): |
190 | 190 |
continue |
191 | 191 |
field_value = o.get(field.id[1:]) |
192 |
if field_value and field.convert_value_from_anything: |
|
193 |
field_value = field.convert_value_from_anything(field_value) |
|
192 |
if field.convert_value_from_anything: |
|
193 |
try: |
|
194 |
field_value = field.convert_value_from_anything(field_value) |
|
195 |
except ValueError: |
|
196 |
publisher.notify_of_exception(sys.exc_info(), context='[PROVISIONNING]') |
|
197 |
continue |
|
194 | 198 |
user.form_data[field.id] = field_value |
195 | 199 |
user.name_identifiers = [uuid] |
196 | 200 |
role_uuids = [role['uuid'] for role in o['roles']] |
wcs/fields.py | ||
---|---|---|
788 | 788 | |
789 | 789 |
@classmethod |
790 | 790 |
def convert_value_from_anything(cls, value): |
791 |
if value is None: |
|
792 |
return None |
|
791 | 793 |
if hasattr(value, 'base_filename'): |
792 | 794 |
upload = PicklableUpload(value.base_filename, |
793 | 795 |
value.content_type or 'application/octet-stream') |
... | ... | |
972 | 974 | |
973 | 975 |
@classmethod |
974 | 976 |
def convert_value_from_anything(cls, value): |
975 |
try: |
|
976 |
date_value = evalutils.make_date(value).timetuple() |
|
977 |
except ValueError: |
|
977 |
if value is None: |
|
978 | 978 |
return None |
979 |
date_value = evalutils.make_date(value).timetuple() # could raise ValueError |
|
979 | 980 |
return date_value |
980 | 981 | |
981 | 982 |
def convert_value_from_str(self, value): |
wcs/qommon/saml2.py | ||
---|---|---|
493 | 493 |
continue |
494 | 494 |
field_value = d[key] |
495 | 495 |
field = dict_fields.get(field_id) |
496 |
if field and field_value and field.convert_value_from_anything: |
|
497 |
field_value = field.convert_value_from_anything(field_value) |
|
496 |
if field and field.convert_value_from_anything: |
|
497 |
try: |
|
498 |
field_value = field.convert_value_from_anything(field_value) |
|
499 |
except ValueError: |
|
500 |
publisher.notify_of_exception(sys.exc_info(), context='[SAML]') |
|
501 |
continue |
|
498 | 502 |
if user.form_data.get(field_id) != field_value: |
499 | 503 |
user.form_data[field_id] = field_value |
500 | 504 |
logger.info('setting field %s of user %s to value %r', field_id, user.id, field_value) |
wcs/wf/backoffice_fields.py | ||
---|---|---|
97 | 97 |
try: |
98 | 98 |
new_value = self.compute(field['value'], raises=True) |
99 | 99 |
except: |
100 |
get_publisher().notify_of_exception(sys.exc_info()) |
|
100 |
get_publisher().notify_of_exception(sys.exc_info(), context='[BO_FIELDS]')
|
|
101 | 101 |
continue |
102 | 102 | |
103 |
if new_value and formdef_field.convert_value_from_anything:
|
|
103 |
if formdef_field.convert_value_from_anything: |
|
104 | 104 |
try: |
105 | 105 |
new_value = formdef_field.convert_value_from_anything(new_value) |
106 | 106 |
except ValueError: |
107 |
get_publisher().notify_of_exception(sys.exc_info()) |
|
107 |
get_publisher().notify_of_exception(sys.exc_info(), context='[BO_FIELDS]')
|
|
108 | 108 |
continue |
109 | 109 | |
110 | 110 |
formdata.data['%s' % field['field_id']] = new_value |
wcs/wf/profile.py | ||
---|---|---|
156 | 156 |
for field in user_formdef.fields: |
157 | 157 |
if field.varname in new_data: |
158 | 158 |
field_value = new_data.get(field.varname) |
159 |
if field and field_value and field.convert_value_from_anything: |
|
160 |
field_value = field.convert_value_from_anything(field_value) |
|
159 |
if field and field.convert_value_from_anything: |
|
160 |
try: |
|
161 |
field_value = field.convert_value_from_anything(field_value) |
|
162 |
except ValueError: |
|
163 |
get_publisher().notify_of_exception(sys.exc_info(), context='[PROFILE]') |
|
164 |
# invalid attribute, do not update it |
|
165 |
del new_data[field.varname] |
|
166 |
continue |
|
161 | 167 |
new_user_data[field.id] = field_value |
162 | 168 |
# also change initial value to the converted one, as the |
163 | 169 |
# initial dictionary is used when sending the profile changes |
164 |
- |