Projet

Général

Profil

0001-feeder-prevent-situation-of-half-dropped-schema-5465.patch

Benjamin Dauvergne, 17 août 2021 16:25

Télécharger (4,97 ko)

Voir les différences:

Subject: [PATCH] feeder: prevent situation of half-dropped schema (#54658)

To prevent loosing currently loaded data wcs-olap, failing ro rename the
temporary schema to its final name, wcs-olap will:

- first, inside a transaction, rename the current schema instead of
  dropping it, then rename the new schema to the current schema's name;
  in case of failure it will retry 33 times sleeping 1 second between
  each attempt;

- if successfull, drop the renamed old schema, again in a retry loop, if
  it fails to drop it logs an error, without aborting the current
  feeding.
 wcs_olap/feeder.py | 68 ++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 63 insertions(+), 5 deletions(-)
wcs_olap/feeder.py
14 14
from .utils import Whatever
15 15
import psycopg2
16 16
import psycopg2.errorcodes
17
import time
17 18

  
18 19
from cached_property import cached_property
19 20
from wcs_olap.wcs_api import WcsApiError
......
330 331
    def default_ctx(self):
331 332
        return self.ctx.as_dict()
332 333

  
334
    def schema_exists(self, name):
335
        self.ex('SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s', vars=[name])
336
        return bool(self.cur.fetchone())
337

  
333 338
    def ex(self, query, ctx=None, vars=None):
334 339
        ctx = ctx or {}
335 340
        ctx.update(self.default_ctx)
......
341 346
            self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e)
342 347
            raise
343 348

  
349
    @contextlib.contextmanager
350
    def atomic(self):
351
        self.ex('BEGIN')
352
        try:
353
            yield
354
            self.ex('COMMIT')
355
        except Exception:
356
            self.ex('ROLLBACK')
357
            raise
358

  
359
    def retry_on_db_error(self, function, times, sleep_time):
360
        for i in range(times):
361
            try:
362
                function()
363
                break
364
            except psycopg2.Error:
365
                if i == times - 1:
366
                    raise
367
                self.logger.warning('failure during retry', exc_info=True)
368
                time.sleep(sleep_time)
369

  
344 370
    def do_schema(self):
345 371
        self.ex('SET search_path = public')
346 372
        self.logger.debug('dropping schema %s', self.schema_temp)
......
622 648
        else:
623 649
            if self.do_feed:
624 650
                if not self.fake:
625
                    self.logger.debug('dropping schema %s', self.schema)
626
                    self.drop_tables_sequencially(self.schema)
627
                    self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
628
                    self.logger.debug('renaming schema %s to %s', self.schema + '_temp', self.schema)
629
                    self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
651
                    self.switch_schemas()
630 652

  
631 653
                    if 'cubes_model_dirs' in self.config:
632 654
                        model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema)
......
637 659
            self.cur.close()
638 660
            self.connection.close()
639 661

  
662
    def switch_schemas(self):
663
        self.old_schema = truncate_pg_identifier(
664
            self.schema
665
            + '_old_'
666
            + datetime.datetime.now().strftime('%Y%m%d%H%M'))
667
        self.ctx.push({'old_schema': self.old_schema})
668

  
669
        try:
670
            def switch():
671
                with self.atomic():
672
                    self.logger.info('renaming schema %s to %s and schema %s to %s',
673
                                     self.schema, self.old_schema,
674
                                     self.schema_temp, self.schema)
675
                    if self.schema_exists(self.schema):
676
                        self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}')
677
                    self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
678
            self.retry_on_db_error(
679
                switch,
680
                times=33,
681
                sleep_time=1)
682
        except Exception:
683
            self.logger.warning('failed to switch schemas 3-times in 33 seconds')
684
            raise
685

  
686
        try:
687
            def drop_old_schema():
688
                self.logger.info('dropping schema %s', self.old_schema)
689
                self.drop_tables_sequencially(self.old_schema)
690
                self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE')
691
            self.retry_on_db_error(
692
                drop_old_schema,
693
                times=33,
694
                sleep_time=1)
695
        except Exception:
696
            self.logger.exception('could not drop schema %s', self.old_schema)
697

  
640 698
    def insert_agent(self, name):
641 699
        self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
642 700
        res = self.cur.fetchone()
643
-