19 |
19 |
import smtplib
|
20 |
20 |
|
21 |
21 |
from django.utils.translation import ugettext_lazy as _
|
22 |
|
from django.utils.encoding import force_text
|
23 |
22 |
from django.core.exceptions import ValidationError
|
24 |
|
from django.core.validators import RegexValidator
|
|
23 |
from django.core.validators import RegexValidator, EmailValidator as DjangoEmailValidator
|
25 |
24 |
|
26 |
|
import socket
|
27 |
25 |
import dns.resolver
|
28 |
26 |
import dns.exception
|
29 |
27 |
|
... | ... | |
37 |
35 |
def __init__(self, rcpt_check=False):
|
38 |
36 |
self.rcpt_check = rcpt_check
|
39 |
37 |
|
40 |
|
def check_mxs(self, domain):
|
|
38 |
def query_mxs(self, domain):
|
41 |
39 |
try:
|
42 |
40 |
mxs = dns.resolver.query(domain, 'MX')
|
43 |
41 |
mxs = [str(mx.exchange).rstrip('.') for mx in mxs]
|
44 |
42 |
return mxs
|
|
43 |
except dns.resolver.NXDOMAIN:
|
|
44 |
return []
|
|
45 |
except dns.resolver.NoAnswer:
|
|
46 |
pass
|
45 |
47 |
except dns.exception.DNSException:
|
|
48 |
pass
|
|
49 |
|
|
50 |
for record_type in ('AAAA', 'A'):
|
46 |
51 |
try:
|
47 |
|
idna_encoded = force_text(domain).encode('idna')
|
48 |
|
except UnicodeError:
|
|
52 |
mxs = dns.resolver.query(domain, record_type)
|
|
53 |
mxs = [str(mx.address).rstrip('.') for mx in mxs]
|
|
54 |
return mxs
|
|
55 |
except dns.resolver.NXDOMAIN:
|
49 |
56 |
return []
|
50 |
|
try:
|
51 |
|
socket.gethostbyname(idna_encoded)
|
52 |
|
return [domain]
|
53 |
|
except socket.error:
|
|
57 |
except dns.resolver.NoAnswer:
|
|
58 |
pass
|
|
59 |
except dns.exception.DNSException:
|
54 |
60 |
pass
|
55 |
61 |
return []
|
56 |
62 |
|
57 |
|
def __call__(self, value):
|
58 |
|
try:
|
59 |
|
hostname = value.split('@')[-1]
|
60 |
|
except KeyError:
|
61 |
|
raise ValidationError(_('Enter a valid email address.'), code='invalid-email')
|
62 |
|
if not app_settings.A2_VALIDATE_EMAIL_DOMAIN:
|
63 |
|
return True
|
|
63 |
def check_rcpt(self, value, mxs):
|
|
64 |
for server in mxs:
|
|
65 |
try:
|
|
66 |
smtp = smtplib.SMTP()
|
|
67 |
smtp.connect(server)
|
|
68 |
status = smtp.helo()
|
|
69 |
if status[0] != 250:
|
|
70 |
continue
|
|
71 |
smtp.mail('')
|
|
72 |
status = smtp.rcpt(value)
|
|
73 |
if status[0] // 100 == 5:
|
|
74 |
raise ValidationError(_('Invalid email address.'), code='rcpt-check-failed')
|
|
75 |
break
|
|
76 |
except smtplib.SMTPServerDisconnected:
|
|
77 |
continue
|
|
78 |
except smtplib.SMTPConnectError:
|
|
79 |
continue
|
64 |
80 |
|
65 |
|
mxs = self.check_mxs(hostname)
|
66 |
|
if not mxs:
|
67 |
|
raise ValidationError(_('Email domain is invalid'), code='invalid-domain')
|
|
81 |
def __call__(self, value):
|
|
82 |
DjangoEmailValidator()(value)
|
68 |
83 |
|
69 |
|
if not self.rcpt_check or not app_settings.A2_VALIDATE_EMAIL:
|
70 |
|
return
|
|
84 |
localpart, hostname = value.split('@', 1)
|
|
85 |
if app_settings.A2_VALIDATE_EMAIL_DOMAIN:
|
|
86 |
mxs = self.query_mxs(hostname)
|
|
87 |
if not mxs:
|
|
88 |
raise ValidationError(_('Email domain is invalid'), code='invalid-domain')
|
|
89 |
if self.rcpt_check and app_settings.A2_VALIDATE_EMAIL:
|
|
90 |
self.check_rcpt(value, mxs)
|
71 |
91 |
|
72 |
|
try:
|
73 |
|
for server in mxs:
|
74 |
|
try:
|
75 |
|
smtp = smtplib.SMTP()
|
76 |
|
smtp.connect(server)
|
77 |
|
status = smtp.helo()
|
78 |
|
if status[0] != 250:
|
79 |
|
continue
|
80 |
|
smtp.mail('')
|
81 |
|
status = smtp.rcpt(value)
|
82 |
|
if status[0] % 100 == 5:
|
83 |
|
raise ValidationError(_('Invalid email address.'), code='rcpt-check-failed')
|
84 |
|
break
|
85 |
|
except smtplib.SMTPServerDisconnected:
|
86 |
|
break
|
87 |
|
except smtplib.SMTPConnectError:
|
88 |
|
continue
|
89 |
|
# Should not happen !
|
90 |
|
except dns.resolver.NXDOMAIN:
|
91 |
|
raise ValidationError(_('Nonexistent domain.'))
|
92 |
|
except dns.resolver.NoAnswer:
|
93 |
|
raise ValidationError(_('Nonexistent email address.'))
|
94 |
92 |
|
95 |
93 |
email_validator = EmailValidator()
|
96 |
94 |
|
97 |
|
-
|