14 |
14 |
from .utils import Whatever
|
15 |
15 |
import psycopg2
|
16 |
16 |
import psycopg2.errorcodes
|
|
17 |
import time
|
17 |
18 |
|
18 |
19 |
from cached_property import cached_property
|
19 |
20 |
from wcs_olap.wcs_api import WcsApiError
|
... | ... | |
330 |
331 |
def default_ctx(self):
    """Return the result of ``self.ctx.as_dict()``.

    NOTE(review): ``ex`` reads ``self.default_ctx`` without calling it, so
    this is presumably wrapped by a property-style decorator that sits
    above the visible part of the file -- confirm.
    """
    return self.ctx.as_dict()
|
332 |
333 |
|
|
334 |
def schema_exists(self, name):
    """Tell whether a schema called *name* is listed in information_schema.

    The lookup is issued through ``self.ex`` with *name* passed as a bound
    query variable; the answer is the truthiness of the first row fetched
    from ``self.cur``.
    """
    query = 'SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s'
    self.ex(query, vars=[name])
    first_row = self.cur.fetchone()
    return bool(first_row)
|
|
337 |
|
333 |
338 |
def ex(self, query, ctx=None, vars=None):
|
334 |
339 |
ctx = ctx or {}
|
335 |
340 |
ctx.update(self.default_ctx)
|
... | ... | |
341 |
346 |
self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e)
|
342 |
347 |
raise
|
343 |
348 |
|
|
349 |
@contextlib.contextmanager
def atomic(self):
    """Context manager wrapping the enclosed statements in BEGIN / COMMIT.

    ``BEGIN`` is sent on entry and ``COMMIT`` once the body finishes. If the
    body -- or the ``COMMIT`` itself -- raises an ``Exception``, ``ROLLBACK``
    is sent and the exception is re-raised to the caller.
    """
    run = self.ex
    run('BEGIN')
    try:
        yield
        # COMMIT is deliberately inside the try: a failing COMMIT is
        # followed by a ROLLBACK just like a failing body.
        run('COMMIT')
    except Exception:
        run('ROLLBACK')
        raise
|
|
358 |
|
|
359 |
def retry_on_db_error(self, function, times, sleep_time):
    """Call *function* until it succeeds, retrying on ``psycopg2.Error``.

    At most *times* attempts are made. After each failed attempt except the
    last one, a warning (with traceback) is logged and execution pauses for
    *sleep_time* seconds. The error raised by the final attempt propagates
    to the caller. The return value of *function* is not used.
    """
    final_attempt = times - 1
    for attempt in range(times):
        try:
            function()
        except psycopg2.Error:
            if attempt == final_attempt:
                # Out of attempts: let the caller see the database error.
                raise
            self.logger.warning('failure during retry', exc_info=True)
            time.sleep(sleep_time)
        else:
            # Success, no further attempt needed.
            break
|
|
369 |
|
344 |
370 |
def do_schema(self):
|
345 |
371 |
self.ex('SET search_path = public')
|
346 |
372 |
self.logger.debug('dropping schema %s', self.schema_temp)
|
... | ... | |
622 |
648 |
else:
|
623 |
649 |
if self.do_feed:
|
624 |
650 |
if not self.fake:
|
625 |
|
self.logger.debug('dropping schema %s', self.schema)
|
626 |
|
self.drop_tables_sequencially(self.schema)
|
627 |
|
self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
|
628 |
|
self.logger.debug('renaming schema %s to %s', self.schema + '_temp', self.schema)
|
629 |
|
self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
|
|
651 |
self.switch_schemas()
|
630 |
652 |
|
631 |
653 |
if 'cubes_model_dirs' in self.config:
|
632 |
654 |
model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema)
|
... | ... | |
637 |
659 |
self.cur.close()
|
638 |
660 |
self.connection.close()
|
639 |
661 |
|
|
662 |
def switch_schemas(self):
    """Promote the temporary schema to the live name, then drop the old one.

    1. A timestamped name for the outgoing schema is computed, stored in
       ``self.old_schema`` and pushed onto ``self.ctx`` under the key
       ``old_schema`` (presumably so ``ex`` can resolve the
       ``{old_schema}`` placeholder used below -- confirm).
    2. Inside one ``self.atomic()`` transaction, the live schema (if it
       exists) is renamed to the old name and the temporary schema is
       renamed to the live name. The transaction is run through
       ``self.retry_on_db_error``.
    3. The old schema is dropped, again through ``self.retry_on_db_error``.

    Raises:
        Whatever step 2 raises once its retries are exhausted (a warning
        is logged first). Failures in step 3 are logged and swallowed,
        since the switch itself has already succeeded at that point.
    """
    # Single source of truth for the retry policy, so the log message
    # below cannot drift from what is actually attempted.
    attempts = 33
    pause = 1  # seconds between two attempts

    timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M')
    self.old_schema = truncate_pg_identifier(self.schema + '_old_' + timestamp)
    self.ctx.push({'old_schema': self.old_schema})

    def switch():
        with self.atomic():
            self.logger.info('renaming schema %s to %s and schema %s to %s',
                             self.schema, self.old_schema,
                             self.schema_temp, self.schema)
            # On a first run there may be no live schema to move aside.
            if self.schema_exists(self.schema):
                self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}')
            self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')

    try:
        self.retry_on_db_error(switch, times=attempts, sleep_time=pause)
    except Exception:
        self.logger.warning(
            'failed to switch schemas after %s attempts (%s second(s) between attempts)',
            attempts, pause)
        raise

    def drop_old_schema():
        self.logger.info('dropping schema %s', self.old_schema)
        self.drop_tables_sequencially(self.old_schema)
        self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE')

    try:
        self.retry_on_db_error(drop_old_schema, times=attempts, sleep_time=pause)
    except Exception:
        # Best effort: the new schema is already live, so only report.
        self.logger.exception('could not drop schema %s', self.old_schema)
|
|
697 |
|
640 |
698 |
def insert_agent(self, name):
|
641 |
699 |
self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
|
642 |
700 |
res = self.cur.fetchone()
|
643 |
|
-
|