Projet

Général

Profil

0001-sql-use-batch-iteration-on-ids-instead-of-named-curs.patch

Benjamin Dauvergne, 20 octobre 2021 16:01

Télécharger (8,19 ko)

Voir les différences:

Subject: [PATCH] sql: use batch iteration on ids instead of named cursors
 (#58013)

Named cursors imposed the use of isolated connections and were misused,
resulting in reads that issued one SQL query per row (because of the use
of .fetchone() with cursors). This commit reverts to the behaviour of one
connection per request and reads full SQL statement results at a time,
without using cursors.
 wcs/qommon/storage.py |   1 +
 wcs/sql.py            | 131 ++++++++++++++++++++++++++++--------------
 wcs/wf/jump.py        |   2 +-
 3 files changed, 91 insertions(+), 43 deletions(-)
wcs/qommon/storage.py
431 431
        limit=None,
432 432
        offset=None,
433 433
        iterator=False,
434
        itersize=None,
434 435
        **kwargs,
435 436
    ):
436 437
        # iterator: only for compatibility with sql select()
wcs/sql.py
364 364
    return force_text(value, get_publisher().site_charset)
365 365

  
366 366

  
367
def get_connection(new=False, isolate=False):
368
    if new and not isolate:
367
def get_connection(new=False):
368
    if new:
369 369
        cleanup_connection()
370 370

  
371
    if isolate or not getattr(get_publisher(), 'pgconn', None):
371
    if not getattr(get_publisher(), 'pgconn', None):
372 372
        postgresql_cfg = {}
373 373
        for param in ('database', 'user', 'password', 'host', 'port'):
374 374
            value = get_cfg('postgresql', {}).get(param)
......
377 377
        try:
378 378
            pgconn = psycopg2.connect(**postgresql_cfg)
379 379
        except psycopg2.Error:
380
            if new or isolate:
380
            if new:
381 381
                raise
382 382
            pgconn = None
383
        if isolate:
384
            return pgconn
385 383

  
386 384
        get_publisher().pgconn = pgconn
387 385

  
......
1430 1428
    _table_name = None
1431 1429
    _numerical_id = True
1432 1430
    _table_select_skipped_fields = []
1433
    _iterate_on_server = True
1431
    _has_id = True
1434 1432

  
1435 1433
    @classmethod
1436 1434
    @guard_postgres
......
1650 1648

  
1651 1649
    @classmethod
1652 1650
    @guard_postgres
1653
    def select_iterator(cls, clause=None, order_by=None, ignore_errors=False, limit=None, offset=None):
1651
    def select_iterator(
1652
        cls,
1653
        clause=None,
1654
        order_by=None,
1655
        ignore_errors=False,
1656
        limit=None,
1657
        offset=None,
1658
        itersize=None,
1659
    ):
1654 1660
        table_static_fields = [
1655 1661
            x[0] if x[0] not in cls._table_select_skipped_fields else 'NULL AS %s' % x[0]
1656 1662
            for x in cls._table_static_fields
1657 1663
        ]
1658
        sql_statement = '''SELECT %s
1659
                             FROM %s''' % (
1660
            ', '.join(table_static_fields + cls.get_data_fields()),
1661
            cls._table_name,
1662
        )
1663
        where_clauses, parameters, func_clause = parse_clause(clause)
1664
        if where_clauses:
1665
            sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
1666 1664

  
1667
        sql_statement += cls.get_order_by_clause(order_by)
1668

  
1669
        if not func_clause:
1670
            if limit:
1671
                sql_statement += ' LIMIT %(limit)s'
1672
                parameters['limit'] = limit
1673
            if offset:
1674
                sql_statement += ' OFFSET %(offset)s'
1675
                parameters['offset'] = offset
1676

  
1677
        if cls._iterate_on_server:
1678
            conn = get_connection(isolate=True)
1679
            cur = conn.cursor(name='select_iterator_%s' % uuid.uuid4())
1680
        else:
1681
            conn, cur = get_connection_and_cursor()
1682
        cur.execute(sql_statement, parameters)
1683
        try:
1665
        def retrieve():
1684 1666
            for object in cls.get_objects(cur, iterator=True):
1685 1667
                if object is None:
1686 1668
                    continue
1687 1669
                if func_clause and not func_clause(object):
1688 1670
                    continue
1671
                print(object.id)
1689 1672
                yield object
1690
        finally:
1691
            cur.close()
1692
            conn.commit()
1693
            if cls._iterate_on_server:
1694
                # close isolated connection
1695
                conn.close()
1673

  
1674
        if itersize and cls._has_id:
1675
            sql_statement = '''SELECT id FROM %s''' % cls._table_name
1676
            where_clauses, parameters, func_clause = parse_clause(clause)
1677
            if where_clauses:
1678
                sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
1679

  
1680
            sql_statement += cls.get_order_by_clause(order_by)
1681

  
1682
            if not func_clause:
1683
                if limit:
1684
                    sql_statement += ' LIMIT %(limit)s'
1685
                    parameters['limit'] = limit
1686
                if offset:
1687
                    sql_statement += ' OFFSET %(offset)s'
1688
                    parameters['offset'] = offset
1689

  
1690
            sql_id_statement = '''SELECT %s
1691
                                 FROM %s WHERE id IN %%s''' % (
1692
                ', '.join(table_static_fields + cls.get_data_fields()),
1693
                cls._table_name,
1694
            )
1695
            sql_id_statement += cls.get_order_by_clause(order_by)
1696

  
1697
            conn, cur = get_connection_and_cursor()
1698
            with cur:
1699
                cur.execute(sql_statement, parameters)
1700
                ids = [row[0] for row in cur]
1701
                while ids:
1702
                    cur.execute(sql_id_statement, [tuple(ids[:itersize])])
1703
                    yield from retrieve()
1704
                    ids = ids[itersize:]
1705
        else:  # for AnyFormData
1706
            sql_statement = '''SELECT %s FROM %s''' % (
1707
                ', '.join(table_static_fields + cls.get_data_fields()),
1708
                cls._table_name,
1709
            )
1710
            where_clauses, parameters, func_clause = parse_clause(clause)
1711
            if where_clauses:
1712
                sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
1713

  
1714
            sql_statement += cls.get_order_by_clause(order_by)
1715

  
1716
            if not func_clause:
1717
                if limit:
1718
                    sql_statement += ' LIMIT %(limit)s'
1719
                    parameters['limit'] = limit
1720
                if offset:
1721
                    sql_statement += ' OFFSET %(offset)s'
1722
                    parameters['offset'] = offset
1723

  
1724
            conn, cur = get_connection_and_cursor()
1725
            with cur:
1726
                cur.execute(sql_statement, parameters)
1727
                yield from retrieve()
1696 1728

  
1697 1729
    @classmethod
1698 1730
    @guard_postgres
1699
    def select(cls, clause=None, order_by=None, ignore_errors=False, limit=None, offset=None, iterator=False):
1731
    def select(
1732
        cls,
1733
        clause=None,
1734
        order_by=None,
1735
        ignore_errors=False,
1736
        limit=None,
1737
        offset=None,
1738
        iterator=False,
1739
        itersize=None,
1740
    ):
1741
        if iterator and not itersize:
1742
            itersize = 200
1700 1743
        objects = cls.select_iterator(
1701
            clause=clause, order_by=order_by, ignore_errors=ignore_errors, limit=limit, offset=offset
1744
            clause=clause,
1745
            order_by=order_by,
1746
            ignore_errors=ignore_errors,
1747
            limit=limit,
1748
            offset=offset,
1702 1749
        )
1703 1750
        func_clause = parse_clause(clause)[2]
1704 1751
        if func_clause and (limit or offset):
......
3145 3192
class AnyFormData(SqlMixin):
3146 3193
    _table_name = 'wcs_all_forms'
3147 3194
    _formdef_cache = {}
3148
    _iterate_on_server = False
3195
    _has_id = False
3149 3196

  
3150 3197
    @classproperty
3151 3198
    def _table_static_fields(self):
wcs/wf/jump.py
345 345
                        (datetime.datetime.now() - datetime.timedelta(seconds=delay)).timetuple(),
346 346
                    ),
347 347
                ]
348
                formdatas = formdata_class.select_iterator(criterias, ignore_errors=True)
348
                formdatas = formdata_class.select_iterator(criterias, ignore_errors=True, itersize=200)
349 349
            else:
350 350
                formdatas = formdata_class.get_with_indexed_value('status', status_id, ignore_errors=True)
351 351

  
352
-