Projet

Général

Profil

0002-misc-add-one-time-token-model-39745.patch

Benjamin Dauvergne, 11 février 2020 14:54

Télécharger (9,89 ko)

Voir les différences:

Subject: [PATCH 2/3] misc: add one-time token model (#39745)

 .../management/commands/cleanupauthentic.py   |  2 +
 src/authentic2/migrations/0026_token.py       | 29 ++++++++
 src/authentic2/models.py                      | 67 ++++++++++++++++-
 src/authentic2/utils/dt.py                    | 37 ++++++++++
 tests/test_token.py                           | 74 +++++++++++++++++++
 5 files changed, 208 insertions(+), 1 deletion(-)
 create mode 100644 src/authentic2/migrations/0026_token.py
 create mode 100644 src/authentic2/utils/dt.py
 create mode 100644 tests/test_token.py
src/authentic2/idp/management/commands/cleanupauthentic.py
39 39
        manager = getattr(model, 'objects', None)
40 40
        if hasattr(manager, 'cleanup'):
41 41
            manager.cleanup()
42
        if hasattr(model, 'cleanup'):
43
            model.cleanup()
src/authentic2/migrations/0026_token.py
1
# -*- coding: utf-8 -*-
2
# Generated by Django 1.11.20 on 2020-02-11 10:27
3
from __future__ import unicode_literals
4

  
5
import django.contrib.postgres.fields.jsonb
6
from django.db import migrations, models
7
import uuid
8

  
9

  
10
class Migration(migrations.Migration):
    """Create the table backing the ``Token`` model.

    Runs after migration ``0025_auto_20191009_1047`` of the ``authentic2`` app.
    """

    dependencies = [('authentic2', '0025_auto_20191009_1047')]

    operations = [
        migrations.CreateModel(
            name='Token',
            fields=[
                # Primary key: a UUID generated on the Python side.
                (
                    'uuid',
                    models.UUIDField(
                        default=uuid.uuid4,
                        editable=False,
                        primary_key=True,
                        serialize=False,
                        verbose_name='Identifier',
                    ),
                ),
                (
                    'kind',
                    models.CharField(max_length=32, verbose_name='Kind'),
                ),
                (
                    'content',
                    django.contrib.postgres.fields.jsonb.JSONField(
                        blank=True,
                        verbose_name='Content',
                    ),
                ),
                (
                    'expires',
                    models.DateTimeField(verbose_name='Expires'),
                ),
            ],
            options={'ordering': ('-expires', 'kind', 'uuid')},
        ),
    ]
src/authentic2/models.py
16 16

  
17 17
import time
18 18
import uuid
19

  
19 20
from django.utils.http import urlquote
20 21
from django.conf import settings
21 22
from django.db import models, transaction
22 23
from django.db.models.query import Q
23
from django.utils import six
24
from django.utils import six, timezone
24 25
from django.utils.translation import ugettext_lazy as _
25 26
from django.utils.six.moves.urllib import parse as urlparse
26 27
from django.core.exceptions import ValidationError
27 28
from django.contrib.contenttypes.models import ContentType
29
from django.contrib.postgres.fields import jsonb
28 30

  
29 31
from model_utils.managers import QueryManager
30 32

  
31 33
from authentic2.a2_rbac.models import Role
34
from authentic2.crypto import base64url_encode, base64url_decode
35
from authentic2.utils.dt import interpret_expires
32 36
from django_rbac.utils import get_role_model_name
33 37

  
34 38
try:
......
456 460
class AuthorizedRole(models.Model):
457 461
    service = models.ForeignKey(Service, on_delete=models.CASCADE)
458 462
    role = models.ForeignKey(get_role_model_name(), on_delete=models.CASCADE)
463

  
464

  
465
class Token(models.Model):
    """Short-lived token identified by a UUID and carrying a JSON payload.

    Tokens are created with :meth:`create`, looked up (and by default
    consumed) with :meth:`use`, and expired rows are purged by
    :meth:`cleanup`.
    """

    uuid = models.UUIDField(
        verbose_name=_('Identifier'),
        primary_key=True,
        default=uuid.uuid4,
        editable=False)
    kind = models.CharField(
        verbose_name=_('Kind'),
        max_length=32)
    content = jsonb.JSONField(
        verbose_name=_('Content'),
        blank=True)
    expires = models.DateTimeField(
        verbose_name=_('Expires'))

    class Meta:
        ordering = ('-expires', 'kind', 'uuid')

    @property
    def uuid_b64url(self):
        """Return the 16 UUID bytes passed through ``base64url_encode``, as ASCII text."""
        return base64url_encode(self.uuid.bytes).decode('ascii')

    @classmethod
    def create(cls, kind, content, expires=None):
        """Create and save a new token.

        ``expires`` is interpreted by ``interpret_expires``; when it is
        ``None`` a default of 60 (seconds) is used.
        """
        return cls.objects.create(
            kind=kind,
            content=content,
            expires=interpret_expires(expires, default=60))

    @classmethod
    def _decode_uuid(cls, _uuid):
        """Convert an external identifier into a ``uuid.UUID``.

        The value is first tried as a textual UUID (hex, with or without
        dashes).  If that fails, text is decoded with ``base64url_decode``
        and the resulting (or originally given) bytes are used as the raw
        UUID bytes.

        Raises TypeError or ValueError when the value cannot be decoded.
        """
        try:
            _uuid = uuid.UUID(_uuid)
        except (TypeError, ValueError, AttributeError):
            # AttributeError is raised by uuid.UUID() for non-string input
            # such as an int; fall through so the bytes path below reports
            # it as one of the documented TypeError/ValueError exceptions.
            pass
        else:
            return _uuid

        if isinstance(_uuid, six.text_type):
            _uuid = _uuid.encode('ascii')
            _uuid = base64url_decode(_uuid)
        return uuid.UUID(bytes=_uuid)

    @classmethod
    def use(cls, kind, _uuid, now=None, delete=True):
        '''Return the unexpired token of the given kind, deleting it by default.

        Can raise TypeError, ValueError if uuid is invalid, DoesNotExist if
        uuid is unknown or expired.
        '''
        now = now or timezone.now()
        if not isinstance(_uuid, uuid.UUID):
            _uuid = cls._decode_uuid(_uuid)
        with transaction.atomic():
            candidates = cls.objects.all()
            if delete:
                # Lock the row so that two concurrent requests cannot both
                # read the token before one of them deletes it; a one-time
                # token must be handed out at most once.
                candidates = candidates.select_for_update()
            token = candidates.get(kind=kind, uuid=_uuid, expires__gt=now)
            if delete:
                token.delete()
            return token

    @classmethod
    def cleanup(cls, now=None):
        """Delete every token whose expiration is at or before ``now`` (default: current time)."""
        now = now or timezone.now()
        cls.objects.filter(expires__lte=now).delete()
src/authentic2/utils/dt.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2020 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17

  
18
import datetime
19

  
20
from django.utils import six, timezone
21

  
22

  
23
def interpret_expires(expires, default=None):
    """Convert an expiration specification into a datetime.

    ``expires`` may be:

    * ``None``: ``default`` is used instead, and is interpreted the same way;
    * an integer: a number of seconds added to ``timezone.now()``;
    * a ``datetime.timedelta``: added to ``timezone.now()``;
    * a ``datetime.datetime``: returned unchanged.

    Raises ValueError when both ``expires`` and ``default`` are ``None`` or
    when the value is of an unsupported type (booleans included).
    """
    if expires is None:
        if default is None:
            raise ValueError('expires is None and no default was given')
        expires = default

    # bool is a subclass of int: without this guard ``expires=True`` would
    # silently be understood as "one second from now".
    if isinstance(expires, bool):
        raise ValueError('invalid expires %r' % expires)

    if isinstance(expires, datetime.datetime):
        return expires
    if isinstance(expires, datetime.timedelta):
        return timezone.now() + expires
    if isinstance(expires, six.integer_types):
        return timezone.now() + datetime.timedelta(seconds=expires)
    raise ValueError('invalid expires %r' % expires)
tests/test_token.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2020 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from __future__ import unicode_literals
18

  
19
import pytest
20

  
21
from authentic2.models import Token
22

  
23

  
24
def test_base(db):
    """Look a token up through every accepted identifier form, then consume it."""
    assert Token.objects.count() == 0
    created = Token.create('su', {'user_pk': 36})
    assert Token.objects.count() == 1

    # Non-destructive lookups: UUID object, raw bytes, hex and base64url text.
    identifiers = (
        created.uuid,
        created.uuid.bytes,
        created.uuid.hex,
        created.uuid_b64url,
    )
    for identifier in identifiers:
        assert Token.use('su', identifier, delete=False) == created
    fetched = Token.use('su', str(created.uuid), delete=False)

    # A token is only found under the kind it was created with.
    with pytest.raises(Token.DoesNotExist):
        Token.use('wtf', str(created.uuid))

    assert fetched.content == {'user_pk': 36}

    # Default use() deletes the token, so a second use must fail.
    Token.use('su', created.uuid)
    assert Token.objects.count() == 0
    with pytest.raises(Token.DoesNotExist):
        Token.use('su', created.uuid)
42

  
43

  
44
def test_default_expires(db, freezer):
    """A token created without ``expires`` is no longer usable 60 seconds later."""
    freezer.move_to('2020-01-01')
    assert Token.objects.count() == 0

    identifier = str(Token.create('su', {'user_pk': 36}).uuid)
    Token.use('su', identifier, delete=False)

    freezer.tick(60)  # advance 60 seconds
    with pytest.raises(Token.DoesNotExist):
        Token.use('su', identifier, delete=False)
52

  
53

  
54
def test_default_integer_expires(db, freezer):
    """An integer ``expires`` of 120 keeps the token usable at +60s but not at +120s."""
    freezer.move_to('2020-01-01')
    assert Token.objects.count() == 0

    identifier = str(Token.create('su', {'user_pk': 36}, expires=120).uuid)
    Token.use('su', identifier, delete=False)

    freezer.tick(60)  # advance 60 seconds
    Token.use('su', identifier, delete=False)

    freezer.tick(60)  # advance 60 seconds
    with pytest.raises(Token.DoesNotExist):
        Token.use('su', identifier, delete=False)
64

  
65

  
66
def test_cleanup(db, freezer):
    """``Token.cleanup()`` keeps a fresh token and removes it once 60 seconds have passed."""
    freezer.move_to('2020-01-01')
    Token.create('su', {'user_pk': 36})
    assert Token.objects.count() == 1

    # Nothing has expired yet: cleanup must leave the token in place.
    Token.cleanup()
    assert Token.objects.count() == 1

    freezer.tick(60)  # advance 60 seconds
    Token.cleanup()
    assert Token.objects.count() == 0
0
-