Projet

Général

Profil

0001-feeder-preseve-categories-and-form-names-order-36930.patch

Serghei Mihai (congés, retour 15/05), 14 janvier 2020 11:01

Télécharger (11,2 ko)

Voir les différences:

Subject: [PATCH] feeder: preserve categories and form names order (#36930)

 tests/test_wcs.py  |  29 ++++++++++--
 wcs_olap/feeder.py | 114 ++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 122 insertions(+), 21 deletions(-)
tests/test_wcs.py
10 10

  
11 11
import utils
12 12

  
13
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
14

  
13 15

  
14 16
def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
15 17
    olap_cmd()
......
18 20
        ('agent', 'id'),
19 21
        ('agent', 'label'),
20 22
        ('category', 'id'),
23
        ('category', 'ref'),
21 24
        ('category', 'label'),
22 25
        ('channel', 'id'),
23 26
        ('channel', 'label'),
......
67 70
        ('formdata_demande_field_itemOpen', 'id'),
68 71
        ('formdata_demande_field_itemOpen', 'label'),
69 72
        ('formdef', 'id'),
73
        ('formdef', 'ref'),
70 74
        ('formdef', 'category_id'),
71 75
        ('formdef', 'label'),
72 76
        ('hour', 'id'),
......
97 101
            assert json_schema == expected_json_schema
98 102

  
99 103

  
100
def test_requests_exception(wcs, postgres_db, tmpdir, olap_cmd, caplog):
104
def test_requests_exception(wcs, tmpdir, olap_cmd, caplog):
101 105
    with mock.patch('requests.get', side_effect=requests.RequestException('wat!')):
102 106
        with pytest.raises(SystemExit):
103 107
            olap_cmd(no_log_errors=False)
104 108
    assert 'wat!' in caplog.text
105 109

  
106 110

  
107
def test_requests_not_ok(wcs, postgres_db, tmpdir, olap_cmd, caplog):
111
def test_requests_not_ok(wcs, tmpdir, olap_cmd, caplog):
108 112
    with mock.patch('requests.get') as mocked_get:
109 113
        mocked_get.return_value.ok = False
110 114
        mocked_get.return_value.status_code = 401
......
114 118
    assert 'invalid signature' in caplog.text
115 119

  
116 120

  
117
def test_requests_not_json(wcs, postgres_db, tmpdir, olap_cmd, caplog):
121
def test_requests_not_json(wcs, tmpdir, olap_cmd, caplog):
118 122
    with mock.patch('requests.get') as mocked_get:
119 123
        mocked_get.return_value.ok = True
120 124
        mocked_get.return_value.json.side_effect = ValueError('invalid JSON')
......
197 201
            formdata = c.fetchone()
198 202
            assert formdata[0] == bazouka_id
199 203
            assert formdata[1] == open_new_id
204

  
205

  
206
def test_create_reference_column(postgres_db, olap_cmd):
207

  
208
    olap_cmd()
209
    conn = postgres_db.conn()
210
    with postgres_db.conn() as conn:
211
        with conn.cursor() as c:
212
            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
213
            c.execute('ALTER TABLE olap.category DROP COLUMN ref')
214
            c.execute('ALTER TABLE olap.formdef DROP COLUMN ref')
215

  
216
    olap_cmd()
217

  
218
    with postgres_db.conn() as conn:
219
        with conn.cursor() as c:
220
            c.execute('SELECT * FROM olap.category')
221
            for line in c.fetchall():
222
                assert len(line) == 3
wcs_olap/feeder.py
25 25

  
26 26

  
27 27
def slugify(s):
28
    return s.replace('-', '_').replace(' ', '_')
28
    return s.replace('-', '_').replace(' ', '_').lower()
29 29

  
30 30

  
31 31
class Context(object):
......
368 368
        self.ex(query, vars=(name,))
369 369
        return self.cur.fetchone()[0]
370 370

  
371
    def update_table_sequence_number(self, name):
372
        self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
373
        (SELECT GREATEST(1, MAX(id)) FROM {name}))""", ctx={'name': quote(name)})
374

  
371 375
    def create_labeled_table_serial(self, name, comment):
372 376
        self.create_table(
373 377
            name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment)
......
378 382
                'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
379 383
                ctx={'name': quote(name)}
380 384
            )
381
            # Update sequence
382
            self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
383
            (SELECT GREATEST(1, MAX(id)) FROM {name}))""", ctx={'name': quote(name)})
385
            self.update_table_sequence_number(name)
386

  
387
    def create_referenced_table(self, name, fields, comment):
388
        # add primary key and reference fields
389
        new_fields = [['id', 'serial primary key'], ['ref', 'varchar UNIQUE']] + fields
390
        self.create_table(name, new_fields, comment=comment)
391

  
392
        if self.prev_table_exists(name):
393
            # verify if ref exists in table schema
394
            self.ex("""SELECT column_name FROM information_schema.columns
395
            WHERE table_name=%s AND table_schema='{schema}'""", vars=(name,))
396
            columns = [c[0] for c in self.cur.fetchall()]
397
            if 'ref' in columns:
398
                self.ex('INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}', ctx={'name': quote(name)})
399
            else:
400
                fields_labels =  ', '.join([f[0] for f in fields])
401
                self.ex('SELECT {fields} FROM {schema}.{name}', ctx={'name': quote(name),
402
                                                                     'fields': fields_labels})
403
                new_values = []
404
                for line in self.cur.fetchall():
405
                    result = (slugify(line[-1]),) + line
406
                    new_values.append(result)
407
                if new_values:
408
                    columns_number = len(line) + 1
409
                    columns_values = ', '.join(['%s' for x in xrange(columns_number)])
410
                    tmpl = ', '.join(['(DEFAULT, %s)' % columns_values] * len(new_values))
411
                    query = 'INSERT INTO {schema_temp}.{name} VALUES %s' % tmpl
412
                    query = self.ex(query, ctx={'name': quote(name)},
413
                                    vars=list(itertools.chain(*new_values)))
414

  
415

  
416
    def do_referenced_data(self, name, data, result_column, update_column='label'):
417
        to_insert = []
418

  
419
        for item in data:
420
            ref = item[0]
421
            self.ex(
422
                'SELECT ref, {column} FROM {name} WHERE ref = %s',
423
                ctx={'name': quote(name), 'column': quote(update_column)},
424
                vars=(ref,))
425
            if self.cur.fetchall():
426
                for item in self.cur.fetchall():
427
                    self.ex('UPDATE {name} SET {column}=%s WHERE ref=%s',
428
                            ctx={'name': quote(name), 'column': quote(update_column)},
429
                            vars=[item[1], ref])
430
            else:
431
                to_insert.append(item)
432

  
433
        if to_insert:
434
            columns_values = ', '.join(['%s' for x in xrange(len(item))])
435
            tmpl = ', '.join(['(DEFAULT, %s)' % columns_values] * len(data))
436

  
437
            query = 'INSERT INTO {name} VALUES %s' % tmpl
438
            self.ex(query, ctx={'name': quote(name)}, # 'column': quote(update_column)},
439
                    vars=list(itertools.chain(*to_insert)))
440

  
441
        result = {}
442
        self.ex('SELECT id, {column} FROM {name}', ctx={'name': quote(name),
443
                                                        'column': result_column})
444
        for _id, column  in self.cur.fetchall():
445
            result[column] = _id
446
        return result
447

  
384 448

  
385 449
    def create_labeled_table(self, name, labels, comment=None):
386 450
        self.create_table(
......
446 510
    def add_dim(self, **kwargs):
447 511
        self.dimensions.append(self.tpl(kwargs))
448 512

  
513
    def do_category_table(self):
514
        fields = [['label', 'varchar']]
515
        table_name = self.ctx['category_table']
516
        self.create_referenced_table(table_name, fields, 'catégorie')
517
        categories_data = [(c.id, c.name) for c in self.categories]
518
        tmp_cat_map = self.do_referenced_data(table_name, categories_data, 'label')
519
        self.update_table_sequence_number(table_name)
520
        # remap categories ids to ids in the table
521
        return dict((c.id, tmp_cat_map[c.name]) for c in self.categories)
522

  
523
    def do_formdef_table(self):
524
        categories_mapping = self.do_category_table()
525

  
526
        formdef_fields = [['category_id', 'integer REFERENCES {category_table} (id)'],
527
                          ['label', 'varchar']
528
        ]
529
        table_name = self.ctx['form_table']
530
        self.create_referenced_table(table_name, formdef_fields, 'types de formulaire')
531

  
532
        formdefs_data = [(form.slug, categories_mapping.get(form.schema.category_id),
533
                          form.schema.name) for form in self.formdefs if form.count]
534
        self.formdefs_mapping = self.do_referenced_data(table_name, formdefs_data, 'ref')
535
        self.update_table_sequence_number(table_name)
536

  
449 537
    def do_base_table(self):
450 538
        # channels
451 539
        self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels],
......
457 545
        self.role_mapping = dict(
458 546
            (role.id, tmp_role_map[role.name]) for role in self.roles)
459 547

  
460
        # categories
461
        tmp_cat_map = self.create_labeled_table(
462
            'category', enumerate(c.name for c in self.categories), comment='catégorie')
463
        self.categories_mapping = dict((c.id, tmp_cat_map[c.name]) for c in self.categories)
548
        # forms
549
        self.do_formdef_table()
464 550

  
465 551
        self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
466 552
                                  comment='heures')
467 553

  
468 554
        self.create_labeled_table('status', self.status,
469 555
                                  comment='statuts simplifiés')
470
        self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,'
471
                ' category_id integer REFERENCES {category_table} (id),'
472
                ' label varchar)')
473
        self.ex('COMMENT ON TABLE {form_table} IS %s', vars=('types de formulaire',))
556

  
474 557
        # agents
475 558
        self.create_labeled_table_serial('agent', comment='agents')
476 559

  
......
606 689
        self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses)
607 690

  
608 691
    def do_data_table(self):
609
        self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)',
610
                vars=[self.categories_mapping.get(self.formdef.schema.category_id),
611
                      self.formdef.schema.name])
612
        self.formdef_sql_id = self.cur.fetchone()[0]
613

  
614 692
        columns = OrderedDict()
615 693
        columns['status_id'] = {'sql_col_name': 'status_id', 'sql_col_def': 'smallint REFERENCES {status_table} (id)'}
616 694

  
......
766 844
            if channel == 'web' and data.submission.backoffice:
767 845
                channel = 'backoffice'
768 846
            row = {
769
                'formdef_id': self.formdef_sql_id,
847
                'formdef_id': self.formdefs_mapping[self.formdef.slug],
770 848
                'receipt_time': data.receipt_time,
771 849
                'hour_id': data.receipt_time.hour,
772 850
                'channel_id': self.channel_to_id[channel],
773
-