0002-custom_user-specialize-free_text_search-for-common-s.patch
src/authentic2/custom_user/managers.py | ||
---|---|---|
17 | 17 |
import datetime |
18 | 18 |
import logging |
19 | 19 |
import unicodedata |
20 |
import uuid |
|
20 | 21 | |
21 | 22 |
from django.contrib.contenttypes.models import ContentType |
22 | 23 |
from django.contrib.postgres.search import TrigramDistance |
24 |
from django.core.exceptions import ValidationError |
|
23 | 25 |
from django.db import models, transaction, connection |
24 | 26 |
from django.db.models import F, Value, FloatField, Subquery, OuterRef |
25 | 27 |
from django.db.models.functions import Lower, Coalesce |
26 |
from django.utils import six |
|
27 |
from django.utils import timezone |
|
28 |
from django.utils import timezone, six |
|
28 | 29 |
from django.contrib.auth.models import BaseUserManager |
29 | 30 | |
30 | 31 |
from authentic2 import app_settings |
31 | 32 |
from authentic2.models import Attribute, AttributeValue, UserExternalId |
32 | 33 |
from authentic2.utils.lookups import Unaccent, ImmutableConcat |
34 |
from authentic2.utils.date import parse_date |
|
35 |
from authentic2.attribute_kinds import clean_number |
|
33 | 36 | |
34 | 37 | |
35 | 38 |
class UserQuerySet(models.QuerySet): |
36 | 39 | |
37 | 40 |
def free_text_search(self, search):
    """Return the users matching a free-text query.

    The stripped query is tried against increasingly generic strategies;
    the first one that applies decides the result:

    1. a single word containing ``@``: case-insensitive substring match
       on the email, ordered by email;
    2. a string accepted by ``uuid.UUID``: exact match on the user uuid;
    3. a string accepted by ``clean_number``: users owning a
       ``phone_number`` attribute value containing the cleaned number;
    4. a string accepted by ``parse_date``: users owning a ``birthdate``
       attribute value containing the ISO formatted date;
    5. otherwise the union of ``find_duplicates`` (called without limit)
       and ``free_text_search_attributes``, ordered by ``dist`` then name.

    Strategies 3 and 4 only return when they find at least one user; an
    empty match falls through to the next strategy.
    """
    query = search.strip()
    by_name = ('last_name', 'first_name')

    def owners_of(kind, content):
        # users holding an attribute value of this kind containing `content`
        values = AttributeValue.objects.filter(
            content__contains=content, attribute__kind=kind)
        return self.filter(attribute_values__in=values).order_by(*by_name)

    # email: exactly one token, and it contains an "@"
    if len(query.split()) == 1 and '@' in query:
        return self.filter(email__icontains=query).order_by('email')

    # uuid
    try:
        user_uuid = uuid.UUID(query)
    except ValueError:
        pass
    else:
        return self.filter(uuid=user_uuid.hex)

    # phone number
    try:
        number = clean_number(query)
    except ValidationError:
        pass
    else:
        by_phone = owners_of('phone_number', number)
        if by_phone.exists():
            return by_phone

    # birthdate
    try:
        birthdate = parse_date(query)
    except ValueError:
        pass
    else:
        by_birthdate = owners_of('birthdate', birthdate.isoformat())
        if by_birthdate.exists():
            return by_birthdate

    # generic search: name similarity merged with the per-attribute search
    similar = self.find_duplicates(fullname=query, limit=None)
    # attribute matches get a constant dist of 1.0 so both sides can be ordered together
    attribute_matches = self.free_text_search_attributes(query).annotate(dist=Value(1.0))
    if attribute_matches.query.distinct:
        # both sides of the union are made distinct when the attribute search is
        similar = similar.distinct()
    combined = similar | attribute_matches
    return combined.order_by('dist', 'last_name', 'first_name')
|
80 | ||
81 |
def free_text_search_attributes(self, search): |
|
38 | 82 |
terms = search.split() |
39 | 83 | |
40 | 84 |
if not terms: |
... | ... | |
44 | 88 |
queries = [] |
45 | 89 |
for term in terms: |
46 | 90 |
q = None |
47 | ||
48 | 91 |
specific_queries = [] |
49 | 92 |
for a in searchable_attributes: |
50 | 93 |
kind = a.get_kind() |
... | ... | |
62 | 105 | |
63 | 106 |
q = ( |
64 | 107 |
models.query.Q(username__icontains=term) |
65 |
| models.query.Q(first_name__icontains=term) |
|
66 |
| models.query.Q(last_name__icontains=term) |
|
67 |
| models.query.Q(email__icontains=term) |
|
68 | 108 |
) |
69 | 109 |
for a in searchable_attributes: |
70 | 110 |
if a.name in ('first_name', 'last_name'): |
... | ... | |
78 | 118 |
self = self.distinct() |
79 | 119 |
return self |
80 | 120 | |
81 |
def find_duplicates(self, first_name=None, last_name=None, fullname=None, birthdate=None): |
|
121 |
def find_duplicates(self, first_name=None, last_name=None, fullname=None, birthdate=None, limit=5):
|
|
82 | 122 |
with connection.cursor() as cursor: |
83 | 123 |
cursor.execute( |
84 | 124 |
"SET pg_trgm.similarity_threshold = %f" % app_settings.A2_DUPLICATES_THRESHOLD |
... | ... | |
96 | 136 |
qs = qs.filter(name__trigram_similar=name) |
97 | 137 |
qs = qs.annotate(dist=TrigramDistance('name', name)) |
98 | 138 |
qs = qs.order_by('dist') |
99 |
qs = qs[:5] |
|
139 |
if limit is not None: |
|
140 |
qs = qs[:limit] |
|
100 | 141 | |
101 | 142 |
# alter distance according to additional parameters |
102 | 143 |
if birthdate: |
src/authentic2/utils/date.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2019 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from datetime import datetime |
|
18 | ||
19 |
from django.utils import formats |
|
20 | ||
21 | ||
22 |
def parse_date(formatted_date):
    """Parse a date string against the configured date input formats.

    Every format returned by ``formats.get_format('DATE_INPUT_FORMATS')``
    is tried in order with ``datetime.strptime``; the first one that
    accepts ``formatted_date`` wins.

    Returns the parsed value as a ``datetime.date``.
    Raises ``ValueError`` when no format matches.
    """
    for date_format in formats.get_format('DATE_INPUT_FORMATS'):
        try:
            return datetime.strptime(formatted_date, date_format).date()
        except ValueError:
            # this format does not fit, try the next one
            continue
    raise ValueError(
        '%r does not match any of the DATE_INPUT_FORMATS' % (formatted_date,))
tests/test_custom_user.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from datetime import date |
|
18 | ||
17 | 19 |
from django.contrib.auth import get_user_model |
18 | 20 | |
21 |
from authentic2.models import Attribute |
|
19 | 22 |
from django_rbac.utils import get_permission_model, get_role_model |
20 | 23 | |
24 |
import pytest |
|
25 | ||
21 | 26 |
Permission = get_permission_model() |
22 | 27 |
Role = get_role_model() |
23 | 28 |
User = get_user_model() |
... | ... | |
59 | 64 |
User.objects.create(username='foo2', email='foo@example.net') |
60 | 65 |
assert len(User.objects.filter(email='foo@example.net')) == 2 |
61 | 66 |
assert len(User.objects.filter(email='foo@example.net', deleted__isnull=True)) == 1 |
67 | ||
68 | ||
69 |
@pytest.fixture
def fts(db):
    """Populate the database for the free text search tests.

    Creates three searchable attributes (``adresse``, ``telephone`` and
    ``dob``) and three users: ``user1`` and ``user2`` share the same last
    name and address but have different phone numbers and birthdates,
    ``user3`` only has a last name.

    Returns a dict mapping ``'user1'``, ``'user2'`` and ``'user3'`` to the
    created users.
    """
    Attribute.objects.create(name='adresse', label='adresse', searchable=True, kind='string')
    Attribute.objects.create(name='telephone', label='telephone', searchable=True, kind='phone_number')
    Attribute.objects.create(name='dob', label='dob', searchable=True, kind='birthdate')
    user1 = User.objects.create(
        username='foo1234',
        first_name='Jo',
        last_name='darmettein',
        email='jean.darmette@example.net'
    )
    user2 = User.objects.create(
        username='bar1234',
        first_name='Lea',
        last_name='darmettein',
        email='micheline.darmette@example.net'
    )
    user3 = User.objects.create(
        first_name='',
        last_name='peuplier',
    )
    user1.attributes.adresse = '4 rue des peupliers 13001 MARSEILLE'
    user2.attributes.adresse = '4 rue des peupliers 13001 MARSEILLE'
    user1.attributes.telephone = '0601020304'
    user2.attributes.telephone = '0601020305'
    user1.attributes.dob = date(1970, 1, 1)
    user2.attributes.dob = date(1972, 2, 2)
    # explicit mapping instead of locals(): only the users are exposed, and
    # adding a local variable above can no longer change the fixture value
    return {'user1': user1, 'user2': user2, 'user3': user3}
|
97 | ||
98 | ||
99 |
def test_fts_uuid(fts):
    """Searching for a user's uuid returns exactly one result."""
    for key in ('user1', 'user2'):
        matches = User.objects.free_text_search(fts[key].uuid)
        assert matches.count() == 1
|
102 | ||
103 | ||
104 |
def test_fts_phone(fts):
    """Partial phone numbers match on the telephone attribute."""
    # '01020304' is only part of user1's number, '06010203' starts both numbers
    expected_counts = (('01020304', 1), ('06010203', 2))
    for digits, expected in expected_counts:
        assert User.objects.free_text_search(digits).count() == expected
|
107 | ||
108 | ||
109 |
def test_fts_dob(fts):
    """Formatted dates are looked up among the birthdate attribute values."""
    # the last date belongs to nobody in the fixture
    expected_counts = (('01/01/1970', 1), ('02/02/1972', 1), ('03/03/1973', 0))
    for formatted, expected in expected_counts:
        assert User.objects.free_text_search(formatted).count() == expected
|
113 | ||
114 | ||
115 |
def test_fts_email(fts):
    """A full email address finds the single account registered with it."""
    for email in ('jean.darmette@example.net', 'micheline.darmette@example.net'):
        matches = User.objects.free_text_search(email)
        assert matches.count() == 1
|
118 | ||
119 | ||
120 |
def test_fts_username(fts):
    """A username finds the single account using it."""
    for username in ('foo1234', 'bar1234'):
        matches = User.objects.free_text_search(username)
        assert matches.count() == 1
|
123 | ||
124 | ||
125 |
def test_fts_trigram(fts):
    """Name searches return results annotated with a ``dist`` attribute."""
    assert User.objects.free_text_search('darmettein').count() == 2
    # dist attribute signals queryset from find_duplicates()
    assert hasattr(User.objects.free_text_search('darmettein')[0], 'dist')

    assert User.objects.free_text_search('lea darmettein').count() == 1
    # check the annotation on the multi-word search itself, not on the
    # single-word search already verified above
    assert hasattr(User.objects.free_text_search('lea darmettein')[0], 'dist')
|
132 | ||
133 | ||
134 |
def test_fts_legacy(fts):
    """Words found in the address attribute of two users match both of them."""
    found = User.objects.free_text_search('rue des peupliers')
    assert found.count() == 2
|
136 | ||
137 | ||
138 |
def test_fts_legacy_and_trigram(fts):
    """A word present in both addresses and a last name finds all three users."""
    # presumably user1 and user2 match through their address and user3
    # through its last name -- the test only checks the total
    found = User.objects.free_text_search('peuplier')
    assert found.count() == 3
|
62 |
- |