Projet

Général

Profil

0001-feeder-prevent-situation-of-half-dropped-schema-5465.patch

Benjamin Dauvergne, 07 août 2021 19:05

Télécharger (5,16 ko)

Voir les différences:

Subject: [PATCH] feeder: prevent situation of half-dropped schema (#54658)

To prevent losing currently loaded data when wcs-olap fails to rename the
temporary schema to its final name, wcs-olap will:

- first, inside a transaction, rename the current schema instead of
  dropping it, then rename the new schema to the current schema's name;
  in case of failure it will try again, up to three attempts in total,
  sleeping 16 seconds between attempts;

- if successful, drop the renamed old schema, again in a retry loop; if
  the drop fails, it logs an error without aborting the current
  feeding.
 wcs_olap/feeder.py | 66 +++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 60 insertions(+), 6 deletions(-)
wcs_olap/feeder.py
14 14
from .utils import Whatever
15 15
import psycopg2
16 16
import psycopg2.errorcodes
17
import time
17 18

  
18 19
from cached_property import cached_property
19 20
from wcs_olap.wcs_api import WcsApiError
......
329 330
    def default_ctx(self):
330 331
        return self.ctx.as_dict()
331 332

  
333
    def schema_exists(self, name):
334
        self.ex('SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s', vars=[name])
335
        return bool(self.cur.fetchone())
336

  
332 337
    def ex(self, query, ctx=None, vars=None):
333 338
        ctx = ctx or {}
334 339
        ctx.update(self.default_ctx)
......
337 342
        try:
338 343
            self.cur.execute(sql, vars=vars)
339 344
        except Exception as e:
340
            self.logger.error('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e)
345
            self.logger.warning('Failed to execute %r with vars %s, raised %s', sql, reprlib.repr(vars or []), e)
341 346
            raise
342 347

  
348
    def retry_on_db_error(self, function, times, sleep_time, transaction=False):
349
        for i in range(times):
350
            try:
351
                if transaction:
352
                    self.ex('BEGIN')
353
                function()
354
                if transaction:
355
                    self.ex('COMMIT')
356
                break
357
            except psycopg2.Error:
358
                if transaction:
359
                    self.ex('ROLLBACK')
360
                if i == times - 1:
361
                    raise
362
                self.logger.warning('failure during retry', exc_info=True)
363
                time.sleep(sleep_time)
364

  
343 365
    def do_schema(self):
344 366
        self.ex('SET search_path = public')
345 367
        self.logger.debug('dropping schema %s', self.schema_temp)
......
620 642
        else:
621 643
            if self.do_feed:
622 644
                if not self.fake:
623
                    self.logger.debug('dropping schema %s', self.schema)
624
                    self.drop_tables_sequencially(self.schema)
625
                    self.ex('DROP SCHEMA IF EXISTS {schema} CASCADE')
626
                    self.logger.debug('renaming schema %s to %s', self.schema + '_temp', self.schema)
627
                    self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
645
                    self.switch_schemas()
628 646

  
629 647
                    if 'cubes_model_dirs' in self.config:
630 648
                        model_path = os.path.join(self.config['cubes_model_dirs'], '%s.model' % self.schema)
......
635 653
            self.cur.close()
636 654
            self.connection.close()
637 655

  
656
    def switch_schemas(self):
657
        self.old_schema = truncate_pg_identifier(
658
            self.schema
659
            + '_old_'
660
            + datetime.datetime.now().strftime('%Y%m%d%H%M'))
661
        self.ctx.push({'old_schema': self.old_schema})
662

  
663
        try:
664
            def switch():
665
                self.logger.info('renaming schema %s to %s and schema %s to %s',
666
                                 self.schema, self.old_schema,
667
                                 self.schema_temp, self.schema)
668
                if self.schema_exists(self.schema):
669
                    self.ex('ALTER SCHEMA {schema} RENAME TO {old_schema}')
670
                self.ex('ALTER SCHEMA {schema_temp} RENAME TO {schema}')
671
            self.retry_on_db_error(
672
                switch,
673
                times=3,
674
                sleep_time=16,
675
                transaction=True)
676
        except Exception:
677
            self.logger.warning('failed to switch schemas 3-times in 33 seconds')
678
            raise
679

  
680
        try:
681
            def drop_old_schema():
682
                self.logger.info('dropping schema %s', self.old_schema)
683
                self.drop_tables_sequencially(self.old_schema)
684
                self.ex('DROP SCHEMA IF EXISTS {old_schema} CASCADE')
685
            self.retry_on_db_error(
686
                drop_old_schema,
687
                times=4,
688
                sleep_time=16)
689
        except Exception:
690
            self.logger.exception('could not drop schema %s', self.old_schema)
691

  
638 692
    def insert_agent(self, name):
639 693
        self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
640 694
        res = self.cur.fetchone()
641
-