0001-sql-remove-formdef-references-from-global-views-afte.patch
tests/test_sql.py | ||
---|---|---|
1274 | 1274 | |
1275 | 1275 |
cur.execute('''SELECT user_name FROM wcs_all_forms WHERE id = %s ''', (formdata2.id,)) |
1276 | 1276 |
assert bool(cur.fetchone()[0] == user.name) |
1277 | ||
1278 |
@postgresql
def test_select_formdata_after_formdef_removal():
    """Check that the generic formdata select ignores removed formdefs.

    Two formdefs are stored, each with a single formdata, so the generic
    ``sql.AnyFormData.select()`` must return two objects.  After the last
    created formdef is removed, only one object must remain.
    """
    drop_formdef_tables()
    conn, cur = sql.get_connection_and_cursor()
    try:
        for _ in range(2):
            formdef = FormDef()
            formdef.name = 'test formdef removal'
            formdef.fields = []
            formdef.store()

            data_class = formdef.data_class(mode='sql')
            formdata = data_class()
            formdata.just_created()
            formdata.store()

        # test generic select
        objects = sql.AnyFormData.select()
        assert len(objects) == 2

        # ``formdef`` is the last formdef created by the loop above
        formdef.remove_self()

        objects = sql.AnyFormData.select()
        assert len(objects) == 1
    finally:
        # release the cursor whether or not the assertions passed
        cur.close()
wcs/formdef.py | ||
---|---|---|
197 | 197 |
if changed: |
198 | 198 |
self.store() |
199 | 199 | |
200 |
@classmethod
def remove_object(cls, id):
    """Remove the formdef identified by ``id`` and refresh the global views.

    The removal itself is delegated to the parent class.  When the
    publisher is using PostgreSQL, the global views are then recreated so
    they don't reference formdata from deleted formdefs.
    """
    super(FormDef, cls).remove_object(id)
    if not get_publisher().is_using_postgresql():
        return

    # recreate global views so they don't reference formdata from
    # deleted formdefs
    import sql
    conn, cur = sql.get_connection_and_cursor()
    try:
        sql.do_global_views(conn, cur)
        conn.commit()
    finally:
        # release the cursor even if rebuilding the views failed
        cur.close()
|
211 | ||
200 | 212 |
def data_class(self, mode=None): |
201 | 213 |
if hasattr(sys.modules['formdef'], self.url_name.title()): |
202 | 214 |
data_class = getattr(sys.modules['formdef'], self.url_name.title()) |
wcs/sql.py | ||
---|---|---|
265 | 265 |
formdef.store() |
266 | 266 |
return formdef.table_name |
267 | 267 | |
268 |
def get_formdef_view_name(formdef):
    """Return the name of the SQL view associated with ``formdef``.

    The name has the form ``wcs_view_<id>_<identifier>``, where the
    identifier is the result of ``get_name_as_sql_identifier`` applied to
    the formdef's ``url_name``, cut to at most 40 characters.
    """
    formdef_id = formdef.id
    identifier = get_name_as_sql_identifier(formdef.url_name)
    return 'wcs_view_%s_%s' % (formdef_id, identifier[:40])
|
271 | ||
268 | 272 |
def guard_postgres(func): |
269 | 273 |
def f(*args, **kwargs): |
270 | 274 |
try: |
... | ... | |
565 | 569 |
def do_views(formdef, conn, cur, rebuild_global_views=True): |
566 | 570 |
# create new view |
567 | 571 |
table_name = get_formdef_table_name(formdef) |
568 |
view_name = 'wcs_view_%s_%s' % (formdef.id, |
|
569 |
get_name_as_sql_identifier(formdef.url_name)[:40]) |
|
570 | ||
572 |
view_name = get_formdef_view_name(formdef) |
|
571 | 573 |
view_fields = get_view_fields(formdef) |
572 | 574 | |
573 | 575 |
column_names = {} |
... | ... | |
648 | 650 | |
649 | 651 |
def do_global_views(conn, cur): |
650 | 652 |
# recreate global views |
651 |
view_names = [] |
|
653 |
from wcs.formdef import FormDef |
|
654 |
view_names = [get_formdef_view_name(x) for x in FormDef.select()] |
|
655 | ||
652 | 656 |
cur.execute('''SELECT table_name FROM information_schema.views |
653 | 657 |
WHERE table_name LIKE %s''', ('wcs\_view\_%',)) |
658 |
existing_views = set() |
|
654 | 659 |
while True: |
655 | 660 |
row = cur.fetchone() |
656 | 661 |
if row is None: |
657 | 662 |
break |
658 |
view_names.append(row[0])
|
|
663 |
existing_views.add(row[0])
|
|
659 | 664 | |
665 |
view_names = existing_views.intersection(view_names) |
|
660 | 666 |
if not view_names: |
661 | 667 |
return |
662 | 668 | |
663 |
from wcs.formdef import FormDef |
|
669 |
cur.execute('''DROP VIEW IF EXISTS wcs_all_forms CASCADE''') |
|
670 | ||
664 | 671 |
fake_formdef = FormDef() |
665 | 672 |
common_fields = get_view_fields(fake_formdef) |
666 | 673 |
common_fields.append(('concerned_roles_array', 'concerned_roles_array')) |
667 |
- |