Projet

Général

Profil

0001-sql-use-batch-iteration-on-ids-instead-of-named-curs.patch

Benjamin Dauvergne, 22 octobre 2021 17:43

Télécharger (7,44 ko)

Voir les différences:

Subject: [PATCH] sql: use batch iteration on ids instead of named cursors
 (#58013)

Named cursors imposed the use of isolated connections and were misused,
resulting in one SQL query being issued per row (because of the use of
.fetchone() with cursors). This commit reverts to the behaviour of one
connection per request and reads the full results of each SQL statement
at once, without using named cursors.
 wcs/qommon/storage.py |   1 +
 wcs/sql.py            | 103 ++++++++++++++++++++++++++++--------------
 wcs/wf/jump.py        |   2 +-
 3 files changed, 70 insertions(+), 36 deletions(-)
wcs/qommon/storage.py
431 431
        limit=None,
432 432
        offset=None,
433 433
        iterator=False,
434
        itersize=None,
434 435
        **kwargs,
435 436
    ):
436 437
        # iterator: only for compatibility with sql select()
wcs/sql.py
20 20
import json
21 21
import re
22 22
import time
23
import uuid
24 23

  
25 24
import psycopg2
26 25
import psycopg2.extensions
......
365 364
    return force_text(value, get_publisher().site_charset)
366 365

  
367 366

  
368
def get_connection(new=False, isolate=False):
369
    if new and not isolate:
367
def get_connection(new=False):
368
    if new:
370 369
        cleanup_connection()
371 370

  
372
    if isolate or not getattr(get_publisher(), 'pgconn', None):
371
    if not getattr(get_publisher(), 'pgconn', None):
373 372
        postgresql_cfg = {}
374 373
        for param in ('database', 'user', 'password', 'host', 'port'):
375 374
            value = get_cfg('postgresql', {}).get(param)
......
378 377
        try:
379 378
            pgconn = psycopg2.connect(**postgresql_cfg)
380 379
        except psycopg2.Error:
381
            if new or isolate:
380
            if new:
382 381
                raise
383 382
            pgconn = None
384
        if isolate:
385
            return pgconn
386 383

  
387 384
        get_publisher().pgconn = pgconn
388 385

  
......
1431 1428
    _table_name = None
1432 1429
    _numerical_id = True
1433 1430
    _table_select_skipped_fields = []
1434
    _iterate_on_server = True
1431
    _has_id = True
1435 1432

  
1436 1433
    @classmethod
1437 1434
    @guard_postgres
......
1651 1648

  
1652 1649
    @classmethod
1653 1650
    @guard_postgres
1654
    def select_iterator(cls, clause=None, order_by=None, ignore_errors=False, limit=None, offset=None):
1651
    def select_iterator(
1652
        cls,
1653
        clause=None,
1654
        order_by=None,
1655
        ignore_errors=False,
1656
        limit=None,
1657
        offset=None,
1658
        itersize=None,
1659
    ):
1655 1660
        table_static_fields = [
1656 1661
            x[0] if x[0] not in cls._table_select_skipped_fields else 'NULL AS %s' % x[0]
1657 1662
            for x in cls._table_static_fields
1658 1663
        ]
1659
        sql_statement = '''SELECT %s
1660
                             FROM %s''' % (
1661
            ', '.join(table_static_fields + cls.get_data_fields()),
1662
            cls._table_name,
1663
        )
1664

  
1665
        def retrieve():
1666
            for object in cls.get_objects(cur, iterator=True):
1667
                if object is None:
1668
                    continue
1669
                if func_clause and not func_clause(object):
1670
                    continue
1671
                yield object
1672

  
1673
        if itersize and cls._has_id:
1674
            # this case concerns almost all data tables: formdata, card, users, roles
1675
            sql_statement = '''SELECT id FROM %s''' % cls._table_name
1676
        else:
1677
            # this case concerns aggregated views like wcs_all_forms (class
1678
            # AnyFormData) which does not have a surrogate key id column
1679
            sql_statement = '''SELECT %s FROM %s''' % (
1680
                ', '.join(table_static_fields + cls.get_data_fields()),
1681
                cls._table_name,
1682
            )
1664 1683
        where_clauses, parameters, func_clause = parse_clause(clause)
1665 1684
        if where_clauses:
1666 1685
            sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
......
1675 1694
                sql_statement += ' OFFSET %(offset)s'
1676 1695
                parameters['offset'] = offset
1677 1696

  
1678
        if cls._iterate_on_server:
1679
            conn = get_connection(isolate=True)
1680
            cur = conn.cursor(name='select_iterator_%s' % uuid.uuid4())
1681
        else:
1682
            conn, cur = get_connection_and_cursor()
1683
        cur.execute(sql_statement, parameters)
1684
        try:
1685
            for object in cls.get_objects(cur, iterator=True):
1686
                if object is None:
1687
                    continue
1688
                if func_clause and not func_clause(object):
1689
                    continue
1690
                yield object
1691
        finally:
1692
            cur.close()
1697
        conn, cur = get_connection_and_cursor()
1698
        with cur:
1699
            cur.execute(sql_statement, parameters)
1693 1700
            conn.commit()
1694
            if cls._iterate_on_server:
1695
                # close isolated connection
1696
                conn.close()
1701
            if itersize and cls._has_id:
1702
                sql_id_statement = '''SELECT %s FROM %s WHERE id IN %%s''' % (
1703
                    ', '.join(table_static_fields + cls.get_data_fields()),
1704
                    cls._table_name,
1705
                )
1706
                sql_id_statement += cls.get_order_by_clause(order_by)
1707
                ids = [row[0] for row in cur]
1708
                while ids:
1709
                    cur.execute(sql_id_statement, [tuple(ids[:itersize])])
1710
                    conn.commit()
1711
                    yield from retrieve()
1712
                    ids = ids[itersize:]
1713
            else:
1714
                yield from retrieve()
1697 1715

  
1698 1716
    @classmethod
1699 1717
    @guard_postgres
1700
    def select(cls, clause=None, order_by=None, ignore_errors=False, limit=None, offset=None, iterator=False):
1718
    def select(
1719
        cls,
1720
        clause=None,
1721
        order_by=None,
1722
        ignore_errors=False,
1723
        limit=None,
1724
        offset=None,
1725
        iterator=False,
1726
        itersize=None,
1727
    ):
1728
        if iterator and not itersize:
1729
            itersize = 200
1701 1730
        objects = cls.select_iterator(
1702
            clause=clause, order_by=order_by, ignore_errors=ignore_errors, limit=limit, offset=offset
1731
            clause=clause,
1732
            order_by=order_by,
1733
            ignore_errors=ignore_errors,
1734
            limit=limit,
1735
            offset=offset,
1703 1736
        )
1704 1737
        func_clause = parse_clause(clause)[2]
1705 1738
        if func_clause and (limit or offset):
......
3147 3180
class AnyFormData(SqlMixin):
3148 3181
    _table_name = 'wcs_all_forms'
3149 3182
    _formdef_cache = {}
3150
    _iterate_on_server = False
3183
    _has_id = False
3151 3184

  
3152 3185
    @classproperty
3153 3186
    def _table_static_fields(self):
wcs/wf/jump.py
350 350
                            (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
351 351
                        ),
352 352
                    ]
353
                    formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
353
                    formdatas = formdata_class.select_iterator(criterias, ignore_errors=True, itersize=200)
354 354
                else:
355 355
                    formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
356 356

  
357
-