Projet

Général

Profil

0001-sql-use-batch-iteration-on-ids-instead-of-named-curs.patch

Benjamin Dauvergne, 20 octobre 2021 18:16

Télécharger (8,38 ko)

Voir les différences:

Subject: [PATCH] sql: use batch iteration on ids instead of named cursors
 (#58013)

Named cursors imposed the use of isolated connections and were misused,
resulting in one SQL query being issued per row (because of the use of
.fetchone() with cursors). This commit reverts to the behaviour of one
connection per request and reads the full result of each SQL statement
at once, without using named cursors.
 wcs/qommon/storage.py |   1 +
 wcs/sql.py            | 134 ++++++++++++++++++++++++++++--------------
 wcs/wf/jump.py        |   2 +-
 3 files changed, 93 insertions(+), 44 deletions(-)
wcs/qommon/storage.py
431 431
        limit=None,
432 432
        offset=None,
433 433
        iterator=False,
434
        itersize=None,
434 435
        **kwargs,
435 436
    ):
436 437
        # iterator: only for compatibility with sql select()
wcs/sql.py
20 20
import json
21 21
import re
22 22
import time
23
import uuid
24 23

  
25 24
import psycopg2
26 25
import psycopg2.extensions
......
364 363
    return force_text(value, get_publisher().site_charset)
365 364

  
366 365

  
367
def get_connection(new=False, isolate=False):
368
    if new and not isolate:
366
def get_connection(new=False):
367
    if new:
369 368
        cleanup_connection()
370 369

  
371
    if isolate or not getattr(get_publisher(), 'pgconn', None):
370
    if not getattr(get_publisher(), 'pgconn', None):
372 371
        postgresql_cfg = {}
373 372
        for param in ('database', 'user', 'password', 'host', 'port'):
374 373
            value = get_cfg('postgresql', {}).get(param)
......
377 376
        try:
378 377
            pgconn = psycopg2.connect(**postgresql_cfg)
379 378
        except psycopg2.Error:
380
            if new or isolate:
379
            if new:
381 380
                raise
382 381
            pgconn = None
383
        if isolate:
384
            return pgconn
385 382

  
386 383
        get_publisher().pgconn = pgconn
387 384

  
......
1430 1427
    _table_name = None
1431 1428
    _numerical_id = True
1432 1429
    _table_select_skipped_fields = []
1433
    _iterate_on_server = True
1430
    _has_id = True
1434 1431

  
1435 1432
    @classmethod
1436 1433
    @guard_postgres
......
1650 1647

  
1651 1648
    @classmethod
1652 1649
    @guard_postgres
1653
    def select_iterator(cls, clause=None, order_by=None, ignore_errors=False, limit=None, offset=None):
1650
    def select_iterator(
1651
        cls,
1652
        clause=None,
1653
        order_by=None,
1654
        ignore_errors=False,
1655
        limit=None,
1656
        offset=None,
1657
        itersize=None,
1658
    ):
1654 1659
        table_static_fields = [
1655 1660
            x[0] if x[0] not in cls._table_select_skipped_fields else 'NULL AS %s' % x[0]
1656 1661
            for x in cls._table_static_fields
1657 1662
        ]
1658
        sql_statement = '''SELECT %s
1659
                             FROM %s''' % (
1660
            ', '.join(table_static_fields + cls.get_data_fields()),
1661
            cls._table_name,
1662
        )
1663
        where_clauses, parameters, func_clause = parse_clause(clause)
1664
        if where_clauses:
1665
            sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
1666 1663

  
1667
        sql_statement += cls.get_order_by_clause(order_by)
1668

  
1669
        if not func_clause:
1670
            if limit:
1671
                sql_statement += ' LIMIT %(limit)s'
1672
                parameters['limit'] = limit
1673
            if offset:
1674
                sql_statement += ' OFFSET %(offset)s'
1675
                parameters['offset'] = offset
1676

  
1677
        if cls._iterate_on_server:
1678
            conn = get_connection(isolate=True)
1679
            cur = conn.cursor(name='select_iterator_%s' % uuid.uuid4())
1680
        else:
1681
            conn, cur = get_connection_and_cursor()
1682
        cur.execute(sql_statement, parameters)
1683
        try:
1664
        def retrieve():
1684 1665
            for object in cls.get_objects(cur, iterator=True):
1685 1666
                if object is None:
1686 1667
                    continue
1687 1668
                if func_clause and not func_clause(object):
1688 1669
                    continue
1689 1670
                yield object
1690
        finally:
1691
            cur.close()
1692
            conn.commit()
1693
            if cls._iterate_on_server:
1694
                # close isolated connection
1695
                conn.close()
1671

  
1672
        if itersize and cls._has_id:
1673
            sql_statement = '''SELECT id FROM %s''' % cls._table_name
1674
            where_clauses, parameters, func_clause = parse_clause(clause)
1675
            if where_clauses:
1676
                sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
1677

  
1678
            sql_statement += cls.get_order_by_clause(order_by)
1679

  
1680
            if not func_clause:
1681
                if limit:
1682
                    sql_statement += ' LIMIT %(limit)s'
1683
                    parameters['limit'] = limit
1684
                if offset:
1685
                    sql_statement += ' OFFSET %(offset)s'
1686
                    parameters['offset'] = offset
1687

  
1688
            sql_id_statement = '''SELECT %s
1689
                                 FROM %s WHERE id IN %%s''' % (
1690
                ', '.join(table_static_fields + cls.get_data_fields()),
1691
                cls._table_name,
1692
            )
1693
            sql_id_statement += cls.get_order_by_clause(order_by)
1694

  
1695
            conn, cur = get_connection_and_cursor()
1696
            with cur:
1697
                cur.execute(sql_statement, parameters)
1698
                conn.commit()
1699
                ids = [row[0] for row in cur]
1700
                while ids:
1701
                    cur.execute(sql_id_statement, [tuple(ids[:itersize])])
1702
                    conn.commit()
1703
                    yield from retrieve()
1704
                    ids = ids[itersize:]
1705
        else:  # for AnyFormData
1706
            sql_statement = '''SELECT %s FROM %s''' % (
1707
                ', '.join(table_static_fields + cls.get_data_fields()),
1708
                cls._table_name,
1709
            )
1710
            where_clauses, parameters, func_clause = parse_clause(clause)
1711
            if where_clauses:
1712
                sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
1713

  
1714
            sql_statement += cls.get_order_by_clause(order_by)
1715

  
1716
            if not func_clause:
1717
                if limit:
1718
                    sql_statement += ' LIMIT %(limit)s'
1719
                    parameters['limit'] = limit
1720
                if offset:
1721
                    sql_statement += ' OFFSET %(offset)s'
1722
                    parameters['offset'] = offset
1723

  
1724
            conn, cur = get_connection_and_cursor()
1725
            with cur:
1726
                cur.execute(sql_statement, parameters)
1727
                conn.commit()
1728
                yield from retrieve()
1696 1729

  
1697 1730
    @classmethod
1698 1731
    @guard_postgres
1699
    def select(cls, clause=None, order_by=None, ignore_errors=False, limit=None, offset=None, iterator=False):
1732
    def select(
1733
        cls,
1734
        clause=None,
1735
        order_by=None,
1736
        ignore_errors=False,
1737
        limit=None,
1738
        offset=None,
1739
        iterator=False,
1740
        itersize=None,
1741
    ):
1742
        if iterator and not itersize:
1743
            itersize = 200
1700 1744
        objects = cls.select_iterator(
1701
            clause=clause, order_by=order_by, ignore_errors=ignore_errors, limit=limit, offset=offset
1745
            clause=clause,
1746
            order_by=order_by,
1747
            ignore_errors=ignore_errors,
1748
            limit=limit,
1749
            offset=offset,
1702 1750
        )
1703 1751
        func_clause = parse_clause(clause)[2]
1704 1752
        if func_clause and (limit or offset):
......
3145 3193
class AnyFormData(SqlMixin):
3146 3194
    _table_name = 'wcs_all_forms'
3147 3195
    _formdef_cache = {}
3148
    _iterate_on_server = False
3196
    _has_id = False
3149 3197

  
3150 3198
    @classproperty
3151 3199
    def _table_static_fields(self):
wcs/wf/jump.py
345 345
                        (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
346 346
                    ),
347 347
                ]
348
                formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
348
                formdatas = formdata_class.select_iterator(criterias, ignore_errors=True, itersize=200)
349 349
            else:
350 350
                formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
351 351

  
352
-