From 702feb4d95bc593d494dfc4e70a17191f72f061a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20P=C3=A9ters?= Date: Tue, 24 Mar 2020 20:31:25 +0100 Subject: [PATCH] validators: use only dnspython to resolve domains (#40989) Also fix SMTP error value check when testing email adresses with an RCPT check. --- src/authentic2/validators.py | 86 ++++++++++++++--------------- tests/test_all.py | 25 --------- tests/test_manager.py | 4 +- tests/test_registration.py | 4 +- tests/test_validators.py | 103 +++++++++++++++++++++++++++++++++++ 5 files changed, 149 insertions(+), 73 deletions(-) create mode 100644 tests/test_validators.py diff --git a/src/authentic2/validators.py b/src/authentic2/validators.py index dc992b09..40bc47fc 100644 --- a/src/authentic2/validators.py +++ b/src/authentic2/validators.py @@ -19,11 +19,9 @@ from __future__ import unicode_literals import smtplib from django.utils.translation import ugettext_lazy as _ -from django.utils.encoding import force_text from django.core.exceptions import ValidationError -from django.core.validators import RegexValidator +from django.core.validators import RegexValidator, EmailValidator as DjangoEmailValidator -import socket import dns.resolver import dns.exception @@ -37,60 +35,60 @@ class EmailValidator(object): def __init__(self, rcpt_check=False): self.rcpt_check = rcpt_check - def check_mxs(self, domain): + def query_mxs(self, domain): try: mxs = dns.resolver.query(domain, 'MX') mxs = [str(mx.exchange).rstrip('.') for mx in mxs] return mxs + except dns.resolver.NXDOMAIN: + return [] + except dns.resolver.NoAnswer: + pass except dns.exception.DNSException: + pass + + for record_type in ('AAAA', 'A'): try: - idna_encoded = force_text(domain).encode('idna') - except UnicodeError: + mxs = dns.resolver.query(domain, record_type) + mxs = [str(mx.address).rstrip('.') for mx in mxs] + return mxs + except dns.resolver.NXDOMAIN: return [] - try: - socket.gethostbyname(idna_encoded) - return [domain] - except socket.error: + except dns.resolver.NoAnswer: + pass + except dns.exception.DNSException: pass return [] - def __call__(self, value): - try: - hostname = value.split('@')[-1] - except KeyError: - raise ValidationError(_('Enter a valid email address.'), code='invalid-email') - if not app_settings.A2_VALIDATE_EMAIL_DOMAIN: - return True + def check_rcpt(self, value, mxs): + for server in mxs: + try: + smtp = smtplib.SMTP() + smtp.connect(server) + status = smtp.helo() + if status[0] != 250: + continue + smtp.mail('') + status = smtp.rcpt(value) + if status[0] // 100 == 5: + raise ValidationError(_('Invalid email address.'), code='rcpt-check-failed') + break + except smtplib.SMTPServerDisconnected: + continue + except smtplib.SMTPConnectError: + continue - mxs = self.check_mxs(hostname) - if not mxs: - raise ValidationError(_('Email domain is invalid'), code='invalid-domain') + def __call__(self, value): + DjangoEmailValidator()(value) - if not self.rcpt_check or not app_settings.A2_VALIDATE_EMAIL: - return + localpart, hostname = value.split('@', 1) + if app_settings.A2_VALIDATE_EMAIL_DOMAIN: + mxs = self.query_mxs(hostname) + if not mxs: + raise ValidationError(_('Email domain is invalid'), code='invalid-domain') + if self.rcpt_check and app_settings.A2_VALIDATE_EMAIL: + self.check_rcpt(value, mxs) - try: - for server in mxs: - try: - smtp = smtplib.SMTP() - smtp.connect(server) - status = smtp.helo() - if status[0] != 250: - continue - smtp.mail('') - status = smtp.rcpt(value) - if status[0] % 100 == 5: - raise ValidationError(_('Invalid email address.'), code='rcpt-check-failed') - break - except smtplib.SMTPServerDisconnected: - break - except smtplib.SMTPConnectError: - continue - # Should not happen ! - except dns.resolver.NXDOMAIN: - raise ValidationError(_('Nonexistent domain.')) - except dns.resolver.NoAnswer: - raise ValidationError(_('Nonexistent email address.')) email_validator = EmailValidator() diff --git a/tests/test_all.py b/tests/test_all.py index 88254267..22e8fc25 100644 --- a/tests/test_all.py +++ b/tests/test_all.py @@ -204,31 +204,6 @@ class UtilsTests(Authentic2TestCase): '/coin?nonce=xxx&next=/zob/') -class ValidatorsTest(TestCase): - def test_validate_password_(self): - from authentic2.validators import validate_password - from django.core.exceptions import ValidationError - with self.assertRaises(ValidationError): - validate_password('aaaaaZZZZZZ') - with self.assertRaises(ValidationError): - validate_password('00000aaaaaa') - with self.assertRaises(ValidationError): - validate_password('00000ZZZZZZ') - validate_password('000aaaaZZZZ') - - @override_settings(A2_PASSWORD_POLICY_REGEX='^[0-9]{8}$', - A2_PASSWORD_POLICY_REGEX_ERROR_MSG='pasbon', - A2_PASSWORD_POLICY_MIN_LENGTH=0, - A2_PASSWORD_POLICY_MIN_CLASSES=0) - def test_digits_password_policy(self): - from authentic2.validators import validate_password - from django.core.exceptions import ValidationError - - with pytest.raises(ValidationError): - validate_password('aaa') - validate_password('12345678') - - class UserProfileTests(TestCase): def setUp(self): User = get_user_model() diff --git a/tests/test_manager.py b/tests/test_manager.py index 5087363c..ae21cc83 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -296,7 +296,7 @@ def test_manager_create_user(superuser_or_admin, app, settings): def test_manager_create_user_email_validation(superuser_or_admin, app, settings, monkeypatch): settings.A2_VALIDATE_EMAIL_DOMAIN = True - monkeypatch.setattr(EmailValidator, 'check_mxs', lambda x, y: []) + monkeypatch.setattr(EmailValidator, 'query_mxs', lambda x, y: []) ou1 = OU.objects.create(name='OU1', slug='ou1') url = reverse('a2-manager-user-add', kwargs={'ou_pk': ou1.pk}) @@ -309,7 +309,7 @@ def test_manager_create_user_email_validation(superuser_or_admin, app, settings, resp = resp.form.submit() assert 'domain is invalid' in resp.text - monkeypatch.setattr(EmailValidator, 'check_mxs', lambda x, y: ['mx1.entrouvert.org']) + monkeypatch.setattr(EmailValidator, 'query_mxs', lambda x, y: ['mx1.entrouvert.org']) resp.form.submit() assert User.objects.filter(email='john.doe@entrouvert.com').count() == 1 diff --git a/tests/test_registration.py b/tests/test_registration.py index fd6f8edb..030fe277 100644 --- a/tests/test_registration.py +++ b/tests/test_registration.py @@ -157,14 +157,14 @@ def test_registration_realm(app, db, settings, mailoutbox): def test_registration_email_validation(app, db, monkeypatch, settings): settings.A2_VALIDATE_EMAIL_DOMAIN = True - monkeypatch.setattr(EmailValidator, 'check_mxs', lambda x, y: ['mx1.entrouvert.org']) + monkeypatch.setattr(EmailValidator, 'query_mxs', lambda x, y: ['mx1.entrouvert.org']) resp = app.get(reverse('registration_register')) resp.form.set('email', 'testbot@entrouvert.com') resp = resp.form.submit().follow() assert 'Follow the instructions' in resp.text - monkeypatch.setattr(EmailValidator, 'check_mxs', lambda x, y: []) + monkeypatch.setattr(EmailValidator, 'query_mxs', lambda x, y: []) resp = app.get(reverse('registration_register')) resp.form.set('email', 'testbot@entrouvert.com') resp = resp.form.submit() diff --git a/tests/test_validators.py b/tests/test_validators.py new file mode 100644 index 00000000..a1964be1 --- /dev/null +++ b/tests/test_validators.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# authentic2 - versatile identity manager +# Copyright (C) 2010-2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from __future__ import unicode_literals + +import smtplib + +import mock +import pytest + +from django.core.exceptions import ValidationError + +from authentic2.validators import validate_password, EmailValidator + + +def test_validate_password(): + with pytest.raises(ValidationError): + validate_password('aaaaaZZZZZZ') + with pytest.raises(ValidationError): + validate_password('00000aaaaaa') + with pytest.raises(ValidationError): + validate_password('00000ZZZZZZ') + validate_password('000aaaaZZZZ') + + +def test_digits_password_policy(settings): + settings.A2_PASSWORD_POLICY_REGEX = '^[0-9]{8}$' + settings.A2_PASSWORD_POLICY_REGEX_ERROR_MSG = 'pasbon' + settings.A2_PASSWORD_POLICY_MIN_LENGTH = 0 + settings.A2_PASSWORD_POLICY_MIN_CLASSES = 0 + + with pytest.raises(ValidationError): + validate_password('aaa') + validate_password('12345678') + + +def test_email_validator(): + with pytest.raises(ValidationError): + EmailValidator()('nok') + with pytest.raises(ValidationError): + EmailValidator()('@nok.com') + with pytest.raises(ValidationError): + EmailValidator()('foo@bar\x00') + EmailValidator()('ok@ok.com') + + +def test_email_validator_domain(settings): + settings.A2_VALIDATE_EMAIL_DOMAIN = True + with mock.patch('authentic2.validators.EmailValidator.query_mxs', return_value=[]) as query_mxs: + with pytest.raises(ValidationError): + EmailValidator()('ok@ok.com') + assert query_mxs.call_count == 1 + with mock.patch('authentic2.validators.EmailValidator.query_mxs', return_value=['ok']) as query_mxs: + EmailValidator()('ok@ok.com') + assert query_mxs.call_count == 1 + + +@pytest.fixture +def smtp(): + smtp = mock.Mock() + smtp.helo.return_value = 250, None + with mock.patch('smtplib.SMTP', return_value=smtp): + yield smtp + + +def test_email_validator_rcpt_check(settings, smtp): + settings.A2_VALIDATE_EMAIL_DOMAIN = True + settings.A2_VALIDATE_EMAIL = True + + validator = EmailValidator(rcpt_check=True) + + with mock.patch('authentic2.validators.EmailValidator.query_mxs', return_value=['ok']): + smtp.rcpt.return_value = 100, None + validator('ok@ok.com') + + smtp.rcpt.return_value = 500, None + with pytest.raises(ValidationError): + validator('ok@ok.com') + + smtp.rcpt.return_value = 100, None + smtp.rcpt.side_effect = smtplib.SMTPServerDisconnected + validator('ok@ok.com') + + smtp.rcpt.return_value = 100, None + smtp.connect.side_effect = smtplib.SMTPConnectError(1,2) + validator('ok@ok.com') + + assert smtp.connect.call_count == 4 + assert smtp.rcpt.call_count == 3 -- 2.24.0