From 01a0e57815f947a060d2cdb149326f630339caf4 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Sat, 7 Aug 2021 18:43:29 +0200 Subject: [PATCH] feeder: prevent situation of half-dropped schema (#54658) To prevent losing the currently loaded data when wcs-olap fails to rename the temporary schema to its final name, wcs-olap will: - first, inside a transaction, rename the current schema instead of dropping it, then rename the new schema to the current schema's name; in case of failure it will retry 33 times, sleeping 1 second between each attempt; - if successful, drop the renamed old schema, again in a retry loop; if it fails to drop it, it logs an error without aborting the current feeding. --- wcs_olap/feeder.py | 66 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 6 deletions(-) diff --git a/wcs_olap/feeder.py b/wcs_olap/feeder.py index 9eb0a08..eabb679 100644 --- a/wcs_olap/feeder.py +++ b/wcs_olap/feeder.py @@ -14,6 +14,7 @@ import reprlib from .utils import Whatever import psycopg2 import psycopg2.errorcodes +import time from cached_property import cached_property from wcs_olap.wcs_api import WcsApiError @@ -329,6 +330,10 @@ class WcsOlapFeeder(object): def default_ctx(self): return self.ctx.as_dict() + def schema_exists(self, name): + self.ex('SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s', vars=[name]) + return bool(self.cur.fetchone()) + def ex(self, query, ctx=None, vars=None): ctx = ctx or {} ctx.update(self.default_ctx) @@ -337,9 +342,26 @@ class WcsOlapFeeder(object): try: self.cur.execute(sql, vars=vars) except Exception as e: - self.logger.error('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e) + self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e) raise + def retry_on_db_error(self, function, times, sleep_time, transaction=False): + for i in range(times): + try: + if transaction: + self.ex('BEGIN') + function() + 
if transaction: + self.ex('COMMIT') + break + except psycopg2.Error: + if transaction: + self.ex('ROLLBACK') + if i == times - 1: + raise + self.logger.warning('failure during retry', exc_info=True) + time.sleep(sleep_time) + def do_schema(self): self.ex('SET search_path = public') self.logger.debug('dropping schema %s', self.schema_temp) @@ -620,11 +642,7 @@ class WcsOlapFeeder(object): else: if self.do_feed: if not self.fake: - self.logger.debug('dropping schema %s', self.schema) - self.drop_tables_sequencially(self.schema) - self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE') - self.logger.debug('renaming schema %s to %s', self.schema + '_temp', self.schema) - self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}') + self.switch_schemas() if 'cubes_model_dirs' in self.config: model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema) @@ -635,6 +653,42 @@ class WcsOlapFeeder(object): self.cur.close() self.connection.close() + def switch_schemas(self): + self.old_schema = truncate_pg_identifier( + self.schema + + '_old_' + + datetime.datetime.now().strftime('%Y%m%d%H%M')) + self.ctx.push({'old_schema': self.old_schema}) + + try: + def switch(): + self.logger.info('renaming schema %s to %s and schema %s to %s', + self.schema, self.old_schema, + self.schema_temp, self.schema) + if self.schema_exists(self.schema): + self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}') + self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}') + self.retry_on_db_error( + switch, + times=33, + sleep_time=1, + transaction=True) + except Exception: + self.logger.warning('failed to switch schemas 3-times in 33 seconds') + raise + + try: + def drop_old_schema(): + self.logger.info('dropping schema %s', self.old_schema) + self.drop_tables_sequencially(self.old_schema) + self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE') + self.retry_on_db_error( + drop_old_schema, + times=33, + sleep_time=1) + except Exception: + self.logger.exception('could not drop schema 
%s', self.old_schema) + def insert_agent(self, name): self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,)) res = self.cur.fetchone() -- 2.32.0.rc0