20 |
20 |
import json
|
21 |
21 |
import re
|
22 |
22 |
import time
|
23 |
|
import uuid
|
24 |
23 |
|
25 |
24 |
import psycopg2
|
26 |
25 |
import psycopg2.extensions
|
... | ... | |
364 |
363 |
return force_text(value, get_publisher().site_charset)
|
365 |
364 |
|
366 |
365 |
|
367 |
|
def get_connection(new=False, isolate=False):
|
368 |
|
if new and not isolate:
|
|
366 |
def get_connection(new=False):
|
|
367 |
if new:
|
369 |
368 |
cleanup_connection()
|
370 |
369 |
|
371 |
|
if isolate or not getattr(get_publisher(), 'pgconn', None):
|
|
370 |
if not getattr(get_publisher(), 'pgconn', None):
|
372 |
371 |
postgresql_cfg = {}
|
373 |
372 |
for param in ('database', 'user', 'password', 'host', 'port'):
|
374 |
373 |
value = get_cfg('postgresql', {}).get(param)
|
... | ... | |
377 |
376 |
try:
|
378 |
377 |
pgconn = psycopg2.connect(**postgresql_cfg)
|
379 |
378 |
except psycopg2.Error:
|
380 |
|
if new or isolate:
|
|
379 |
if new:
|
381 |
380 |
raise
|
382 |
381 |
pgconn = None
|
383 |
|
if isolate:
|
384 |
|
return pgconn
|
385 |
382 |
|
386 |
383 |
get_publisher().pgconn = pgconn
|
387 |
384 |
|
... | ... | |
1430 |
1427 |
_table_name = None
|
1431 |
1428 |
_numerical_id = True
|
1432 |
1429 |
_table_select_skipped_fields = []
|
1433 |
|
_iterate_on_server = True
|
|
1430 |
_has_id = True
|
1434 |
1431 |
|
1435 |
1432 |
@classmethod
|
1436 |
1433 |
@guard_postgres
|
... | ... | |
1650 |
1647 |
|
1651 |
1648 |
@classmethod
|
1652 |
1649 |
@guard_postgres
|
1653 |
|
def select_iterator(
    cls,
    clause=None,
    order_by=None,
    ignore_errors=False,
    limit=None,
    offset=None,
    itersize=None,
):
    """Lazily yield the objects of ``cls._table_name`` matching ``clause``.

    Two strategies are used:

    * when ``itersize`` is truthy and the class has ``_has_id`` set, the
      matching ids are fetched first, then the full rows are loaded in
      batches of at most ``itersize`` ids (``WHERE id IN (...)``);
    * otherwise (the original code notes this is the AnyFormData case) a
      single query returning the full rows is executed.

    Parameters:
        clause: criteria handed to ``parse_clause``; it yields the SQL WHERE
            fragments, the query parameters and an optional Python-side
            filter (``func_clause``).
        order_by: ordering specification handed to
            ``cls.get_order_by_clause``.
        ignore_errors: accepted for interface compatibility; not used by
            this method.
        limit, offset: applied in SQL only when there is no Python-side
            filter, as SQL-level LIMIT/OFFSET would otherwise be applied
            before that filter runs.
        itersize: maximum number of ids per batch in the id-batching
            strategy; a falsy value disables batching.

    Yields:
        Objects produced by ``cls.get_objects``, skipping ``None`` entries
        and objects rejected by the Python-side filter.
    """
    table_static_fields = [
        x[0] if x[0] not in cls._table_select_skipped_fields else 'NULL AS %s' % x[0]
        for x in cls._table_static_fields
    ]
    selected_columns = ', '.join(table_static_fields + cls.get_data_fields())

    where_clauses, parameters, func_clause = parse_clause(clause)
    order_by_sql = cls.get_order_by_clause(order_by)

    # Common tail (WHERE / ORDER BY / LIMIT / OFFSET) shared by the id query
    # and by the single full-row query.
    filter_sql = ''
    if where_clauses:
        filter_sql += ' WHERE ' + ' AND '.join(where_clauses)
    filter_sql += order_by_sql
    if not func_clause:
        if limit:
            filter_sql += ' LIMIT %(limit)s'
            parameters['limit'] = limit
        if offset:
            filter_sql += ' OFFSET %(offset)s'
            parameters['offset'] = offset

    def retrieve(cursor):
        # Turn the rows currently available on the cursor into objects,
        # applying the Python-side filter when there is one.
        for row_object in cls.get_objects(cursor, iterator=True):
            if row_object is None:
                continue
            if func_clause and not func_clause(row_object):
                continue
            yield row_object

    conn, cur = get_connection_and_cursor()
    with cur:
        if itersize and cls._has_id:
            id_statement = 'SELECT id FROM %s' % cls._table_name
            cur.execute(id_statement + filter_sql, parameters)
            conn.commit()
            ids = [row[0] for row in cur]

            # The ordering is repeated so rows keep the requested order
            # within each batch.
            batch_statement = 'SELECT %s FROM %s WHERE id IN %%s' % (
                selected_columns,
                cls._table_name,
            )
            batch_statement += order_by_sql

            # Step through the id list instead of re-slicing the remainder
            # after every batch.
            for start in range(0, len(ids), itersize):
                cur.execute(batch_statement, [tuple(ids[start : start + itersize])])
                conn.commit()
                yield from retrieve(cur)
        else:  # for AnyFormData
            row_statement = 'SELECT %s FROM %s' % (selected_columns, cls._table_name)
            cur.execute(row_statement + filter_sql, parameters)
            conn.commit()
            yield from retrieve(cur)
|
1696 |
1729 |
|
1697 |
1730 |
@classmethod
|
1698 |
1731 |
@guard_postgres
|
1699 |
|
def select(cls, clause=None, order_by=None, ignore_errors=False, limit=None, offset=None, iterator=False):
|
|
1732 |
def select(
|
|
1733 |
cls,
|
|
1734 |
clause=None,
|
|
1735 |
order_by=None,
|
|
1736 |
ignore_errors=False,
|
|
1737 |
limit=None,
|
|
1738 |
offset=None,
|
|
1739 |
iterator=False,
|
|
1740 |
itersize=None,
|
|
1741 |
):
|
|
1742 |
if iterator and not itersize:
|
|
1743 |
itersize = 200
|
1700 |
1744 |
objects = cls.select_iterator(
|
1701 |
|
clause=clause, order_by=order_by, ignore_errors=ignore_errors, limit=limit, offset=offset
|
|
1745 |
clause=clause,
|
|
1746 |
order_by=order_by,
|
|
1747 |
ignore_errors=ignore_errors,
|
|
1748 |
limit=limit,
|
|
1749 |
offset=offset,
|
1702 |
1750 |
)
|
1703 |
1751 |
func_clause = parse_clause(clause)[2]
|
1704 |
1752 |
if func_clause and (limit or offset):
|
... | ... | |
3145 |
3193 |
class AnyFormData(SqlMixin):
|
3146 |
3194 |
_table_name = 'wcs_all_forms'
|
3147 |
3195 |
_formdef_cache = {}
|
3148 |
|
_iterate_on_server = False
|
|
3196 |
_has_id = False
|
3149 |
3197 |
|
3150 |
3198 |
@classproperty
|
3151 |
3199 |
def _table_static_fields(self):
|