Projet

Général

Profil

0004-custom_user-specialize-free_text_search-for-common-s.patch

Benjamin Dauvergne, 16 janvier 2021 19:18

Télécharger (11,2 ko)

Voir les différences:

Subject: [PATCH 4/4] custom_user: specialize free_text_search for common
 search terms (#49957)

 src/authentic2/custom_user/managers.py | 113 ++++++++++++++-----------
 src/authentic2/utils/date.py           |  33 ++++++++
 tests/test_custom_user.py              |  78 +++++++++++++++++
 3 files changed, 175 insertions(+), 49 deletions(-)
 create mode 100644 src/authentic2/utils/date.py
src/authentic2/custom_user/managers.py
17 17
import datetime
18 18
import logging
19 19
import unicodedata
20
import uuid
20 21

  
21 22
from django.contrib.contenttypes.models import ContentType
22 23
from django.contrib.postgres.search import TrigramDistance
24
from django.core.exceptions import ValidationError
23 25
from django.db import models, transaction, connection
24
from django.db.models import F, Value, FloatField, Subquery, OuterRef
26
from django.db.models import F, Value, FloatField, Subquery, OuterRef, Q
25 27
from django.db.models.functions import Lower, Coalesce
26
from django.utils import six
27 28
from django.utils import timezone
28 29
from django.contrib.auth.models import BaseUserManager
30
from django.contrib.postgres.search import SearchQuery
29 31

  
30 32
from authentic2 import app_settings
31
from authentic2.models import Attribute, AttributeValue, UserExternalId
33
from authentic2.models import AttributeValue, UserExternalId
32 34
from authentic2.utils.lookups import Unaccent, ImmutableConcat
35
from authentic2.utils.date import parse_date
36
from authentic2.attribute_kinds import clean_number
33 37

  
34 38

  
35 39
class UserQuerySet(models.QuerySet):
36

  
37 40
    def free_text_search(self, search):
38
        terms = search.split()
39

  
40
        if not terms:
41
            return self
42

  
43
        searchable_attributes = Attribute.objects.filter(searchable=True)
44
        queries = []
45
        for term in terms:
46
            q = None
47

  
48
            specific_queries = []
49
            for a in searchable_attributes:
50
                kind = a.get_kind()
51
                free_text_search_function = kind.get('free_text_search')
52
                if free_text_search_function:
53
                    q = free_text_search_function(term)
54
                    if q is not None:
55
                        specific_queries.append(q & models.query.Q(attribute_values__attribute=a))
56

  
57
            # if the term is recognized by some specific attribute type, like a
58
            # date, do not use the later generic matcher
59
            if specific_queries:
60
                queries.append(six.moves.reduce(models.query.Q.__or__, specific_queries))
61
                continue
62

  
63
            q = (
64
                models.query.Q(username__icontains=term)
65
                | models.query.Q(first_name__icontains=term)
66
                | models.query.Q(last_name__icontains=term)
67
                | models.query.Q(email__icontains=term)
68
            )
69
            for a in searchable_attributes:
70
                if a.name in ('first_name', 'last_name'):
71
                    continue
72
                q = q | models.query.Q(
73
                    attribute_values__content__icontains=term, attribute_values__attribute=a)
74
            queries.append(q)
75
        self = self.filter(six.moves.reduce(models.query.Q.__and__, queries))
76
        # search by attributes can match multiple times
77
        if searchable_attributes:
78
            self = self.distinct()
79
        return self
80

  
81
    def find_duplicates(self, first_name=None, last_name=None, fullname=None, birthdate=None):
41
        search = search.strip()
42

  
43
        if len(search) == 0:
44
            return self.none()
45

  
46
        if '@' in search and len(search.split()) == 1:
47
            qs = self.filter(email__iexact=search).order_by('last_name', 'first_name')
48
            if qs.exists():
49
                return qs
50

  
51
        try:
52
            guid = uuid.UUID(search)
53
        except ValueError:
54
            pass
55
        else:
56
            return self.filter(uuid=guid.hex)
57

  
58
        try:
59
            phone_number = clean_number(search)
60
        except ValidationError:
61
            pass
62
        else:
63
            attribute_values = AttributeValue.objects.filter(
64
                search_vector=SearchQuery(phone_number), attribute__kind='phone_number')
65
            qs = self.filter(attribute_values__in=attribute_values).order_by('last_name', 'first_name')
66
            if qs.exists():
67
                return qs
68

  
69
        try:
70
            date = parse_date(search)
71
        except ValueError:
72
            pass
73
        else:
74
            attribute_values = AttributeValue.objects.filter(
75
                search_vector=SearchQuery(date.isoformat()), attribute__kind='birthdate')
76
            qs = self.filter(attribute_values__in=attribute_values).order_by('last_name', 'first_name')
77
            if qs.exists():
78
                return qs
79

  
80
        qs = self.find_duplicates(fullname=search, limit=None)
81
        extra_user_ids = set()
82
        attribute_values = AttributeValue.objects.filter(search_vector=SearchQuery(search), attribute__searchable=True)
83
        extra_user_ids.update(self.filter(attribute_values__in=attribute_values).values_list('id', flat=True))
84
        if len(search.split()) == 1:
85
            extra_user_ids.update(
86
                self.filter(
87
                    Q(username__istartswith=search)
88
                    | Q(email__istartswith=search)
89
                ).values_list('id', flat=True))
90
        if extra_user_ids:
91
            qs = qs | self.filter(id__in=extra_user_ids)
92
        qs = qs.order_by('dist', 'last_name', 'first_name')
93
        return qs
94

  
95
    def find_duplicates(self, first_name=None, last_name=None, fullname=None, birthdate=None, limit=5):
82 96
        with connection.cursor() as cursor:
83 97
            cursor.execute(
84 98
                "SET pg_trgm.similarity_threshold = %f" % app_settings.A2_DUPLICATES_THRESHOLD
......
96 110
        qs = qs.filter(name__trigram_similar=name)
97 111
        qs = qs.annotate(dist=TrigramDistance('name', name))
98 112
        qs = qs.order_by('dist')
99
        qs = qs[:5]
113
        if limit is not None:
114
            qs = qs[:limit]
100 115

  
101 116
        # alter distance according to additional parameters
102 117
        if birthdate:
src/authentic2/utils/date.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2020 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from datetime import datetime
18

  
19
from django.utils import formats
20

  
21

  
22
def parse_date(formatted_date):
23
    parsed_date = None
24
    for date_format in formats.get_format('DATE_INPUT_FORMATS'):
25
        try:
26
            parsed_date = datetime.strptime(formatted_date, date_format)
27
        except ValueError:
28
            continue
29
        else:
30
            break
31
    if not parsed_date:
32
        raise ValueError
33
    return parsed_date.date()
tests/test_custom_user.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
from datetime import date
18

  
17 19
from django.contrib.auth import get_user_model
18 20

  
21
from authentic2.models import Attribute
19 22
from django_rbac.utils import get_permission_model, get_role_model
20 23

  
24
import pytest
25

  
21 26
Permission = get_permission_model()
22 27
Role = get_role_model()
23 28
User = get_user_model()
......
59 64
    User.objects.create(username='foo2', email='foo@example.net')
60 65
    assert len(User.objects.filter(email='foo@example.net')) == 2
61 66
    assert len(User.objects.filter(email='foo@example.net', deleted__isnull=True)) == 1
67

  
68

  
69
@pytest.fixture
70
def fts(db):
71
    Attribute.objects.create(name='adresse', label='adresse', searchable=True, kind='string')
72
    Attribute.objects.create(name='telephone', label='telephone', searchable=True, kind='phone_number')
73
    Attribute.objects.create(name='dob', label='dob', searchable=True, kind='birthdate')
74
    user1 = User.objects.create(
75
        username='foo1234',
76
        first_name='Jo',
77
        last_name='darmettein',
78
        email='jean.darmette@example.net'
79
    )
80
    user2 = User.objects.create(
81
        username='bar1234',
82
        first_name='Lea',
83
        last_name='darmettein',
84
        email='micheline.darmette@example.net'
85
    )
86
    user3 = User.objects.create(
87
        first_name='',
88
        last_name='peuplier',
89
    )
90
    user1.attributes.adresse = '4 rue des peupliers 13001 MARSEILLE'
91
    user2.attributes.adresse = '4 rue des peupliers 13001 MARSEILLE'
92
    user1.attributes.telephone = '0601020304'
93
    user2.attributes.telephone = '0601020305'
94
    user1.attributes.dob = date(1970, 1, 1)
95
    user2.attributes.dob = date(1972, 2, 2)
96
    return locals()
97

  
98

  
99
def test_fts_uuid(fts):
100
    assert User.objects.free_text_search(fts['user1'].uuid).count() == 1
101
    assert User.objects.free_text_search(fts['user2'].uuid).count() == 1
102

  
103

  
104
def test_fts_phone(fts):
105
    assert list(User.objects.free_text_search('0601020304')) == [fts['user1']]
106
    assert list(User.objects.free_text_search('0601020305')) == [fts['user2']]
107

  
108

  
109
def test_fts_dob(fts):
110
    assert User.objects.free_text_search('01/01/1970').count() == 1
111
    assert User.objects.free_text_search('02/02/1972').count() == 1
112
    assert User.objects.free_text_search('03/03/1973').count() == 0
113

  
114

  
115
def test_fts_email(fts):
116
    assert User.objects.free_text_search('jean.darmette@example.net').count() == 1
117
    assert User.objects.free_text_search('micheline.darmette@example.net').count() == 1
118

  
119

  
120
def test_fts_username(fts):
121
    assert User.objects.free_text_search('foo1234').count() == 1
122
    assert User.objects.free_text_search('bar1234').count() == 1
123

  
124

  
125
def test_fts_trigram(fts):
126
    assert User.objects.free_text_search('darmettein').count() == 2
127
    # dist attribute signals queryset from find_duplicates()
128
    assert hasattr(User.objects.free_text_search('darmettein')[0], 'dist')
129

  
130
    assert User.objects.free_text_search('lea darmettein').count() == 1
131
    assert hasattr(User.objects.free_text_search('darmettein')[0], 'dist')
132

  
133

  
134
def test_fts_legacy(fts):
135
    assert User.objects.free_text_search('rue des peupliers').count() == 2
136

  
137

  
138
def test_fts_legacy_and_trigram(fts):
139
    assert User.objects.free_text_search('peuplier').count() == 3
62
-