Projet

Général

Profil

0001-sql-use-batch-iteration-on-ids-instead-of-named-curs.patch

Benjamin Dauvergne, 20 octobre 2021 16:04

Télécharger (8,16 ko)

Voir les différences:

Subject: [PATCH] sql: use batch iteration on ids instead of named cursors
 (#58013)

Named cursors imposed the use of isolated connections and were misused
resulting in reading using one SQL query by row (because of the use of
.fetchone() with cursors). This commit revert to the behaviour of one
connection per request and reading full SQL statement results at a time
without using cursors.
 wcs/qommon/storage.py |   1 +
 wcs/sql.py            | 130 ++++++++++++++++++++++++++++--------------
 wcs/wf/jump.py        |   2 +-
 3 files changed, 90 insertions(+), 43 deletions(-)
wcs/qommon/storage.py
431 431
        limit=None,
432 432
        offset=None,
433 433
        iterator=False,
434
        itersize=None,
434 435
        **kwargs,
435 436
    ):
436 437
        # iterator: only for compatibility with sql select()
wcs/sql.py
364 364
    return force_text(value, get_publisher().site_charset)
365 365

  
366 366

  
367
def get_connection(new=False, isolate=False):
368
    if new and not isolate:
367
def get_connection(new=False):
368
    if new:
369 369
        cleanup_connection()
370 370

  
371
    if isolate or not getattr(get_publisher(), 'pgconn', None):
371
    if not getattr(get_publisher(), 'pgconn', None):
372 372
        postgresql_cfg = {}
373 373
        for param in ('database', 'user', 'password', 'host', 'port'):
374 374
            value = get_cfg('postgresql', {}).get(param)
......
377 377
        try:
378 378
            pgconn = psycopg2.connect(**postgresql_cfg)
379 379
        except psycopg2.Error:
380
            if new or isolate:
380
            if new:
381 381
                raise
382 382
            pgconn = None
383
        if isolate:
384
            return pgconn
385 383

  
386 384
        get_publisher().pgconn = pgconn
387 385

  
......
1430 1428
    _table_name = None
1431 1429
    _numerical_id = True
1432 1430
    _table_select_skipped_fields = []
1433
    _iterate_on_server = True
1431
    _has_id = True
1434 1432

  
1435 1433
    @classmethod
1436 1434
    @guard_postgres
......
1650 1648

  
1651 1649
    @classmethod
1652 1650
    @guard_postgres
1653
    def select_iterator(cls, clause=None, order_by=None, ignore_errors=False, limit=None, offset=None):
1651
    def select_iterator(
1652
        cls,
1653
        clause=None,
1654
        order_by=None,
1655
        ignore_errors=False,
1656
        limit=None,
1657
        offset=None,
1658
        itersize=None,
1659
    ):
1654 1660
        table_static_fields = [
1655 1661
            x[0] if x[0] not in cls._table_select_skipped_fields else 'NULL AS %s' % x[0]
1656 1662
            for x in cls._table_static_fields
1657 1663
        ]
1658
        sql_statement = '''SELECT %s
1659
                             FROM %s''' % (
1660
            ', '.join(table_static_fields + cls.get_data_fields()),
1661
            cls._table_name,
1662
        )
1663
        where_clauses, parameters, func_clause = parse_clause(clause)
1664
        if where_clauses:
1665
            sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
1666 1664

  
1667
        sql_statement += cls.get_order_by_clause(order_by)
1668

  
1669
        if not func_clause:
1670
            if limit:
1671
                sql_statement += ' LIMIT %(limit)s'
1672
                parameters['limit'] = limit
1673
            if offset:
1674
                sql_statement += ' OFFSET %(offset)s'
1675
                parameters['offset'] = offset
1676

  
1677
        if cls._iterate_on_server:
1678
            conn = get_connection(isolate=True)
1679
            cur = conn.cursor(name='select_iterator_%s' % uuid.uuid4())
1680
        else:
1681
            conn, cur = get_connection_and_cursor()
1682
        cur.execute(sql_statement, parameters)
1683
        try:
1665
        def retrieve():
1684 1666
            for object in cls.get_objects(cur, iterator=True):
1685 1667
                if object is None:
1686 1668
                    continue
1687 1669
                if func_clause and not func_clause(object):
1688 1670
                    continue
1689 1671
                yield object
1690
        finally:
1691
            cur.close()
1692
            conn.commit()
1693
            if cls._iterate_on_server:
1694
                # close isolated connection
1695
                conn.close()
1672

  
1673
        if itersize and cls._has_id:
1674
            sql_statement = '''SELECT id FROM %s''' % cls._table_name
1675
            where_clauses, parameters, func_clause = parse_clause(clause)
1676
            if where_clauses:
1677
                sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
1678

  
1679
            sql_statement += cls.get_order_by_clause(order_by)
1680

  
1681
            if not func_clause:
1682
                if limit:
1683
                    sql_statement += ' LIMIT %(limit)s'
1684
                    parameters['limit'] = limit
1685
                if offset:
1686
                    sql_statement += ' OFFSET %(offset)s'
1687
                    parameters['offset'] = offset
1688

  
1689
            sql_id_statement = '''SELECT %s
1690
                                 FROM %s WHERE id IN %%s''' % (
1691
                ', '.join(table_static_fields + cls.get_data_fields()),
1692
                cls._table_name,
1693
            )
1694
            sql_id_statement += cls.get_order_by_clause(order_by)
1695

  
1696
            conn, cur = get_connection_and_cursor()
1697
            with cur:
1698
                cur.execute(sql_statement, parameters)
1699
                ids = [row[0] for row in cur]
1700
                while ids:
1701
                    cur.execute(sql_id_statement, [tuple(ids[:itersize])])
1702
                    yield from retrieve()
1703
                    ids = ids[itersize:]
1704
        else:  # for AnyFormData
1705
            sql_statement = '''SELECT %s FROM %s''' % (
1706
                ', '.join(table_static_fields + cls.get_data_fields()),
1707
                cls._table_name,
1708
            )
1709
            where_clauses, parameters, func_clause = parse_clause(clause)
1710
            if where_clauses:
1711
                sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
1712

  
1713
            sql_statement += cls.get_order_by_clause(order_by)
1714

  
1715
            if not func_clause:
1716
                if limit:
1717
                    sql_statement += ' LIMIT %(limit)s'
1718
                    parameters['limit'] = limit
1719
                if offset:
1720
                    sql_statement += ' OFFSET %(offset)s'
1721
                    parameters['offset'] = offset
1722

  
1723
            conn, cur = get_connection_and_cursor()
1724
            with cur:
1725
                cur.execute(sql_statement, parameters)
1726
                yield from retrieve()
1696 1727

  
1697 1728
    @classmethod
1698 1729
    @guard_postgres
1699
    def select(cls, clause=None, order_by=None, ignore_errors=False, limit=None, offset=None, iterator=False):
1730
    def select(
1731
        cls,
1732
        clause=None,
1733
        order_by=None,
1734
        ignore_errors=False,
1735
        limit=None,
1736
        offset=None,
1737
        iterator=False,
1738
        itersize=None,
1739
    ):
1740
        if iterator and not itersize:
1741
            itersize = 200
1700 1742
        objects = cls.select_iterator(
1701
            clause=clause, order_by=order_by, ignore_errors=ignore_errors, limit=limit, offset=offset
1743
            clause=clause,
1744
            order_by=order_by,
1745
            ignore_errors=ignore_errors,
1746
            limit=limit,
1747
            offset=offset,
1702 1748
        )
1703 1749
        func_clause = parse_clause(clause)[2]
1704 1750
        if func_clause and (limit or offset):
......
3145 3191
class AnyFormData(SqlMixin):
3146 3192
    _table_name = 'wcs_all_forms'
3147 3193
    _formdef_cache = {}
3148
    _iterate_on_server = False
3194
    _has_id = False
3149 3195

  
3150 3196
    @classproperty
3151 3197
    def _table_static_fields(self):
wcs/wf/jump.py
345 345
                        (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
346 346
                    ),
347 347
                ]
348
                formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
348
                formdatas = formdata_class.select_iterator(criterias, ignore_errors=True, itersize=200)
349 349
            else:
350 350
                formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
351 351

  
352
-