From 8ce55f33c567f46cf357cb3a7ce322997bb63987 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Sat, 7 Aug 2021 18:43:29 +0200 Subject: [PATCH] feeder: prevent situation of half-dropped schema (#54658) To prevent wcs-olap from losing the currently loaded data when it fails to rename the temporary schema to its final name, wcs-olap will: - first, inside a transaction, rename the current schema instead of dropping it, then rename the new schema to the current schema's name; in case of failure it will retry 33 times, sleeping 1 second between each attempt; - if successful, drop the renamed old schema, again in a retry loop; if it fails to drop it, it logs an error without aborting the current feeding. --- wcs_olap/feeder.py | 68 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 5 deletions(-) diff --git a/wcs_olap/feeder.py b/wcs_olap/feeder.py index bd1dee4..6031d4d 100644 --- a/wcs_olap/feeder.py +++ b/wcs_olap/feeder.py @@ -14,6 +14,7 @@ import reprlib from .utils import Whatever import psycopg2 import psycopg2.errorcodes +import time from cached_property import cached_property from wcs_olap.wcs_api import WcsApiError @@ -330,6 +331,10 @@ class WcsOlapFeeder(object): def default_ctx(self): return self.ctx.as_dict() + def schema_exists(self, name): + self.ex('SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s', vars=[name]) + return bool(self.cur.fetchone()) + def ex(self, query, ctx=None, vars=None): ctx = ctx or {} ctx.update(self.default_ctx) @@ -341,6 +346,27 @@ class WcsOlapFeeder(object): self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e) raise + @contextlib.contextmanager + def atomic(self): + self.ex('BEGIN') + try: + yield + self.ex('COMMIT') + except Exception: + self.ex('ROLLBACK') + raise + + def retry_on_db_error(self, function, times, sleep_time): + for i in range(times): + try: + function() + break + except psycopg2.Error: + if i == times - 1: + raise +
self.logger.warning('failure during retry', exc_info=True) + time.sleep(sleep_time) + def do_schema(self): self.ex('SET search_path = public') self.logger.debug('dropping schema %s', self.schema_temp) @@ -622,11 +648,7 @@ class WcsOlapFeeder(object): else: if self.do_feed: if not self.fake: - self.logger.debug('dropping schema %s', self.schema) - self.drop_tables_sequencially(self.schema) - self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE') - self.logger.debug('renaming schema %s to %s', self.schema + '_temp', self.schema) - self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}') + self.switch_schemas() if 'cubes_model_dirs' in self.config: model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema) @@ -637,6 +659,42 @@ class WcsOlapFeeder(object): self.cur.close() self.connection.close() + def switch_schemas(self): + self.old_schema = truncate_pg_identifier( + self.schema + + '_old_' + + datetime.datetime.now().strftime('%Y%m%d%H%M')) + self.ctx.push({'old_schema': self.old_schema}) + + try: + def switch(): + with self.atomic(): + self.logger.info('renaming schema %s to %s and schema %s to %s', + self.schema, self.old_schema, + self.schema_temp, self.schema) + if self.schema_exists(self.schema): + self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}') + self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}') + self.retry_on_db_error( + switch, + times=33, + sleep_time=1) + except Exception: + self.logger.warning('failed to switch schemas 3-times in 33 seconds') + raise + + try: + def drop_old_schema(): + self.logger.info('dropping schema %s', self.old_schema) + self.drop_tables_sequencially(self.old_schema) + self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE') + self.retry_on_db_error( + drop_old_schema, + times=33, + sleep_time=1) + except Exception: + self.logger.exception('could not drop schema %s', self.old_schema) + def insert_agent(self, name): self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,)) res = 
self.cur.fetchone() -- 2.32.0.rc0