0002-misc-add-one-time-token-model-39745.patch
src/authentic2/idp/management/commands/cleanupauthentic.py | ||
---|---|---|
39 | 39 |
manager = getattr(model, 'objects', None) |
40 | 40 |
if hasattr(manager, 'cleanup'): |
41 | 41 |
manager.cleanup() |
42 |
if hasattr(model, 'cleanup'): |
|
43 |
model.cleanup() |
src/authentic2/migrations/0026_token.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# Generated by Django 1.11.20 on 2020-02-11 10:27 |
|
3 |
from __future__ import unicode_literals |
|
4 | ||
5 |
import django.contrib.postgres.fields.jsonb |
|
6 |
from django.db import migrations, models |
|
7 |
import uuid |
|
8 | ||
9 | ||
10 |
class Migration(migrations.Migration):
    """Create the table backing the ``Token`` model."""

    dependencies = [('authentic2', '0025_auto_20191009_1047')]

    operations = [
        migrations.CreateModel(
            name='Token',
            fields=[
                # the UUID is the primary key, generated on the Python side
                (
                    'uuid',
                    models.UUIDField(
                        default=uuid.uuid4,
                        editable=False,
                        primary_key=True,
                        serialize=False,
                        verbose_name='Identifier',
                    ),
                ),
                (
                    'kind',
                    models.CharField(max_length=32, verbose_name='Kind'),
                ),
                (
                    'content',
                    django.contrib.postgres.fields.jsonb.JSONField(
                        blank=True,
                        verbose_name='Content',
                    ),
                ),
                (
                    'expires',
                    models.DateTimeField(verbose_name='Expires'),
                ),
            ],
            options={
                'ordering': ('-expires', 'kind', 'uuid'),
            },
        ),
    ]
src/authentic2/models.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import time |
18 | 18 |
import uuid |
19 | ||
19 | 20 |
from django.utils.http import urlquote |
20 | 21 |
from django.conf import settings |
21 | 22 |
from django.db import models, transaction |
22 | 23 |
from django.db.models.query import Q |
23 |
from django.utils import six |
|
24 |
from django.utils import six, timezone
|
|
24 | 25 |
from django.utils.translation import ugettext_lazy as _ |
25 | 26 |
from django.utils.six.moves.urllib import parse as urlparse |
26 | 27 |
from django.core.exceptions import ValidationError |
27 | 28 |
from django.contrib.contenttypes.models import ContentType |
29 |
from django.contrib.postgres.fields import jsonb |
|
28 | 30 | |
29 | 31 |
from model_utils.managers import QueryManager |
30 | 32 | |
31 | 33 |
from authentic2.a2_rbac.models import Role |
34 |
from authentic2.crypto import base64url_encode, base64url_decode |
|
35 |
from authentic2.utils.dt import interpret_expires |
|
32 | 36 |
from django_rbac.utils import get_role_model_name |
33 | 37 | |
34 | 38 |
try: |
... | ... | |
456 | 460 |
class AuthorizedRole(models.Model): |
457 | 461 |
service = models.ForeignKey(Service, on_delete=models.CASCADE) |
458 | 462 |
role = models.ForeignKey(get_role_model_name(), on_delete=models.CASCADE) |
463 | ||
464 | ||
465 |
class Token(models.Model):
    """Expiring token identified by a UUID and carrying a JSON payload.

    A token has a ``kind``, an arbitrary JSON ``content`` and an ``expires``
    date. ``use()`` looks a token up and, by default, deletes it.
    """

    uuid = models.UUIDField(
        verbose_name=_('Identifier'),
        primary_key=True,
        default=uuid.uuid4,
        editable=False,
    )
    kind = models.CharField(
        verbose_name=_('Kind'),
        max_length=32,
    )
    content = jsonb.JSONField(
        verbose_name=_('Content'),
        blank=True,
    )
    expires = models.DateTimeField(
        verbose_name=_('Expires'),
    )

    class Meta:
        ordering = ('-expires', 'kind', 'uuid')

    @property
    def uuid_b64url(self):
        """Return the 16 raw bytes of the UUID, base64url-encoded, as text."""
        encoded = base64url_encode(self.uuid.bytes)
        return encoded.decode('ascii')

    @classmethod
    def create(cls, kind, content, expires=None):
        """Create and save a new token.

        ``expires`` is handed to ``interpret_expires`` with a default of 60
        when it is None.
        """
        expiration = interpret_expires(expires, default=60)
        return cls.objects.create(kind=kind, content=content, expires=expiration)

    @classmethod
    def _decode_uuid(cls, _uuid):
        """Build a ``uuid.UUID`` from one of its supported representations.

        The value is first tried as a textual UUID; if that fails, a text
        value is base64url-decoded and the result (or the value itself when
        it is not text) is used as the raw bytes of the UUID.
        """
        try:
            return uuid.UUID(_uuid)
        except (TypeError, ValueError):
            # not a textual UUID, fall back to the byte representations
            pass

        raw = _uuid
        if isinstance(raw, six.text_type):
            raw = base64url_decode(raw.encode('ascii'))
        return uuid.UUID(bytes=raw)

    @classmethod
    def use(cls, kind, _uuid, now=None, delete=True):
        """Return the unexpired token of the given kind and identifier.

        The token is deleted unless ``delete`` is false. ``now`` overrides
        the reference date used for the expiration check.

        Can raise TypeError or ValueError if the identifier is invalid, and
        DoesNotExist if the token is unknown or expired.
        """
        if not now:
            now = timezone.now()
        if isinstance(_uuid, uuid.UUID):
            token_uuid = _uuid
        else:
            token_uuid = cls._decode_uuid(_uuid)
        # lookup and deletion happen in the same transaction
        with transaction.atomic():
            token = cls.objects.get(kind=kind, uuid=token_uuid, expires__gt=now)
            if delete:
                token.delete()
        return token

    @classmethod
    def cleanup(cls, now=None):
        """Delete every token whose expiration date is at or before ``now``."""
        threshold = now or timezone.now()
        expired = cls.objects.filter(expires__lte=threshold)
        expired.delete()
src/authentic2/utils/dt.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2020 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 | ||
18 |
import datetime |
|
19 | ||
20 |
from django.utils import six, timezone |
|
21 | ||
22 | ||
23 |
def interpret_expires(expires, default=None):
    """Convert an expiration specification into a datetime.

    ``expires`` may be an integer (seconds from now), a
    ``datetime.timedelta`` (offset from now) or a ``datetime.datetime``
    (returned unchanged). When it is None, ``default`` is interpreted
    instead.

    Raises ValueError when both ``expires`` and ``default`` are None, or
    when the value is of an unsupported type.
    """
    if expires is None:
        if default is None:
            raise ValueError('expires is None and no default was given')
        expires = default

    # an integer is a number of seconds, handle it as a timedelta
    if isinstance(expires, six.integer_types):
        expires = datetime.timedelta(seconds=expires)

    if isinstance(expires, datetime.timedelta):
        return timezone.now() + expires
    if isinstance(expires, datetime.datetime):
        return expires
    raise ValueError('invalid expires %r' % expires)
tests/test_token.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2020 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from __future__ import unicode_literals |
|
18 | ||
19 |
import pytest |
|
20 | ||
21 |
from authentic2.models import Token |
|
22 | ||
23 | ||
24 |
def test_base(db):
    """Check creation, lookup by each identifier form, and consumption."""
    assert Token.objects.count() == 0
    created = Token.create('su', {'user_pk': 36})
    assert Token.objects.count() == 1

    # UUID object, raw bytes, hexadecimal and base64url forms all resolve
    # to the same token
    identifiers = (
        created.uuid,
        created.uuid.bytes,
        created.uuid.hex,
        created.uuid_b64url,
    )
    for identifier in identifiers:
        assert Token.use('su', identifier, delete=False) == created
    fetched = Token.use('su', str(created.uuid), delete=False)

    # a token is only found under its own kind
    with pytest.raises(Token.DoesNotExist):
        Token.use('wtf', str(created.uuid))

    assert fetched.content == {'user_pk': 36}

    # default use deletes the token, so a second use fails
    Token.use('su', created.uuid)
    assert Token.objects.count() == 0
    with pytest.raises(Token.DoesNotExist):
        Token.use('su', created.uuid)
|
42 | ||
43 | ||
44 |
def test_default_expires(db, freezer):
    """A token created without ``expires`` is gone after 60 seconds."""
    freezer.move_to('2020-01-01')
    assert Token.objects.count() == 0

    identifier = str(Token.create('su', {'user_pk': 36}).uuid)
    Token.use('su', identifier, delete=False)

    freezer.tick(60)  # advance 60 seconds
    with pytest.raises(Token.DoesNotExist):
        Token.use('su', identifier, delete=False)
|
52 | ||
53 | ||
54 |
def test_default_integer_expires(db, freezer):
    """A token created with ``expires=120`` is usable at 60s, not at 120s."""
    freezer.move_to('2020-01-01')
    assert Token.objects.count() == 0

    identifier = str(Token.create('su', {'user_pk': 36}, expires=120).uuid)
    Token.use('su', identifier, delete=False)

    freezer.tick(60)  # advance 60 seconds
    Token.use('su', identifier, delete=False)

    freezer.tick(60)  # advance 60 seconds
    with pytest.raises(Token.DoesNotExist):
        Token.use('su', identifier, delete=False)
|
64 | ||
65 | ||
66 |
def test_cleanup(db, freezer):
    """``Token.cleanup()`` only removes tokens once they have expired."""
    freezer.move_to('2020-01-01')
    Token.create('su', {'user_pk': 36})
    assert Token.objects.count() == 1

    # not expired yet: cleanup keeps the token
    Token.cleanup()
    assert Token.objects.count() == 1

    freezer.tick(60)  # advance 60 seconds

    # expiration date reached: cleanup removes the token
    Token.cleanup()
    assert Token.objects.count() == 0
|
0 |
- |