0001-validators-use-only-dnspython-to-resolve-domains-409.patch
src/authentic2/validators.py | ||
---|---|---|
19 | 19 |
import smtplib |
20 | 20 | |
21 | 21 |
from django.utils.translation import ugettext_lazy as _ |
22 |
from django.utils.encoding import force_text |
|
23 | 22 |
from django.core.exceptions import ValidationError |
24 |
from django.core.validators import RegexValidator |
|
23 |
from django.core.validators import RegexValidator, EmailValidator as DjangoEmailValidator
|
|
25 | 24 | |
26 |
import socket |
|
27 | 25 |
import dns.resolver |
28 | 26 |
import dns.exception |
29 | 27 | |
... | ... | |
37 | 35 |
def __init__(self, rcpt_check=False): |
38 | 36 |
self.rcpt_check = rcpt_check |
39 | 37 | |
40 |
def check_mxs(self, domain):
|
|
38 |
def query_mxs(self, domain):
|
|
41 | 39 |
try: |
42 | 40 |
mxs = dns.resolver.query(domain, 'MX') |
43 | 41 |
mxs = [str(mx.exchange).rstrip('.') for mx in mxs] |
44 | 42 |
return mxs |
43 |
except dns.resolver.NXDOMAIN: |
|
44 |
return [] |
|
45 |
except dns.resolver.NoAnswer: |
|
46 |
pass |
|
45 | 47 |
except dns.exception.DNSException: |
48 |
pass |
|
49 | ||
50 |
for record_type in ('AAAA', 'A'): |
|
46 | 51 |
try: |
47 |
idna_encoded = force_text(domain).encode('idna') |
|
48 |
except UnicodeError: |
|
52 |
mxs = dns.resolver.query(domain, record_type) |
|
53 |
mxs = [str(mx.address).rstrip('.') for mx in mxs] |
|
54 |
return mxs |
|
55 |
except dns.resolver.NXDOMAIN: |
|
49 | 56 |
return [] |
50 |
try: |
|
51 |
socket.gethostbyname(idna_encoded) |
|
52 |
return [domain] |
|
53 |
except socket.error: |
|
57 |
except dns.resolver.NoAnswer: |
|
58 |
pass |
|
59 |
except dns.exception.DNSException: |
|
54 | 60 |
pass |
55 | 61 |
return [] |
56 | 62 | |
57 |
def __call__(self, value): |
|
58 |
try: |
|
59 |
hostname = value.split('@')[-1] |
|
60 |
except KeyError: |
|
61 |
raise ValidationError(_('Enter a valid email address.'), code='invalid-email') |
|
62 |
if not app_settings.A2_VALIDATE_EMAIL_DOMAIN: |
|
63 |
return True |
|
63 |
def check_rcpt(self, value, mxs): |
|
64 |
for server in mxs: |
|
65 |
try: |
|
66 |
smtp = smtplib.SMTP() |
|
67 |
smtp.connect(server) |
|
68 |
status = smtp.helo() |
|
69 |
if status[0] != 250: |
|
70 |
continue |
|
71 |
smtp.mail('') |
|
72 |
status = smtp.rcpt(value) |
|
73 |
if status[0] // 100 == 5: |
|
74 |
raise ValidationError(_('Invalid email address.'), code='rcpt-check-failed') |
|
75 |
break |
|
76 |
except smtplib.SMTPServerDisconnected: |
|
77 |
continue |
|
78 |
except smtplib.SMTPConnectError: |
|
79 |
continue |
|
64 | 80 | |
65 |
mxs = self.check_mxs(hostname) |
|
66 |
if not mxs: |
|
67 |
raise ValidationError(_('Email domain is invalid'), code='invalid-domain') |
|
81 |
def __call__(self, value): |
|
82 |
DjangoEmailValidator()(value) |
|
68 | 83 | |
69 |
if not self.rcpt_check or not app_settings.A2_VALIDATE_EMAIL: |
|
70 |
return |
|
84 |
localpart, hostname = value.split('@', 1) |
|
85 |
if app_settings.A2_VALIDATE_EMAIL_DOMAIN: |
|
86 |
mxs = self.query_mxs(hostname) |
|
87 |
if not mxs: |
|
88 |
raise ValidationError(_('Email domain is invalid'), code='invalid-domain') |
|
89 |
if self.rcpt_check and app_settings.A2_VALIDATE_EMAIL: |
|
90 |
self.check_rcpt(value, mxs) |
|
71 | 91 | |
72 |
try: |
|
73 |
for server in mxs: |
|
74 |
try: |
|
75 |
smtp = smtplib.SMTP() |
|
76 |
smtp.connect(server) |
|
77 |
status = smtp.helo() |
|
78 |
if status[0] != 250: |
|
79 |
continue |
|
80 |
smtp.mail('') |
|
81 |
status = smtp.rcpt(value) |
|
82 |
if status[0] % 100 == 5: |
|
83 |
raise ValidationError(_('Invalid email address.'), code='rcpt-check-failed') |
|
84 |
break |
|
85 |
except smtplib.SMTPServerDisconnected: |
|
86 |
break |
|
87 |
except smtplib.SMTPConnectError: |
|
88 |
continue |
|
89 |
# Should not happen ! |
|
90 |
except dns.resolver.NXDOMAIN: |
|
91 |
raise ValidationError(_('Nonexistent domain.')) |
|
92 |
except dns.resolver.NoAnswer: |
|
93 |
raise ValidationError(_('Nonexistent email address.')) |
|
94 | 92 | |
95 | 93 |
email_validator = EmailValidator() |
96 | 94 |
tests/test_all.py | ||
---|---|---|
204 | 204 |
'/coin?nonce=xxx&next=/zob/') |
205 | 205 | |
206 | 206 | |
207 |
class ValidatorsTest(TestCase): |
|
208 |
def test_validate_password_(self): |
|
209 |
from authentic2.validators import validate_password |
|
210 |
from django.core.exceptions import ValidationError |
|
211 |
with self.assertRaises(ValidationError): |
|
212 |
validate_password('aaaaaZZZZZZ') |
|
213 |
with self.assertRaises(ValidationError): |
|
214 |
validate_password('00000aaaaaa') |
|
215 |
with self.assertRaises(ValidationError): |
|
216 |
validate_password('00000ZZZZZZ') |
|
217 |
validate_password('000aaaaZZZZ') |
|
218 | ||
219 |
@override_settings(A2_PASSWORD_POLICY_REGEX='^[0-9]{8}$', |
|
220 |
A2_PASSWORD_POLICY_REGEX_ERROR_MSG='pasbon', |
|
221 |
A2_PASSWORD_POLICY_MIN_LENGTH=0, |
|
222 |
A2_PASSWORD_POLICY_MIN_CLASSES=0) |
|
223 |
def test_digits_password_policy(self): |
|
224 |
from authentic2.validators import validate_password |
|
225 |
from django.core.exceptions import ValidationError |
|
226 | ||
227 |
with pytest.raises(ValidationError): |
|
228 |
validate_password('aaa') |
|
229 |
validate_password('12345678') |
|
230 | ||
231 | ||
232 | 207 |
class UserProfileTests(TestCase): |
233 | 208 |
def setUp(self): |
234 | 209 |
User = get_user_model() |
tests/test_manager.py | ||
---|---|---|
296 | 296 | |
297 | 297 |
def test_manager_create_user_email_validation(superuser_or_admin, app, settings, monkeypatch): |
298 | 298 |
settings.A2_VALIDATE_EMAIL_DOMAIN = True |
299 |
monkeypatch.setattr(EmailValidator, 'check_mxs', lambda x, y: [])
|
|
299 |
monkeypatch.setattr(EmailValidator, 'query_mxs', lambda x, y: [])
|
|
300 | 300 |
ou1 = OU.objects.create(name='OU1', slug='ou1') |
301 | 301 | |
302 | 302 |
url = reverse('a2-manager-user-add', kwargs={'ou_pk': ou1.pk}) |
... | ... | |
309 | 309 |
resp = resp.form.submit() |
310 | 310 |
assert 'domain is invalid' in resp.text |
311 | 311 | |
312 |
monkeypatch.setattr(EmailValidator, 'check_mxs', lambda x, y: ['mx1.entrouvert.org'])
|
|
312 |
monkeypatch.setattr(EmailValidator, 'query_mxs', lambda x, y: ['mx1.entrouvert.org'])
|
|
313 | 313 |
resp.form.submit() |
314 | 314 |
assert User.objects.filter(email='john.doe@entrouvert.com').count() == 1 |
315 | 315 |
tests/test_registration.py | ||
---|---|---|
157 | 157 | |
158 | 158 |
def test_registration_email_validation(app, db, monkeypatch, settings): |
159 | 159 |
settings.A2_VALIDATE_EMAIL_DOMAIN = True |
160 |
monkeypatch.setattr(EmailValidator, 'check_mxs', lambda x, y: ['mx1.entrouvert.org'])
|
|
160 |
monkeypatch.setattr(EmailValidator, 'query_mxs', lambda x, y: ['mx1.entrouvert.org'])
|
|
161 | 161 | |
162 | 162 |
resp = app.get(reverse('registration_register')) |
163 | 163 |
resp.form.set('email', 'testbot@entrouvert.com') |
164 | 164 |
resp = resp.form.submit().follow() |
165 | 165 |
assert 'Follow the instructions' in resp.text |
166 | 166 | |
167 |
monkeypatch.setattr(EmailValidator, 'check_mxs', lambda x, y: [])
|
|
167 |
monkeypatch.setattr(EmailValidator, 'query_mxs', lambda x, y: [])
|
|
168 | 168 |
resp = app.get(reverse('registration_register')) |
169 | 169 |
resp.form.set('email', 'testbot@entrouvert.com') |
170 | 170 |
resp = resp.form.submit() |
tests/test_validators.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# authentic2 - versatile identity manager |
|
3 |
# Copyright (C) 2010-2019 Entr'ouvert |
|
4 |
# |
|
5 |
# This program is free software: you can redistribute it and/or modify it |
|
6 |
# under the terms of the GNU Affero General Public License as published |
|
7 |
# by the Free Software Foundation, either version 3 of the License, or |
|
8 |
# (at your option) any later version. |
|
9 |
# |
|
10 |
# This program is distributed in the hope that it will be useful, |
|
11 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
12 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
13 |
# GNU Affero General Public License for more details. |
|
14 |
# |
|
15 |
# You should have received a copy of the GNU Affero General Public License |
|
16 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
17 | ||
18 |
from __future__ import unicode_literals |
|
19 | ||
20 |
import smtplib |
|
21 | ||
22 |
import mock |
|
23 |
import pytest |
|
24 | ||
25 |
from django.core.exceptions import ValidationError |
|
26 | ||
27 |
from authentic2.validators import validate_password, EmailValidator |
|
28 | ||
29 | ||
30 |
def test_validate_password(): |
|
31 |
with pytest.raises(ValidationError): |
|
32 |
validate_password('aaaaaZZZZZZ') |
|
33 |
with pytest.raises(ValidationError): |
|
34 |
validate_password('00000aaaaaa') |
|
35 |
with pytest.raises(ValidationError): |
|
36 |
validate_password('00000ZZZZZZ') |
|
37 |
validate_password('000aaaaZZZZ') |
|
38 | ||
39 | ||
40 |
def test_digits_password_policy(settings): |
|
41 |
settings.A2_PASSWORD_POLICY_REGEX = '^[0-9]{8}$' |
|
42 |
settings.A2_PASSWORD_POLICY_REGEX_ERROR_MSG = 'pasbon' |
|
43 |
settings.A2_PASSWORD_POLICY_MIN_LENGTH = 0 |
|
44 |
settings.A2_PASSWORD_POLICY_MIN_CLASSES = 0 |
|
45 | ||
46 |
with pytest.raises(ValidationError): |
|
47 |
validate_password('aaa') |
|
48 |
validate_password('12345678') |
|
49 | ||
50 | ||
51 |
def test_email_validator(): |
|
52 |
with pytest.raises(ValidationError): |
|
53 |
EmailValidator()('nok') |
|
54 |
with pytest.raises(ValidationError): |
|
55 |
EmailValidator()('@nok.com') |
|
56 |
with pytest.raises(ValidationError): |
|
57 |
EmailValidator()('foo@bar\x00') |
|
58 |
EmailValidator()('ok@ok.com') |
|
59 | ||
60 | ||
61 |
def test_email_validator_domain(settings): |
|
62 |
settings.A2_VALIDATE_EMAIL_DOMAIN = True |
|
63 |
with mock.patch('authentic2.validators.EmailValidator.query_mxs', return_value=[]) as query_mxs: |
|
64 |
with pytest.raises(ValidationError): |
|
65 |
EmailValidator()('ok@ok.com') |
|
66 |
assert query_mxs.call_count == 1 |
|
67 |
with mock.patch('authentic2.validators.EmailValidator.query_mxs', return_value=['ok']) as query_mxs: |
|
68 |
EmailValidator()('ok@ok.com') |
|
69 |
assert query_mxs.call_count == 1 |
|
70 | ||
71 | ||
72 |
@pytest.fixture |
|
73 |
def smtp(): |
|
74 |
smtp = mock.Mock() |
|
75 |
smtp.helo.return_value = 250, None |
|
76 |
with mock.patch('smtplib.SMTP', return_value=smtp): |
|
77 |
yield smtp |
|
78 | ||
79 | ||
80 |
def test_email_validator_rcpt_check(settings, smtp): |
|
81 |
settings.A2_VALIDATE_EMAIL_DOMAIN = True |
|
82 |
settings.A2_VALIDATE_EMAIL = True |
|
83 | ||
84 |
validator = EmailValidator(rcpt_check=True) |
|
85 | ||
86 |
with mock.patch('authentic2.validators.EmailValidator.query_mxs', return_value=['ok']): |
|
87 |
smtp.rcpt.return_value = 100, None |
|
88 |
validator('ok@ok.com') |
|
89 | ||
90 |
smtp.rcpt.return_value = 500, None |
|
91 |
with pytest.raises(ValidationError): |
|
92 |
validator('ok@ok.com') |
|
93 | ||
94 |
smtp.rcpt.return_value = 100, None |
|
95 |
smtp.rcpt.side_effect = smtplib.SMTPServerDisconnected |
|
96 |
validator('ok@ok.com') |
|
97 | ||
98 |
smtp.rcpt.return_value = 100, None |
|
99 |
smtp.connect.side_effect = smtplib.SMTPConnectError(1,2) |
|
100 |
validator('ok@ok.com') |
|
101 | ||
102 |
assert smtp.connect.call_count == 4 |
|
103 |
assert smtp.rcpt.call_count == 3 |
|
0 |
- |