0002-custom_user-specialize-free_text_search-for-common-s.patch
src/authentic2/custom_user/managers.py | ||
---|---|---|
17 | 17 |
import datetime |
18 | 18 |
import logging |
19 | 19 |
import unicodedata |
20 |
import uuid |
|
20 | 21 | |
21 | 22 |
from django.contrib.contenttypes.models import ContentType |
22 | 23 |
from django.contrib.postgres.search import TrigramDistance |
24 |
from django.core.exceptions import ValidationError |
|
23 | 25 |
from django.db import models, transaction, connection |
24 | 26 |
from django.db.models import F, Value, FloatField, Subquery, OuterRef |
25 | 27 |
from django.db.models.functions import Lower, Coalesce |
26 |
from django.utils import six |
|
27 |
from django.utils import timezone |
|
28 |
from django.utils import timezone, six |
|
28 | 29 |
from django.contrib.auth.models import BaseUserManager |
29 | 30 | |
30 | 31 |
from authentic2 import app_settings |
31 | 32 |
from authentic2.models import Attribute, AttributeValue, UserExternalId |
32 | 33 |
from authentic2.utils.lookups import Unaccent, ImmutableConcat |
34 |
from authentic2.utils.date import parse_date |
|
35 |
from authentic2.attribute_kinds import clean_number |
|
33 | 36 | |
34 | 37 | |
35 | 38 |
class UserQuerySet(models.QuerySet): |
36 | 39 | |
37 | 40 |
def free_text_search(self, search):
    """Search users from a free-form string, trying specialised lookups first.

    The stripped search string is run through the strategies below, in
    order; the first one that applies (and, where noted, yields at least
    one row) provides the returned queryset:

    1. a single token containing ``@``: case-insensitive substring match
       on ``email``, ordered by ``email`` (returned even when empty);
    2. a string parseable as a UUID: exact match on ``uuid`` using the hex
       form (returned even when empty);
    3. a string accepted by ``clean_number``: users having an attribute
       value of kind ``phone_number`` containing the cleaned number;
    4. a string accepted by ``parse_date``: users having an attribute
       value of kind ``birthdate`` containing the ISO-formatted date;
    5. ``find_duplicates(fullname=search, limit=None)``;
    6. case-insensitive prefix match on ``username``;
    7. fallback: ``free_text_search_complex(search)``.

    Strategies 3 to 6 are only used when they match at least one user.

    A blank search skips the probes entirely and is handed to
    ``free_text_search_complex``, which has its own handling for a search
    with no terms.
    """
    search = search.strip()

    # Robustness: none of the specialised probes is meaningful for an
    # empty string (``username__istartswith=''`` would match every user),
    # so defer to the generic search instead of issuing useless queries.
    if not search:
        return self.free_text_search_complex(search)

    # 1. Looks like a (partial) email address.
    if '@' in search and len(search.split()) == 1:
        return self.filter(email__icontains=search).order_by('email')

    # 2. Looks like a UUID; uuid.UUID raises ValueError on malformed input.
    try:
        guid = uuid.UUID(search)
    except ValueError:
        pass
    else:
        return self.filter(uuid=guid.hex)

    # 3. Looks like a phone number.
    try:
        phone_number = clean_number(search)
    except ValidationError:
        pass
    else:
        attribute_values = AttributeValue.objects.filter(
            content__contains=phone_number, attribute__kind='phone_number')
        qs = self.filter(attribute_values__in=attribute_values).order_by('last_name', 'first_name')
        if qs.exists():
            return qs

    # 4. Looks like a date, matched against birthdate attributes.
    try:
        date = parse_date(search)
    except ValueError:
        pass
    else:
        attribute_values = AttributeValue.objects.filter(
            content__contains=date.isoformat(), attribute__kind='birthdate')
        qs = self.filter(attribute_values__in=attribute_values).order_by('last_name', 'first_name')
        if qs.exists():
            return qs

    # 5. Trigram similarity on the full name, without the default limit.
    qs = self.find_duplicates(fullname=search, limit=None)
    if qs.exists():
        return qs

    # 6. Username prefix.
    qs = self.filter(username__istartswith=search)
    if qs.exists():
        return qs

    # 7. Nothing specific matched: generic per-term search.
    return self.free_text_search_complex(search)
|
83 | ||
84 |
def free_text_search_complex(self, search): |
|
38 | 85 |
terms = search.split() |
39 | 86 | |
40 | 87 |
if not terms: |
... | ... | |
44 | 91 |
queries = [] |
45 | 92 |
for term in terms: |
46 | 93 |
q = None |
47 | ||
48 | 94 |
specific_queries = [] |
49 | 95 |
for a in searchable_attributes: |
50 | 96 |
kind = a.get_kind() |
... | ... | |
78 | 124 |
self = self.distinct() |
79 | 125 |
return self |
80 | 126 | |
81 |
def find_duplicates(self, first_name=None, last_name=None, fullname=None, birthdate=None): |
|
127 | ||
128 | ||
129 |
def find_duplicates(self, first_name=None, last_name=None, fullname=None, birthdate=None, limit=5): |
|
82 | 130 |
with connection.cursor() as cursor: |
83 | 131 |
cursor.execute( |
84 | 132 |
"SET pg_trgm.similarity_threshold = %f" % app_settings.A2_DUPLICATES_THRESHOLD |
... | ... | |
96 | 144 |
qs = qs.filter(name__trigram_similar=name) |
97 | 145 |
qs = qs.annotate(dist=TrigramDistance('name', name)) |
98 | 146 |
qs = qs.order_by('dist') |
99 |
qs = qs[:5] |
|
147 |
if limit is not None: |
|
148 |
qs = qs[:limit] |
|
100 | 149 | |
101 | 150 |
# alter distance according to additional parameters |
102 | 151 |
if birthdate: |
src/authentic2/utils/date.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2019 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from datetime import datetime |
|
18 | ||
19 |
from django.utils import formats |
|
20 | ||
21 | ||
22 |
def parse_date(formatted_date):
    """Parse a date string using the active ``DATE_INPUT_FORMATS``.

    Each format returned by ``formats.get_format('DATE_INPUT_FORMATS')`` is
    tried in order with ``datetime.strptime``; the first one that parses
    wins.

    Returns the parsed value as a ``datetime.date``.
    Raises ``ValueError`` when no format matches.
    """
    for candidate_format in formats.get_format('DATE_INPUT_FORMATS'):
        try:
            return datetime.strptime(formatted_date, candidate_format).date()
        except ValueError:
            # Not this format; try the next one.
            continue
    raise ValueError
tests/test_custom_user.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
from django.contrib.auth import get_user_model |
18 | 18 | |
19 |
from authentic2.models import Attribute |
|
19 | 20 |
from django_rbac.utils import get_permission_model, get_role_model |
20 | 21 | |
22 |
import pytest |
|
23 | ||
21 | 24 |
Permission = get_permission_model() |
22 | 25 |
Role = get_role_model() |
23 | 26 |
User = get_user_model() |
... | ... | |
59 | 62 |
User.objects.create(username='foo2', email='foo@example.net') |
60 | 63 |
assert len(User.objects.filter(email='foo@example.net')) == 2 |
61 | 64 |
assert len(User.objects.filter(email='foo@example.net', deleted__isnull=True)) == 1 |
65 | ||
66 | ||
67 |
@pytest.fixture
def fts(db):
    """Populate the database for the free-text-search tests.

    Creates a searchable string attribute named ``adresse`` and two users
    sharing the last name ``Darmette``, then assigns both the same
    ``adresse`` value.
    """
    Attribute.objects.create(name='adresse', searchable=True, kind='string')

    people = (
        ('foo1234', 'Jean', 'jean.darmette@example.net'),
        ('bar1234', 'Micheline', 'micheline.darmette@example.net'),
    )
    # Create both users before touching their attributes.
    users = [
        User.objects.create(
            username=username,
            first_name=first_name,
            last_name='Darmette',
            email=email
        )
        for username, first_name, email in people
    ]
    for user in users:
        user.attributes.adresse = '4 rue des peupliers 13001 MARSEILLE'
|
84 | ||
85 | ||
86 |
def test_fts_email(fts):
    """A full email address must match exactly one fixture user."""
    for email in ('jean.darmette@example.net', 'micheline.darmette@example.net'):
        assert User.objects.free_text_search(email).count() == 1
|
89 | ||
90 | ||
91 |
def test_fts_username(fts):
    """A full username must match exactly one fixture user."""
    for username in ('foo1234', 'bar1234'):
        assert User.objects.free_text_search(username).count() == 1
|
94 | ||
95 | ||
96 |
def test_fts_trigram(fts):
    """Name searches must be answered by the trigram search of find_duplicates()."""
    assert User.objects.free_text_search('darmette').count() == 2

    matches = User.objects.free_text_search('michel darmette')
    assert matches.count() == 1
    # The `dist` annotation signals a queryset coming from find_duplicates().
    # Check for its presence rather than its truthiness: a distance of 0
    # is a legitimate value and must not fail the test.
    assert hasattr(matches[0], 'dist')
|
103 | ||
104 | ||
105 |
def test_fts_legacy(fts):
    """A fragment of the shared ``adresse`` attribute must match both users.

    NOTE(review): presumably served by free_text_search_complex() once the
    specialised lookups find nothing; confirm against the manager.
    """
    matches = User.objects.free_text_search('rue des peupliers')
    assert matches.count() == 2
|
62 |
- |