Projet

Général

Profil

0001-engine-close-connections-after-usage-27482.patch

Benjamin Dauvergne, 20 novembre 2018 15:33

Télécharger (12,2 ko)

Voir les différences:

Subject: [PATCH 1/3] engine: close connections after usage (#27482)

 bijoe/engine.py | 206 ++++++++++++++++++++++++------------------------
 1 file changed, 103 insertions(+), 103 deletions(-)
bijoe/engine.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import contextlib
17 18
import logging
18 19
import itertools
19 20

  
......
59 60
    @property
60 61
    def members(self):
61 62
        assert self.type != 'date'
62
        cursor = self.engine.get_cursor()
63
        sql = self.members_query
64
        if not sql:
65
            if self.dimension.join:
66
                join = self.engine_cube.get_join(self.dimension.join[-1])
67
                sql = ('SELECT %s AS value, %s::text AS label FROM %s AS "%s" '
68
                       'GROUP BY %s, %s ORDER BY %s' % (
69
                           self.value, self.value_label or self.value, join.table, join.name,
70
                           self.value, self.value_label or self.value, self.order_by or self.value))
71
            else:
72
                sql = ('SELECT %s AS value, %s::text AS label FROM {fact_table} '
73
                       'GROUP BY %s, %s ORDER BY %s' % (
74
                           self.value, self.value_label or self.value, self.value,
75
                           self.value_label or self.value, self.self.order_by or self.value))
76
        sql = sql.format(fact_table=self.engine_cube.fact_table)
77
        self.engine.log.debug('SQL: %s', sql)
78
        cursor.execute(sql)
79
        for row in cursor.fetchall():
80
            yield Member(*row)
63
        with self.engine.get_cursor() as cursor:
64
            sql = self.members_query
65
            if not sql:
66
                if self.dimension.join:
67
                    join = self.engine_cube.get_join(self.dimension.join[-1])
68
                    sql = ('SELECT %s AS value, %s::text AS label FROM %s AS "%s" '
69
                           'GROUP BY %s, %s ORDER BY %s' % (
70
                               self.value, self.value_label or self.value, join.table, join.name,
71
                               self.value, self.value_label or self.value, self.order_by or self.value))
72
                else:
73
                    sql = ('SELECT %s AS value, %s::text AS label FROM {fact_table} '
74
                           'GROUP BY %s, %s ORDER BY %s' % (
75
                               self.value, self.value_label or self.value, self.value,
76
                               self.value_label or self.value, self.self.order_by or self.value))
77
            sql = sql.format(fact_table=self.engine_cube.fact_table)
78
            self.engine.log.debug('SQL: %s', sql)
79
            cursor.execute(sql)
80
            for row in cursor.fetchall():
81
                yield Member(*row)
81 82

  
82 83

  
83 84
class SchemaJSONDimension(schemas.Dimension):
......
169 170
        if not self.engine_cube.json_field:
170 171
            return []
171 172
        if not self.__cache:
172
            cursor = self.engine.get_cursor()
173
            sql = 'select distinct jsonb_object_keys(json_data) as a from formdata order by a'
174
            cursor.execute(sql)
175
            self.__cache = [row[0] for row in cursor.fetchall()]
173
            with self.engine.get_cursor() as cursor:
174
                sql = 'select distinct jsonb_object_keys(json_data) as a from formdata order by a'
175
                cursor.execute(sql)
176
                self.__cache = [row[0] for row in cursor.fetchall()]
176 177
        return self.__cache
177 178

  
178 179
    def __iter__(self):
......
249 250
        return getattr(self.cube, name)
250 251

  
251 252
    def count(self):
252
        cursor = self.engine.get_cursor()
253
        cursor.execute('SELECT count(%s) FROM %s' % (self.key, self.fact_table))
254
        return cursor.fetchone()[0]
253
        with self.engine.get_cursor() as cursor:
254
            cursor.execute('SELECT count(%s) FROM %s' % (self.key, self.fact_table))
255
            return cursor.fetchone()[0]
255 256

  
256 257
    def get_join(self, name):
257 258
        if name.startswith('json_'):
......
266 267
        return self.cube.get_join(name)
267 268

  
268 269
    def sql_query(self, filters, drilldown, measures, **kwargs):
269
        cursor = self.engine.get_cursor()
270

  
271
        projections = []
272
        joins = set()
273
        where = []
274
        group_by = []
275
        order_by = []
276
        join_conditions = []
277

  
278
        for dimension_name, values in filters:
279
            dimension = self.dimensions[dimension_name]
280
            # assert dimension.filter
281
            condition, values = dimension.build_filter(values)
282
            condition = cursor.mogrify(condition, values)
283
            if dimension.filter_needs_join:
270
        with self.engine.get_cursor() as cursor:
271
            projections = []
272
            joins = set()
273
            where = []
274
            group_by = []
275
            order_by = []
276
            join_conditions = []
277

  
278
            for dimension_name, values in filters:
279
                dimension = self.dimensions[dimension_name]
280
                # assert dimension.filter
281
                condition, values = dimension.build_filter(values)
282
                condition = cursor.mogrify(condition, values)
283
                if dimension.filter_needs_join:
284
                    joins.update(dimension.join)
285
                if dimension.filter_in_join:
286
                    join_conditions.append(condition)
287
                else:
288
                    where.append(condition)
289

  
290
            for dimension_name in drilldown:
291
                dimension = self.dimensions[dimension_name]
284 292
                joins.update(dimension.join)
285
            if dimension.filter_in_join:
286
                join_conditions.append(condition)
287
            else:
288
                where.append(condition)
289

  
290
        for dimension_name in drilldown:
291
            dimension = self.dimensions[dimension_name]
292
            joins.update(dimension.join)
293
            projections.append('%s AS %s' % (dimension.value_label or dimension.value,
294
                                             dimension.name))
295
            group_by.append(dimension.group_by or dimension.value)
296
            order_by.append(dimension.order_by or dimension.value)
297

  
298
        for measure_name in measures:
299
            measure = self.get_measure(measure_name)
300
            if measure.expression not in projections:
301
                projections.append(measure.expression + ' AS ' + measure.name)
302
        sql = 'SELECT ' + ', '.join(projections)
303
        table_expression = ' %s' % self.cube.fact_table
304
        if joins:
305
            join_tree = {}
306
            # Build join tree
307
            for join_name in joins:
308
                join = self.get_join(join_name)
309
                master_table = join.master_table or self.fact_table
310
                join_tree.setdefault(master_table, {}).setdefault(join.kind, {})[join.name] = join
311
            table_expression = build_table_expression(join_tree,
312
                                                      self.fact_table,
313
                                                      other_conditions=join_conditions)
314
        sql += ' FROM %s' % table_expression
315
        where_conditions = 'true'
316
        if where:
317
            where_conditions = ' AND '.join(where)
318
            sql += ' WHERE %s' % where_conditions
319
        if group_by:
320
            sql += ' GROUP BY %s' % ', '.join(group_by)
321
        if order_by:
322
            sql += ' ORDER BY %s' % ', '.join(order_by)
323
        sql = sql.format(fact_table=self.cube.fact_table,
324
                         table_expression=table_expression,
325
                         where_conditions=where_conditions)
326
        sql = sql.format(fact_table=self.cube.fact_table,
327
                         table_expression=table_expression,
328
                         where_conditions=where_conditions)
329
        return sql
293
                projections.append('%s AS %s' % (dimension.value_label or dimension.value,
294
                                                 dimension.name))
295
                group_by.append(dimension.group_by or dimension.value)
296
                order_by.append(dimension.order_by or dimension.value)
297

  
298
            for measure_name in measures:
299
                measure = self.get_measure(measure_name)
300
                if measure.expression not in projections:
301
                    projections.append(measure.expression + ' AS ' + measure.name)
302
            sql = 'SELECT ' + ', '.join(projections)
303
            table_expression = ' %s' % self.cube.fact_table
304
            if joins:
305
                join_tree = {}
306
                # Build join tree
307
                for join_name in joins:
308
                    join = self.get_join(join_name)
309
                    master_table = join.master_table or self.fact_table
310
                    join_tree.setdefault(master_table, {}).setdefault(join.kind, {})[join.name] = join
311
                table_expression = build_table_expression(join_tree,
312
                                                          self.fact_table,
313
                                                          other_conditions=join_conditions)
314
            sql += ' FROM %s' % table_expression
315
            where_conditions = 'true'
316
            if where:
317
                where_conditions = ' AND '.join(where)
318
                sql += ' WHERE %s' % where_conditions
319
            if group_by:
320
                sql += ' GROUP BY %s' % ', '.join(group_by)
321
            if order_by:
322
                sql += ' ORDER BY %s' % ', '.join(order_by)
323
            sql = sql.format(fact_table=self.cube.fact_table,
324
                             table_expression=table_expression,
325
                             where_conditions=where_conditions)
326
            sql = sql.format(fact_table=self.cube.fact_table,
327
                             table_expression=table_expression,
328
                             where_conditions=where_conditions)
329
            return sql
330 330

  
331 331
    def query(self, filters, drilldown, measures, **kwargs):
332 332
        self.engine.log.debug('%s.%s query filters=%s drilldown=%s measures=%s',
......
337 337
            cells.append(self.dimensions[dimension_name])
338 338
        for measure_name in measures:
339 339
            cells.append(self.measures[measure_name])
340
        cursor = self.engine.get_cursor()
341
        sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
342
        self.engine.log.debug('SQL: %s', sql)
343
        cursor.execute(sql)
344
        for row in cursor.fetchall():
345
            yield [{
346
                'name': cell.name,
347
                'label': cell.label,
348
                'type': cell.type,
349
                'value': value,
350
            } for cell, value in zip(cells, row)]
340
        with self.engine.get_cursor() as cursor:
341
            sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
342
            self.engine.log.debug('SQL: %s', sql)
343
            cursor.execute(sql)
344
            for row in cursor.fetchall():
345
                yield [{
346
                    'name': cell.name,
347
                    'label': cell.label,
348
                    'type': cell.type,
349
                    'value': value,
350
                } for cell, value in zip(cells, row)]
351 351

  
352 352

  
353 353
class Engine(object):
......
366 366
    def __getattr__(self, name):
367 367
        return getattr(self.warehouse, name)
368 368

  
369
    @contextlib.contextmanager
369 370
    def get_cursor(self):
370
        connection = psycopg2.connect(
371
            self.warehouse.pg_dsn)
372
        cursor = connection.cursor()
373
        search_path = ', '.join(['"%s"' % namespace for namespace in self.warehouse.search_path])
374
        cursor.execute('SET SEARCH_PATH = %s' % search_path)
375
        return cursor
371
        with contextlib.closing(psycopg2.connect(self.warehouse.pg_dsn)) as connection:
372
            with connection.cursor() as cursor:
373
                search_path = ', '.join(['"%s"' % namespace for namespace in self.warehouse.search_path])
374
                cursor.execute('SET SEARCH_PATH = %s' % search_path)
375
                yield cursor
376
-