14 |
14 |
# You should have received a copy of the GNU Affero General Public License
|
15 |
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
16 |
16 |
|
|
17 |
import contextlib
|
17 |
18 |
import logging
|
18 |
19 |
import itertools
|
19 |
20 |
|
... | ... | |
59 |
60 |
@property
|
60 |
61 |
def members(self):
|
61 |
62 |
assert self.type != 'date'
|
62 |
|
cursor = self.engine.get_cursor()
|
63 |
|
sql = self.members_query
|
64 |
|
if not sql:
|
65 |
|
if self.dimension.join:
|
66 |
|
join = self.engine_cube.get_join(self.dimension.join[-1])
|
67 |
|
sql = ('SELECT %s AS value, %s::text AS label FROM %s AS "%s" '
|
68 |
|
'GROUP BY %s, %s ORDER BY %s' % (
|
69 |
|
self.value, self.value_label or self.value, join.table, join.name,
|
70 |
|
self.value, self.value_label or self.value, self.order_by or self.value))
|
71 |
|
else:
|
72 |
|
sql = ('SELECT %s AS value, %s::text AS label FROM {fact_table} '
|
73 |
|
'GROUP BY %s, %s ORDER BY %s' % (
|
74 |
|
self.value, self.value_label or self.value, self.value,
|
75 |
|
self.value_label or self.value, self.self.order_by or self.value))
|
76 |
|
sql = sql.format(fact_table=self.engine_cube.fact_table)
|
77 |
|
self.engine.log.debug('SQL: %s', sql)
|
78 |
|
cursor.execute(sql)
|
79 |
|
for row in cursor.fetchall():
|
80 |
|
yield Member(*row)
|
|
63 |
with self.engine.get_cursor() as cursor:
|
|
64 |
sql = self.members_query
|
|
65 |
if not sql:
|
|
66 |
if self.dimension.join:
|
|
67 |
join = self.engine_cube.get_join(self.dimension.join[-1])
|
|
68 |
sql = ('SELECT %s AS value, %s::text AS label FROM %s AS "%s" '
|
|
69 |
'GROUP BY %s, %s ORDER BY %s' % (
|
|
70 |
self.value, self.value_label or self.value, join.table, join.name,
|
|
71 |
self.value, self.value_label or self.value, self.order_by or self.value))
|
|
72 |
else:
|
|
73 |
sql = ('SELECT %s AS value, %s::text AS label FROM {fact_table} '
|
|
74 |
'GROUP BY %s, %s ORDER BY %s' % (
|
|
75 |
self.value, self.value_label or self.value, self.value,
|
|
76 |
self.value_label or self.value, self.self.order_by or self.value))
|
|
77 |
sql = sql.format(fact_table=self.engine_cube.fact_table)
|
|
78 |
self.engine.log.debug('SQL: %s', sql)
|
|
79 |
cursor.execute(sql)
|
|
80 |
for row in cursor.fetchall():
|
|
81 |
yield Member(*row)
|
81 |
82 |
|
82 |
83 |
|
83 |
84 |
class SchemaJSONDimension(schemas.Dimension):
|
... | ... | |
169 |
170 |
if not self.engine_cube.json_field:
|
170 |
171 |
return []
|
171 |
172 |
if not self.__cache:
|
172 |
|
cursor = self.engine.get_cursor()
|
173 |
|
sql = 'select distinct jsonb_object_keys(json_data) as a from formdata order by a'
|
174 |
|
cursor.execute(sql)
|
175 |
|
self.__cache = [row[0] for row in cursor.fetchall()]
|
|
173 |
with self.engine.get_cursor() as cursor:
|
|
174 |
sql = 'select distinct jsonb_object_keys(json_data) as a from formdata order by a'
|
|
175 |
cursor.execute(sql)
|
|
176 |
self.__cache = [row[0] for row in cursor.fetchall()]
|
176 |
177 |
return self.__cache
|
177 |
178 |
|
178 |
179 |
def __iter__(self):
|
... | ... | |
249 |
250 |
return getattr(self.cube, name)
|
250 |
251 |
|
251 |
252 |
def count(self):
|
252 |
|
cursor = self.engine.get_cursor()
|
253 |
|
cursor.execute('SELECT count(%s) FROM %s' % (self.key, self.fact_table))
|
254 |
|
return cursor.fetchone()[0]
|
|
253 |
with self.engine.get_cursor() as cursor:
|
|
254 |
cursor.execute('SELECT count(%s) FROM %s' % (self.key, self.fact_table))
|
|
255 |
return cursor.fetchone()[0]
|
255 |
256 |
|
256 |
257 |
def get_join(self, name):
|
257 |
258 |
if name.startswith('json_'):
|
... | ... | |
266 |
267 |
return self.cube.get_join(name)
|
267 |
268 |
|
268 |
269 |
def sql_query(self, filters, drilldown, measures, **kwargs):
|
269 |
|
cursor = self.engine.get_cursor()
|
270 |
|
|
271 |
|
projections = []
|
272 |
|
joins = set()
|
273 |
|
where = []
|
274 |
|
group_by = []
|
275 |
|
order_by = []
|
276 |
|
join_conditions = []
|
277 |
|
|
278 |
|
for dimension_name, values in filters:
|
279 |
|
dimension = self.dimensions[dimension_name]
|
280 |
|
# assert dimension.filter
|
281 |
|
condition, values = dimension.build_filter(values)
|
282 |
|
condition = cursor.mogrify(condition, values)
|
283 |
|
if dimension.filter_needs_join:
|
|
270 |
with self.engine.get_cursor() as cursor:
|
|
271 |
projections = []
|
|
272 |
joins = set()
|
|
273 |
where = []
|
|
274 |
group_by = []
|
|
275 |
order_by = []
|
|
276 |
join_conditions = []
|
|
277 |
|
|
278 |
for dimension_name, values in filters:
|
|
279 |
dimension = self.dimensions[dimension_name]
|
|
280 |
# assert dimension.filter
|
|
281 |
condition, values = dimension.build_filter(values)
|
|
282 |
condition = cursor.mogrify(condition, values)
|
|
283 |
if dimension.filter_needs_join:
|
|
284 |
joins.update(dimension.join)
|
|
285 |
if dimension.filter_in_join:
|
|
286 |
join_conditions.append(condition)
|
|
287 |
else:
|
|
288 |
where.append(condition)
|
|
289 |
|
|
290 |
for dimension_name in drilldown:
|
|
291 |
dimension = self.dimensions[dimension_name]
|
284 |
292 |
joins.update(dimension.join)
|
285 |
|
if dimension.filter_in_join:
|
286 |
|
join_conditions.append(condition)
|
287 |
|
else:
|
288 |
|
where.append(condition)
|
289 |
|
|
290 |
|
for dimension_name in drilldown:
|
291 |
|
dimension = self.dimensions[dimension_name]
|
292 |
|
joins.update(dimension.join)
|
293 |
|
projections.append('%s AS %s' % (dimension.value_label or dimension.value,
|
294 |
|
dimension.name))
|
295 |
|
group_by.append(dimension.group_by or dimension.value)
|
296 |
|
order_by.append(dimension.order_by or dimension.value)
|
297 |
|
|
298 |
|
for measure_name in measures:
|
299 |
|
measure = self.get_measure(measure_name)
|
300 |
|
if measure.expression not in projections:
|
301 |
|
projections.append(measure.expression + ' AS ' + measure.name)
|
302 |
|
sql = 'SELECT ' + ', '.join(projections)
|
303 |
|
table_expression = ' %s' % self.cube.fact_table
|
304 |
|
if joins:
|
305 |
|
join_tree = {}
|
306 |
|
# Build join tree
|
307 |
|
for join_name in joins:
|
308 |
|
join = self.get_join(join_name)
|
309 |
|
master_table = join.master_table or self.fact_table
|
310 |
|
join_tree.setdefault(master_table, {}).setdefault(join.kind, {})[join.name] = join
|
311 |
|
table_expression = build_table_expression(join_tree,
|
312 |
|
self.fact_table,
|
313 |
|
other_conditions=join_conditions)
|
314 |
|
sql += ' FROM %s' % table_expression
|
315 |
|
where_conditions = 'true'
|
316 |
|
if where:
|
317 |
|
where_conditions = ' AND '.join(where)
|
318 |
|
sql += ' WHERE %s' % where_conditions
|
319 |
|
if group_by:
|
320 |
|
sql += ' GROUP BY %s' % ', '.join(group_by)
|
321 |
|
if order_by:
|
322 |
|
sql += ' ORDER BY %s' % ', '.join(order_by)
|
323 |
|
sql = sql.format(fact_table=self.cube.fact_table,
|
324 |
|
table_expression=table_expression,
|
325 |
|
where_conditions=where_conditions)
|
326 |
|
sql = sql.format(fact_table=self.cube.fact_table,
|
327 |
|
table_expression=table_expression,
|
328 |
|
where_conditions=where_conditions)
|
329 |
|
return sql
|
|
293 |
projections.append('%s AS %s' % (dimension.value_label or dimension.value,
|
|
294 |
dimension.name))
|
|
295 |
group_by.append(dimension.group_by or dimension.value)
|
|
296 |
order_by.append(dimension.order_by or dimension.value)
|
|
297 |
|
|
298 |
for measure_name in measures:
|
|
299 |
measure = self.get_measure(measure_name)
|
|
300 |
if measure.expression not in projections:
|
|
301 |
projections.append(measure.expression + ' AS ' + measure.name)
|
|
302 |
sql = 'SELECT ' + ', '.join(projections)
|
|
303 |
table_expression = ' %s' % self.cube.fact_table
|
|
304 |
if joins:
|
|
305 |
join_tree = {}
|
|
306 |
# Build join tree
|
|
307 |
for join_name in joins:
|
|
308 |
join = self.get_join(join_name)
|
|
309 |
master_table = join.master_table or self.fact_table
|
|
310 |
join_tree.setdefault(master_table, {}).setdefault(join.kind, {})[join.name] = join
|
|
311 |
table_expression = build_table_expression(join_tree,
|
|
312 |
self.fact_table,
|
|
313 |
other_conditions=join_conditions)
|
|
314 |
sql += ' FROM %s' % table_expression
|
|
315 |
where_conditions = 'true'
|
|
316 |
if where:
|
|
317 |
where_conditions = ' AND '.join(where)
|
|
318 |
sql += ' WHERE %s' % where_conditions
|
|
319 |
if group_by:
|
|
320 |
sql += ' GROUP BY %s' % ', '.join(group_by)
|
|
321 |
if order_by:
|
|
322 |
sql += ' ORDER BY %s' % ', '.join(order_by)
|
|
323 |
sql = sql.format(fact_table=self.cube.fact_table,
|
|
324 |
table_expression=table_expression,
|
|
325 |
where_conditions=where_conditions)
|
|
326 |
sql = sql.format(fact_table=self.cube.fact_table,
|
|
327 |
table_expression=table_expression,
|
|
328 |
where_conditions=where_conditions)
|
|
329 |
return sql
|
330 |
330 |
|
331 |
331 |
def query(self, filters, drilldown, measures, **kwargs):
|
332 |
332 |
self.engine.log.debug('%s.%s query filters=%s drilldown=%s measures=%s',
|
... | ... | |
337 |
337 |
cells.append(self.dimensions[dimension_name])
|
338 |
338 |
for measure_name in measures:
|
339 |
339 |
cells.append(self.measures[measure_name])
|
340 |
|
cursor = self.engine.get_cursor()
|
341 |
|
sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
|
342 |
|
self.engine.log.debug('SQL: %s', sql)
|
343 |
|
cursor.execute(sql)
|
344 |
|
for row in cursor.fetchall():
|
345 |
|
yield [{
|
346 |
|
'name': cell.name,
|
347 |
|
'label': cell.label,
|
348 |
|
'type': cell.type,
|
349 |
|
'value': value,
|
350 |
|
} for cell, value in zip(cells, row)]
|
|
340 |
with self.engine.get_cursor() as cursor:
|
|
341 |
sql = self.sql_query(filters=filters, drilldown=drilldown, measures=measures, **kwargs)
|
|
342 |
self.engine.log.debug('SQL: %s', sql)
|
|
343 |
cursor.execute(sql)
|
|
344 |
for row in cursor.fetchall():
|
|
345 |
yield [{
|
|
346 |
'name': cell.name,
|
|
347 |
'label': cell.label,
|
|
348 |
'type': cell.type,
|
|
349 |
'value': value,
|
|
350 |
} for cell, value in zip(cells, row)]
|
351 |
351 |
|
352 |
352 |
|
353 |
353 |
class Engine(object):
|
... | ... | |
366 |
366 |
def __getattr__(self, name):
|
367 |
367 |
return getattr(self.warehouse, name)
|
368 |
368 |
|
|
369 |
@contextlib.contextmanager
|
369 |
370 |
def get_cursor(self):
|
370 |
|
connection = psycopg2.connect(
|
371 |
|
self.warehouse.pg_dsn)
|
372 |
|
cursor = connection.cursor()
|
373 |
|
search_path = ', '.join(['"%s"' % namespace for namespace in self.warehouse.search_path])
|
374 |
|
cursor.execute('SET SEARCH_PATH = %s' % search_path)
|
375 |
|
return cursor
|
|
371 |
with contextlib.closing(psycopg2.connect(self.warehouse.pg_dsn)) as connection:
|
|
372 |
with connection.cursor() as cursor:
|
|
373 |
search_path = ', '.join(['"%s"' % namespace for namespace in self.warehouse.search_path])
|
|
374 |
cursor.execute('SET SEARCH_PATH = %s' % search_path)
|
|
375 |
yield cursor
|
376 |
|
-
|