14 |
14 |
from .utils import Whatever
|
15 |
15 |
import psycopg2
|
16 |
16 |
import psycopg2.errorcodes
|
|
17 |
import time
|
17 |
18 |
|
18 |
19 |
from cached_property import cached_property
|
19 |
20 |
from wcs_olap.wcs_api import WcsApiError
|
... | ... | |
329 |
330 |
def default_ctx(self):
|
330 |
331 |
return self.ctx.as_dict()
|
331 |
332 |
|
|
333 |
def schema_exists(self, name):
|
|
334 |
self.ex('SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s', vars=[name])
|
|
335 |
return bool(self.cur.fetchone())
|
|
336 |
|
332 |
337 |
def ex(self, query, ctx=None, vars=None):
|
333 |
338 |
ctx = ctx or {}
|
334 |
339 |
ctx.update(self.default_ctx)
|
... | ... | |
337 |
342 |
try:
|
338 |
343 |
self.cur.execute(sql, vars=vars)
|
339 |
344 |
except Exception as e:
|
340 |
|
self.logger.error('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e)
|
|
345 |
self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e)
|
341 |
346 |
raise
|
342 |
347 |
|
|
348 |
def retry_on_db_error(self, function, times, sleep_time, transaction=False):
|
|
349 |
for i in range(times):
|
|
350 |
try:
|
|
351 |
if transaction:
|
|
352 |
self.ex('BEGIN')
|
|
353 |
function()
|
|
354 |
if transaction:
|
|
355 |
self.ex('COMMIT')
|
|
356 |
break
|
|
357 |
except psycopg2.Error:
|
|
358 |
if transaction:
|
|
359 |
self.ex('ROLLBACK')
|
|
360 |
if i == times - 1:
|
|
361 |
raise
|
|
362 |
self.logger.warning('failure during retry', exc_info=True)
|
|
363 |
time.sleep(sleep_time)
|
|
364 |
|
343 |
365 |
def do_schema(self):
|
344 |
366 |
self.ex('SET search_path = public')
|
345 |
367 |
self.logger.debug('dropping schema %s', self.schema_temp)
|
... | ... | |
620 |
642 |
else:
|
621 |
643 |
if self.do_feed:
|
622 |
644 |
if not self.fake:
|
623 |
|
self.logger.debug('dropping schema %s', self.schema)
|
624 |
|
self.drop_tables_sequencially(self.schema)
|
625 |
|
self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
|
626 |
|
self.logger.debug('renaming schema %s to %s', self.schema + '_temp', self.schema)
|
627 |
|
self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
|
|
645 |
self.switch_schemas()
|
628 |
646 |
|
629 |
647 |
if 'cubes_model_dirs' in self.config:
|
630 |
648 |
model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema)
|
... | ... | |
635 |
653 |
self.cur.close()
|
636 |
654 |
self.connection.close()
|
637 |
655 |
|
|
656 |
def switch_schemas(self):
|
|
657 |
self.old_schema = truncate_pg_identifier(
|
|
658 |
self.schema
|
|
659 |
+ '_old_'
|
|
660 |
+ datetime.datetime.now().strftime('%Y%m%d%H%M'))
|
|
661 |
self.ctx.push({'old_schema': self.old_schema})
|
|
662 |
|
|
663 |
try:
|
|
664 |
def switch():
|
|
665 |
self.logger.info('renaming schema %s to %s and schema %s to %s',
|
|
666 |
self.schema, self.old_schema,
|
|
667 |
self.schema_temp, self.schema)
|
|
668 |
if self.schema_exists(self.schema):
|
|
669 |
self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}')
|
|
670 |
self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
|
|
671 |
self.retry_on_db_error(
|
|
672 |
switch,
|
|
673 |
times=3,
|
|
674 |
sleep_time=16,
|
|
675 |
transaction=True)
|
|
676 |
except Exception:
|
|
677 |
self.logger.warning('failed to switch schemas 3-times in 33 seconds')
|
|
678 |
raise
|
|
679 |
|
|
680 |
try:
|
|
681 |
def drop_old_schema():
|
|
682 |
self.logger.info('dropping schema %s', self.old_schema)
|
|
683 |
self.drop_tables_sequencially(self.old_schema)
|
|
684 |
self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE')
|
|
685 |
self.retry_on_db_error(
|
|
686 |
drop_old_schema,
|
|
687 |
times=4,
|
|
688 |
sleep_time=16)
|
|
689 |
except Exception:
|
|
690 |
self.logger.exception('could not drop schema %s', self.old_schema)
|
|
691 |
|
638 |
692 |
def insert_agent(self, name):
|
639 |
693 |
self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
|
640 |
694 |
res = self.cur.fetchone()
|
641 |
|
-
|