0009-sql-update-user_name-column-of-wcs_all_forms-on-name.patch
tests/test_sql.py | ||
---|---|---|
1367 | 1367 |
assert bool(cur.fetchone()[0] == 2) |
1368 | 1368 | |
1369 | 1369 | |
1370 |
def test_all_forms_user_name_change(pub, formdef): |
|
1371 |
sql.SqlUser.wipe() |
|
1372 | ||
1373 |
user = sql.SqlUser() |
|
1374 |
user.name = 'Foo' |
|
1375 |
user.store() |
|
1376 | ||
1377 |
formdata = formdef.data_class()() |
|
1378 |
formdata.user_id = user.id |
|
1379 |
formdata.store() |
|
1380 | ||
1381 |
objects = sql.AnyFormData.select() |
|
1382 |
assert len(objects) == 1 |
|
1383 |
assert objects[0].user_id == str(user.id) |
|
1384 | ||
1385 |
conn, cur = sql.get_connection_and_cursor() |
|
1386 |
cur.execute('SELECT user_name FROM wcs_all_forms') |
|
1387 |
row = cur.fetchone() |
|
1388 |
assert row[0] == 'Foo' |
|
1389 | ||
1390 |
user.name = 'Foo Bar' |
|
1391 |
user.store() |
|
1392 |
cur.execute('SELECT user_name FROM wcs_all_forms') |
|
1393 |
row = cur.fetchone() |
|
1394 |
assert row[0] == 'Foo Bar' |
|
1395 |
cur.close() |
|
1396 |
conn.commit() |
|
1397 | ||
1398 | ||
1370 | 1399 |
def test_views_fts(pub): |
1371 | 1400 |
drop_formdef_tables() |
1372 | 1401 |
_, cur = sql.get_connection_and_cursor() |
wcs/ctl/management/commands/convert_to_sql.py | ||
---|---|---|
87 | 87 |
atomic_write(options_file, force_bytes(stringio.getvalue())) |
88 | 88 | |
89 | 89 |
def store_users(self): |
90 |
self.convert_objects('user', User, sql.SqlUser) |
|
90 |
self.convert_objects('user', User, sql.SqlUser, skip_global_forms_table_update=True)
|
|
91 | 91 |
sql.SqlUser.fix_sequences() |
92 | 92 | |
93 | 93 |
def store_roles(self): |
94 | 94 |
self.convert_objects('role', Role, sql.Role) |
95 | 95 | |
96 |
def convert_objects(self, object_name, object_class, object_sql_class): |
|
96 |
def convert_objects(self, object_name, object_class, object_sql_class, **kwargs):
|
|
97 | 97 |
errors = [] |
98 | 98 |
print('converting %ss' % object_name) |
99 | 99 |
getattr(sql, 'do_%s_table' % object_name)() |
... | ... | |
102 | 102 |
obj = object_class.get(obj_id) |
103 | 103 |
obj.__class__ = object_sql_class |
104 | 104 |
try: |
105 |
obj.store() |
|
105 |
obj.store(**kwargs)
|
|
106 | 106 |
except AssertionError: |
107 | 107 |
errors.append((obj, traceback.format_exc())) |
108 | 108 |
self.update_progress(100 * i / count) |
wcs/sql.py | ||
---|---|---|
862 | 862 |
LANGUAGE plpgsql |
863 | 863 |
AS $$ |
864 | 864 |
BEGIN |
865 |
-- TODO : sync back from users change ! |
|
866 | 865 |
IF TG_OP = 'DELETE' THEN |
867 | 866 |
DELETE FROM wcs_all_forms WHERE formdef_id = {formdef_id} AND id = OLD.id; |
868 | 867 |
RETURN OLD; |
... | ... | |
2876 | 2875 | |
2877 | 2876 |
@guard_postgres |
2878 | 2877 |
@invalidate_substitution_cache |
2879 |
def store(self): |
|
2878 |
def store(self, skip_global_forms_table_update=False):
|
|
2880 | 2879 |
sql_dict = { |
2881 | 2880 |
'name': self.name, |
2882 | 2881 |
'ascii_name': self.ascii_name, |
... | ... | |
2955 | 2954 |
) |
2956 | 2955 |
cur.execute(sql_statement, {'id': self.id, 'fts': ' '.join(fts_strings)}) |
2957 | 2956 | |
2957 |
if not skip_global_forms_table_update: |
|
2958 |
# propagate a potential name change to the user_name column of this user's wcs_all_forms rows |
|
2959 |
sql_statement = 'UPDATE wcs_all_forms SET user_name = %(user_name)s WHERE user_id = %(user_id)s' |
|
2960 |
cur.execute(sql_statement, {'user_id': str(self.id), 'user_name': self.name}) |
|
2961 | ||
2958 | 2962 |
conn.commit() |
2959 | 2963 |
cur.close() |
2960 | 2964 | |
2961 |
- |