From a44e0be5d5caddfd51ae86c6c37d7ea5582027dd Mon Sep 17 00:00:00 2001
From: Benjamin Dauvergne
Date: Tue, 20 Nov 2018 13:18:34 +0100
Subject: [PATCH 1/3] engine: close connections after usage (#27482)

Engine.get_cursor() becomes a context manager (contextlib.contextmanager)
that yields the cursor and, on exit, closes both the cursor and the
psycopg2 connection (the connection is wrapped in contextlib.closing).
Every caller in bijoe/engine.py now uses
"with self.engine.get_cursor() as cursor:".

Also fix a typo in EngineDimension.members (self.self.order_by ->
self.order_by) in the branch taken when the dimension has no join.

---
Review notes (ignored by git am):

* Line breaks and indentation were reconstructed from a
  whitespace-collapsed copy of this patch.  Hunk line counts were checked
  against the @@ headers and the diffstat, but leading whitespace of
  context/removed lines is a best-effort reconstruction; use
  "git apply --ignore-whitespace" if a strict apply fails.  In
  particular, the nesting level of "sql = sql.format(...)" in members()
  should be confirmed against the real file.
* The "index" line still carries the original post-image hash; it no
  longer matches exactly because of the self.self.order_by fix.
* The author's e-mail address was stripped from the From: header in the
  copy and could not be recovered.  The URL in the license notice was
  restored from the standard AGPL text.
* members() and query() are generators that yield inside the "with"
  block, so the connection is only closed once the generator is
  exhausted or closed/garbage-collected.
* sql_query() opens a database connection only to call cursor.mogrify().
* sql_query() still calls sql.format(...) twice in a row (pre-existing).
* "with connection.cursor() as cursor" requires psycopg2 >= 2.5.

 bijoe/engine.py | 206 ++++++++++++++++++++++++------------------------
 1 file changed, 103 insertions(+), 103 deletions(-)

diff --git a/bijoe/engine.py b/bijoe/engine.py
index be8645d..daa7857 100644
--- a/bijoe/engine.py
+++ b/bijoe/engine.py
@@ -14,6 +14,7 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import contextlib
 import logging
 import itertools
 
@@ -59,25 +60,25 @@ class EngineDimension(object):
     @property
     def members(self):
         assert self.type != 'date'
-        cursor = self.engine.get_cursor()
-        sql = self.members_query
-        if not sql:
-            if self.dimension.join:
-                join = self.engine_cube.get_join(self.dimension.join[-1])
-                sql = ('SELECT %s AS value, %s::text AS label FROM %s AS "%s" '
-                       'GROUP BY %s, %s ORDER BY %s' % (
-                           self.value, self.value_label or self.value, join.table, join.name,
-                           self.value, self.value_label or self.value, self.order_by or self.value))
-            else:
-                sql = ('SELECT %s AS value, %s::text AS label FROM {fact_table} '
-                       'GROUP BY %s, %s ORDER BY %s' % (
-                           self.value, self.value_label or self.value, self.value,
-                           self.value_label or self.value, self.self.order_by or self.value))
-        sql = sql.format(fact_table=self.engine_cube.fact_table)
-        self.engine.log.debug('SQL: %s', sql)
-        cursor.execute(sql)
-        for row in cursor.fetchall():
-            yield Member(*row)
+        with self.engine.get_cursor() as cursor:
+            sql = self.members_query
+            if not sql:
+                if self.dimension.join:
+                    join = self.engine_cube.get_join(self.dimension.join[-1])
+                    sql = ('SELECT %s AS value, %s::text AS label FROM %s AS "%s" '
+                           'GROUP BY %s, %s ORDER BY %s' % (
+                               self.value, self.value_label or self.value, join.table, join.name,
+                               self.value, self.value_label or self.value, self.order_by or self.value))
+                else:
+                    sql = ('SELECT %s AS value, %s::text AS label FROM {fact_table} '
+                           'GROUP BY %s, %s ORDER BY %s' % (
+                               self.value, self.value_label or self.value, self.value,
+                               self.value_label or self.value, self.order_by or self.value))
+            sql = sql.format(fact_table=self.engine_cube.fact_table)
+            self.engine.log.debug('SQL: %s', sql)
+            cursor.execute(sql)
+            for row in cursor.fetchall():
+                yield Member(*row)
 
 
 class SchemaJSONDimension(schemas.Dimension):
@@ -169,10 +170,10 @@ class JSONDimensions(object):
         if not self.engine_cube.json_field:
             return []
         if not self.__cache:
-            cursor = self.engine.get_cursor()
-            sql = 'select distinct jsonb_object_keys(json_data) as a from formdata order by a'
-            cursor.execute(sql)
-            self.__cache = [row[0] for row in cursor.fetchall()]
+            with self.engine.get_cursor() as cursor:
+                sql = 'select distinct jsonb_object_keys(json_data) as a from formdata order by a'
+                cursor.execute(sql)
+                self.__cache = [row[0] for row in cursor.fetchall()]
         return self.__cache
 
     def __iter__(self):
@@ -249,9 +250,9 @@ class EngineCube(object):
         return getattr(self.cube, name)
 
     def count(self):
-        cursor = self.engine.get_cursor()
-        cursor.execute('SELECT count(%s) FROM %s' % (self.key, self.fact_table))
-        return cursor.fetchone()[0]
+        with self.engine.get_cursor() as cursor:
+            cursor.execute('SELECT count(%s) FROM %s' % (self.key, self.fact_table))
+            return cursor.fetchone()[0]
 
     def get_join(self, name):
         if name.startswith('json_'):
@@ -266,67 +267,66 @@ class EngineCube(object):
         return self.cube.get_join(name)
 
     def sql_query(self, filters, drilldown, measures, **kwargs):
-        cursor = self.engine.get_cursor()
-
-        projections = []
-        joins = set()
-        where = []
-        group_by = []
-        order_by = []
-        join_conditions = []
-
-        for dimension_name, values in filters:
-            dimension = self.dimensions[dimension_name]
-            # assert dimension.filter
-            condition, values = dimension.build_filter(values)
-            condition = cursor.mogrify(condition, values)
-            if dimension.filter_needs_join:
+        with self.engine.get_cursor() as cursor:
+            projections = []
+            joins = set()
+            where = []
+            group_by = []
+            order_by = []
+            join_conditions = []
+
+            for dimension_name, values in filters:
+                dimension = self.dimensions[dimension_name]
+                # assert dimension.filter
+                condition, values = dimension.build_filter(values)
+                condition = cursor.mogrify(condition, values)
+                if dimension.filter_needs_join:
+                    joins.update(dimension.join)
+                if dimension.filter_in_join:
+                    join_conditions.append(condition)
+                else:
+                    where.append(condition)
+
+            for dimension_name in drilldown:
+                dimension = self.dimensions[dimension_name]
                 joins.update(dimension.join)
-            if dimension.filter_in_join:
-                join_conditions.append(condition)
-            else:
-                where.append(condition)
-
-        for dimension_name in drilldown:
-            dimension = self.dimensions[dimension_name]
-            joins.update(dimension.join)
-            projections.append('%s AS %s' % (dimension.value_label or dimension.value,
-                                             dimension.name))
-            group_by.append(dimension.group_by or dimension.value)
-            order_by.append(dimension.order_by or dimension.value)
-
-        for measure_name in measures:
-            measure = self.get_measure(measure_name)
-            if measure.expression not in projections:
-                projections.append(measure.expression + ' AS ' + measure.name)
-        sql = 'SELECT ' + ', '.join(projections)
-        table_expression = ' %s' % self.cube.fact_table
-        if joins:
-            join_tree = {}
-            # Build join tree
-            for join_name in joins:
-                join = self.get_join(join_name)
-                master_table = join.master_table or self.fact_table
-                join_tree.setdefault(master_table, {}).setdefault(join.kind, {})[join.name] = join
-            table_expression = build_table_expression(join_tree,
-                                                      self.fact_table,
-                                                      other_conditions=join_conditions)
-        sql += ' FROM %s' % table_expression
-        where_conditions = 'true'
-        if where:
-            where_conditions = ' AND '.join(where)
-            sql += ' WHERE %s' % where_conditions
-        if group_by:
-            sql += ' GROUP BY %s' % ', '.join(group_by)
-        if order_by:
-            sql += ' ORDER BY %s' % ', '.join(order_by)
-        sql = sql.format(fact_table=self.cube.fact_table,
-                         table_expression=table_expression,
-                         where_conditions=where_conditions)
-        sql = sql.format(fact_table=self.cube.fact_table,
-                         table_expression=table_expression,
-                         where_conditions=where_conditions)
-        return sql
+                projections.append('%s AS %s' % (dimension.value_label or dimension.value,
+                                                 dimension.name))
+                group_by.append(dimension.group_by or dimension.value)
+                order_by.append(dimension.order_by or dimension.value)
+
+            for measure_name in measures:
+                measure = self.get_measure(measure_name)
+                if measure.expression not in projections:
+                    projections.append(measure.expression + ' AS ' + measure.name)
+            sql = 'SELECT ' + ', '.join(projections)
+            table_expression = ' %s' % self.cube.fact_table
+            if joins:
+                join_tree = {}
+                # Build join tree
+                for join_name in joins:
+                    join = self.get_join(join_name)
+                    master_table = join.master_table or self.fact_table
+                    join_tree.setdefault(master_table, {}).setdefault(join.kind, {})[join.name] = join
+                table_expression = build_table_expression(join_tree,
+                                                          self.fact_table,
+                                                          other_conditions=join_conditions)
+            sql += ' FROM %s' % table_expression
+            where_conditions = 'true'
+            if where:
+                where_conditions = ' AND '.join(where)
+                sql += ' WHERE %s' % where_conditions
+            if group_by:
+                sql += ' GROUP BY %s' % ', '.join(group_by)
+            if order_by:
+                sql += ' ORDER BY %s' % ', '.join(order_by)
+            sql = sql.format(fact_table=self.cube.fact_table,
+                             table_expression=table_expression,
+                             where_conditions=where_conditions)
+            sql = sql.format(fact_table=self.cube.fact_table,
+                             table_expression=table_expression,
+                             where_conditions=where_conditions)
+            return sql
 
     def query(self, filters, drilldown, measures, **kwargs):
         self.engine.log.debug('%s.%s query filters=%s drilldown=%s measures=%s',
@@ -337,17 +337,17 @@ class EngineCube(object):
             cells.append(self.dimensions[dimension_name])
         for measure_name in measures:
             cells.append(self.measures[measure_name])
-        cursor = self.engine.get_cursor()
-        sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
-        self.engine.log.debug('SQL: %s', sql)
-        cursor.execute(sql)
-        for row in cursor.fetchall():
-            yield [{
-                'name': cell.name,
-                'label': cell.label,
-                'type': cell.type,
-                'value': value,
-            } for cell, value in zip(cells, row)]
+        with self.engine.get_cursor() as cursor:
+            sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
+            self.engine.log.debug('SQL: %s', sql)
+            cursor.execute(sql)
+            for row in cursor.fetchall():
+                yield [{
+                    'name': cell.name,
+                    'label': cell.label,
+                    'type': cell.type,
+                    'value': value,
+                } for cell, value in zip(cells, row)]
 
 
 class Engine(object):
@@ -366,10 +366,10 @@ class Engine(object):
     def __getattr__(self, name):
         return getattr(self.warehouse, name)
 
+    @contextlib.contextmanager
     def get_cursor(self):
-        connection = psycopg2.connect(
-            self.warehouse.pg_dsn)
-        cursor = connection.cursor()
-        search_path = ', '.join(['"%s"' % namespace for namespace in self.warehouse.search_path])
-        cursor.execute('SET SEARCH_PATH = %s' % search_path)
-        return cursor
+        with contextlib.closing(psycopg2.connect(self.warehouse.pg_dsn)) as connection:
+            with connection.cursor() as cursor:
+                search_path = ', '.join(['"%s"' % namespace for namespace in self.warehouse.search_path])
+                cursor.execute('SET SEARCH_PATH = %s' % search_path)
+                yield cursor
-- 
2.18.0