15 |
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
16 |
16 |
|
17 |
17 |
import ast
|
|
18 |
import logging
|
|
19 |
import re
|
18 |
20 |
import sys
|
19 |
21 |
|
|
22 |
import dns.exception
|
|
23 |
import dns.resolver
|
20 |
24 |
from django.core.exceptions import ValidationError
|
|
25 |
from django.core.validators import validate_ipv4_address
|
21 |
26 |
|
22 |
27 |
try:
|
23 |
28 |
from functools import lru_cache
|
24 |
29 |
except ImportError:
|
25 |
30 |
from django.utils.lru_cache import lru_cache
|
26 |
31 |
|
27 |
|
|
28 |
32 |
from django.utils.translation import ugettext as _
|
29 |
33 |
|
|
34 |
logger = logging.getLogger(__name__)
|
|
35 |
|
30 |
36 |
|
31 |
37 |
class HTTPHeaders:
|
32 |
38 |
def __init__(self, request):
|
... | ... | |
67 |
73 |
self.text = Unparse().visit(node)
|
68 |
74 |
|
69 |
75 |
|
|
76 |
def is_valid_hostname(hostname):
|
|
77 |
if hostname[-1] == ".":
|
|
78 |
# strip exactly one dot from the right, if present
|
|
79 |
hostname = hostname[:-1]
|
|
80 |
if len(hostname) > 253:
|
|
81 |
return False
|
|
82 |
|
|
83 |
labels = hostname.split(".")
|
|
84 |
|
|
85 |
# the TLD must be not all-numeric
|
|
86 |
if re.match(r"[0-9]+$", labels[-1]):
|
|
87 |
return False
|
|
88 |
|
|
89 |
allowed = re.compile(r"(?!-)[a-z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
|
|
90 |
return all(allowed.match(label) for label in labels)
|
|
91 |
|
|
92 |
|
|
93 |
def check_dnsbl(dnsbl, remote_addr):
|
|
94 |
domain = '.'.join(reversed(remote_addr.split('.'))) + '.' + dnsbl
|
|
95 |
exception = None
|
|
96 |
try:
|
|
97 |
answers = dns.resolver.resolve(domain, 'A', lifetime=1)
|
|
98 |
result = any(answer.address for answer in answers)
|
|
99 |
except dns.resolver.NXDOMAIN as e:
|
|
100 |
exception = e
|
|
101 |
result = False
|
|
102 |
except dns.resolver.NoAnswer as e:
|
|
103 |
exception = e
|
|
104 |
result = False
|
|
105 |
except dns.exception.DNSException as e:
|
|
106 |
exception = e
|
|
107 |
logger.warning(f'utils: could not check dnsbl {dnsbl} for domain "%s": %s', domain, e)
|
|
108 |
result = False
|
|
109 |
logger.debug('utils: dnsbl lookup of "%s", result=%s exception=%s', domain, result, exception)
|
|
110 |
return result
|
|
111 |
|
|
112 |
|
|
113 |
class DNSBL:
|
|
114 |
def __init__(self, domain):
|
|
115 |
if not is_valid_hostname(domain):
|
|
116 |
raise ValueError('%s is not a valid domain name' % domain)
|
|
117 |
self.domain = domain
|
|
118 |
|
|
119 |
def __contains__(self, remote_addr):
|
|
120 |
if not remote_addr or not isinstance(remote_addr, str):
|
|
121 |
return False
|
|
122 |
validate_ipv4_address(remote_addr)
|
|
123 |
return check_dnsbl(self.domain, remote_addr)
|
|
124 |
|
|
125 |
|
|
126 |
def dnsbl(domain):
|
|
127 |
return DNSBL(domain)
|
|
128 |
|
|
129 |
|
70 |
130 |
class BaseExpressionValidator(ast.NodeVisitor):
|
71 |
131 |
authorized_nodes = []
|
72 |
132 |
forbidden_nodes = []
|