0002-django4-replaced-force_text-with-equivalent-force_st.patch
src/authentic2/api_views.py | ||
---|---|---|
28 | 28 |
from django.db import models |
29 | 29 |
from django.shortcuts import get_object_or_404 |
30 | 30 |
from django.utils.dateparse import parse_datetime |
31 |
from django.utils.encoding import force_text
|
|
31 |
from django.utils.encoding import force_str
|
|
32 | 32 |
from django.utils.text import slugify |
33 | 33 |
from django.utils.translation import gettext_lazy as _ |
34 | 34 |
from django.views.decorators.cache import cache_control |
... | ... | |
258 | 258 |
response = { |
259 | 259 |
'result': 0, |
260 | 260 |
'errors': {'__all__': ['Mail sending failed']}, |
261 |
'exception': force_text(e),
|
|
261 |
'exception': force_str(e),
|
|
262 | 262 |
} |
263 | 263 |
response_status = status.HTTP_503_SERVICE_UNAVAILABLE |
264 | 264 |
else: |
src/authentic2/backends/ldap_backend.py | ||
---|---|---|
38 | 38 |
from django.core.exceptions import ImproperlyConfigured |
39 | 39 |
from django.db.models import Q |
40 | 40 |
from django.db.transaction import atomic |
41 |
from django.utils.encoding import force_bytes, force_text
|
|
41 |
from django.utils.encoding import force_bytes, force_str
|
|
42 | 42 |
from django.utils.translation import gettext as _ |
43 | 43 |
from django.utils.translation import ngettext |
44 | 44 |
from ldap.controls import DecodeControlTuples, SimplePagedResultsControl, ppolicy |
... | ... | |
200 | 200 |
if d is None: |
201 | 201 |
return d |
202 | 202 |
elif isinstance(d, str): |
203 |
return force_text(d)
|
|
203 |
return force_str(d)
|
|
204 | 204 |
elif isinstance(d, (list, tuple)): |
205 | 205 |
return d.__class__(map_text(x) for x in d) |
206 | 206 |
elif isinstance(d, dict): |
... | ... | |
293 | 293 |
settings.SECRET_KEY, encrypted_bindpw, raise_on_error=False |
294 | 294 |
) |
295 | 295 |
if decrypted: |
296 |
decrypted = force_text(decrypted)
|
|
296 |
decrypted = force_str(decrypted)
|
|
297 | 297 |
self.ldap_data['block']['bindpw'] = decrypted |
298 | 298 |
del self.ldap_data['block']['encrypted_bindpw'] |
299 | 299 | |
... | ... | |
302 | 302 |
data = dict(self.ldap_data) |
303 | 303 |
data['block'] = dict(data['block']) |
304 | 304 |
if data['block'].get('bindpw'): |
305 |
data['block']['encrypted_bindpw'] = force_text(
|
|
305 |
data['block']['encrypted_bindpw'] = force_str(
|
|
306 | 306 |
crypto.aes_base64_encrypt(settings.SECRET_KEY, force_bytes(data['block']['bindpw'])) |
307 | 307 |
) |
308 | 308 |
del data['block']['bindpw'] |
... | ... | |
339 | 339 |
cache = self.ldap_data.setdefault('password', {}) |
340 | 340 |
if password is not None: |
341 | 341 |
# Prevent eavesdropping of the password through the session storage |
342 |
password = force_text(crypto.aes_base64_encrypt(settings.SECRET_KEY, force_bytes(password)))
|
|
342 |
password = force_str(crypto.aes_base64_encrypt(settings.SECRET_KEY, force_bytes(password)))
|
|
343 | 343 |
cache[self.dn] = password |
344 | 344 |
# ensure session is marked dirty |
345 | 345 |
self.update_request() |
... | ... | |
350 | 350 |
password = cache.get(self.dn) |
351 | 351 |
if password is not None: |
352 | 352 |
try: |
353 |
password = force_text(crypto.aes_base64_decrypt(settings.SECRET_KEY, password))
|
|
353 |
password = force_str(crypto.aes_base64_decrypt(settings.SECRET_KEY, password))
|
|
354 | 354 |
except crypto.DecryptionError: |
355 | 355 |
log.error('unable to decrypt a stored LDAP password') |
356 | 356 |
self.keep_password_in_session(None) |
357 | 357 |
password = None |
358 | 358 |
else: |
359 |
password = force_text(password)
|
|
359 |
password = force_str(password)
|
|
360 | 360 |
return password |
361 | 361 |
else: |
362 | 362 |
self.keep_password_in_session(None) |
... | ... | |
413 | 413 | |
414 | 414 |
def get_attributes(self, attribute_source, ctx): |
415 | 415 |
cache_key = hashlib.md5( |
416 |
(force_text(str(self.pk)) + ';' + force_text(self.dn)).encode('utf-8')
|
|
416 |
(force_str(str(self.pk)) + ';' + force_str(self.dn)).encode('utf-8')
|
|
417 | 417 |
).hexdigest() |
418 | 418 |
conn = self.get_connection() |
419 | 419 |
# prevents blocking on temporary LDAP failures |
... | ... | |
657 | 657 |
for conn in self.get_connections(block): |
658 | 658 |
ldap_uri = conn.get_option(ldap.OPT_URI) |
659 | 659 |
authz_ids = [] |
660 |
user_basedn = force_text(block.get('user_basedn') or block['basedn'])
|
|
660 |
user_basedn = force_str(block.get('user_basedn') or block['basedn'])
|
|
661 | 661 | |
662 | 662 |
try: |
663 | 663 |
if block['user_dn_template']: |
664 |
template = force_text(block['user_dn_template'])
|
|
664 |
template = force_str(block['user_dn_template'])
|
|
665 | 665 |
escaped_username = escape_dn_chars(username) |
666 | 666 |
authz_ids.append(template.format(username=escaped_username)) |
667 | 667 |
else: |
... | ... | |
670 | 670 |
authz_ids.append(username) |
671 | 671 |
elif block['user_filter']: |
672 | 672 |
# allow multiple occurences of the username in the filter |
673 |
user_filter = force_text(block['user_filter'])
|
|
673 |
user_filter = force_str(block['user_filter'])
|
|
674 | 674 |
n = len(user_filter.split('%s')) - 1 |
675 | 675 |
try: |
676 | 676 |
query = filter_format(user_filter, (username,) * n) |
... | ... | |
818 | 818 | |
819 | 819 |
def create_username(self, block, attributes): |
820 | 820 |
'''Build a username using the configured template''' |
821 |
username_template = force_text(block['username_template'])
|
|
821 |
username_template = force_str(block['username_template'])
|
|
822 | 822 |
try: |
823 | 823 |
return username_template.format(realm=block['realm'], **attributes) |
824 | 824 |
except KeyError as e: |
... | ... | |
955 | 955 |
if member_of_attribute: |
956 | 956 |
group_dns.update([dn.lower() for dn in attributes.get(member_of_attribute, [])]) |
957 | 957 |
if group_filter: |
958 |
group_filter = force_text(group_filter)
|
|
958 |
group_filter = force_str(group_filter)
|
|
959 | 959 |
params = attributes.copy() |
960 | 960 |
params['user_dn'] = dn |
961 | 961 |
query = FilterFormatter().format(group_filter, **params) |
... | ... | |
1095 | 1095 |
def _get_target_ou(self, block): |
1096 | 1096 |
ou_slug = block['ou_slug'] |
1097 | 1097 |
if ou_slug: |
1098 |
ou_slug = force_text(ou_slug)
|
|
1098 |
ou_slug = force_str(ou_slug)
|
|
1099 | 1099 |
try: |
1100 | 1100 |
return OrganizationalUnit.objects.get(slug=ou_slug) |
1101 | 1101 |
except OrganizationalUnit.DoesNotExist: |
... | ... | |
1120 | 1120 | |
1121 | 1121 |
@classmethod |
1122 | 1122 |
def get_sync_ldap_user_filter(cls, block): |
1123 |
user_filter = force_text(block['sync_ldap_users_filter'] or block['user_filter'])
|
|
1123 |
user_filter = force_str(block['sync_ldap_users_filter'] or block['user_filter'])
|
|
1124 | 1124 |
user_filter = user_filter.replace('%s', '*') |
1125 | 1125 |
return user_filter |
1126 | 1126 | |
... | ... | |
1217 | 1217 |
attribute_map = results[0][1] if results else {} |
1218 | 1218 |
# add mandatory attributes |
1219 | 1219 |
for key, mandatory_values in mandatory_attributes_values.items(): |
1220 |
key = force_text(key)
|
|
1220 |
key = force_str(key)
|
|
1221 | 1221 |
old = attribute_map.setdefault(key, []) |
1222 | 1222 |
new = set(old) | set(mandatory_values) |
1223 | 1223 |
attribute_map[key] = list(new) |
1224 | 1224 |
# apply mappings |
1225 | 1225 |
for from_attribute, to_attribute in attribute_mappings: |
1226 |
from_attribute = force_text(from_attribute)
|
|
1226 |
from_attribute = force_str(from_attribute)
|
|
1227 | 1227 |
if from_attribute not in attribute_map: |
1228 | 1228 |
continue |
1229 |
to_attribute = force_text(to_attribute)
|
|
1229 |
to_attribute = force_str(to_attribute)
|
|
1230 | 1230 |
old = attribute_map.setdefault(to_attribute, []) |
1231 | 1231 |
new = set(old) | set(attribute_map[from_attribute]) |
1232 | 1232 |
attribute_map[to_attribute] = list(new) |
1233 |
attribute_map['dn'] = force_text(dn)
|
|
1233 |
attribute_map['dn'] = force_str(dn)
|
|
1234 | 1234 | |
1235 | 1235 |
# extra attributes |
1236 | 1236 |
attribute_map = cls.get_ldap_extra_attributes(block, conn, dn, attribute_map) |
... | ... | |
1346 | 1346 |
if quote: |
1347 | 1347 |
decoded.append((attribute, urllib.parse.unquote(value))) |
1348 | 1348 |
else: |
1349 |
decoded.append((attribute, force_text(value)))
|
|
1349 |
decoded.append((attribute, force_str(value)))
|
|
1350 | 1350 |
filters = [filter_format('(%s=%s)', (a, b)) for a, b in decoded] |
1351 | 1351 |
return '(&{})'.format(''.join(filters)) |
1352 | 1352 | |
... | ... | |
1647 | 1647 |
log.info('Synchronising users from realm "%s"', block['realm']) |
1648 | 1648 |
conn = cls.get_connection(block) |
1649 | 1649 |
if conn is None: |
1650 |
log.warning('unable to synchronize with LDAP servers %s', force_text(block['url']))
|
|
1650 |
log.warning('unable to synchronize with LDAP servers %s', force_str(block['url']))
|
|
1651 | 1651 |
return |
1652 | 1652 |
cls.check_group_to_role_mappings(block) |
1653 |
user_basedn = force_text(block.get('user_basedn') or block['basedn'])
|
|
1653 |
user_basedn = force_str(block.get('user_basedn') or block['basedn'])
|
|
1654 | 1654 |
user_filter = cls.get_sync_ldap_user_filter(block) |
1655 | 1655 |
attribute_names = cls.get_ldap_attributes_names(block) |
1656 | 1656 |
results = cls.paged_search( |
... | ... | |
1715 | 1715 |
'external_guid', flat=True |
1716 | 1716 |
) |
1717 | 1717 |
) |
1718 |
basedn = force_text(block.get('user_basedn') or block['basedn'])
|
|
1718 |
basedn = force_str(block.get('user_basedn') or block['basedn'])
|
|
1719 | 1719 |
attribute_names = list( |
1720 | 1720 |
{a[0] for a in cls.attribute_name_from_external_id_tuple(block['external_id_tuples'])} |
1721 | 1721 |
| set(USUAL_GUID_ATTRIBUTES) |
... | ... | |
1794 | 1794 |
new_attrs = {'dn': dn} |
1795 | 1795 |
for key in attrs: |
1796 | 1796 |
try: |
1797 |
new_attrs[key.lower()] = [force_text(value, encoding) for value in attrs[key]]
|
|
1797 |
new_attrs[key.lower()] = [force_str(value, encoding) for value in attrs[key]]
|
|
1798 | 1798 |
except UnicodeDecodeError: |
1799 | 1799 |
log.debug('unable to decode attribute %r as UTF-8, converting to base64', key) |
1800 | 1800 |
new_attrs[key.lower()] = [base64.b64encode(value).decode('ascii') for value in attrs[key]] |
... | ... | |
1877 | 1877 |
try: |
1878 | 1878 |
if credentials: |
1879 | 1879 |
who, password = credentials[0], credentials[1] |
1880 |
password = force_text(password)
|
|
1880 |
password = force_str(password)
|
|
1881 | 1881 |
conn.simple_bind_s(who, password) |
1882 | 1882 |
log_message = 'with user %s' % who |
1883 | 1883 |
elif block['bindsasl']: |
... | ... | |
1887 | 1887 |
conn.sasl_interactive_bind_s(who, auth) |
1888 | 1888 |
log_message = 'with account %s' % who |
1889 | 1889 |
elif block['binddn'] and block['bindpw']: |
1890 |
who = force_text(block['binddn'])
|
|
1891 |
conn.simple_bind_s(who, force_text(block['bindpw']))
|
|
1890 |
who = force_str(block['binddn'])
|
|
1891 |
conn.simple_bind_s(who, force_str(block['bindpw']))
|
|
1892 | 1892 |
log_message = 'with binddn %s' % who |
1893 | 1893 |
else: |
1894 | 1894 |
who = 'anonymous' |
... | ... | |
1959 | 1959 |
if not isinstance(block[d], str): |
1960 | 1960 |
raise ImproperlyConfigured('LDAP_AUTH_SETTINGS: attribute %r must be a string' % d) |
1961 | 1961 |
try: |
1962 |
block[d] = force_text(block[d])
|
|
1962 |
block[d] = force_str(block[d])
|
|
1963 | 1963 |
except UnicodeEncodeError: |
1964 | 1964 |
raise ImproperlyConfigured('LDAP_AUTH_SETTINGS: attribute %r must be a string' % d) |
1965 | 1965 |
if isinstance(value, bool) and not isinstance(block[d], bool): |
... | ... | |
1981 | 1981 |
# we handle strings, list of strings and list of list or tuple whose first element is a |
1982 | 1982 |
# string |
1983 | 1983 |
if isinstance(block[key], str): |
1984 |
block[key] = force_text(block[key]).lower()
|
|
1984 |
block[key] = force_str(block[key]).lower()
|
|
1985 | 1985 |
elif isinstance(block[key], (list, tuple)): |
1986 | 1986 |
new_seq = [] |
1987 | 1987 |
for elt in block[key]: |
... | ... | |
1998 | 1998 |
elif isinstance(block[key], dict): |
1999 | 1999 |
newdict = {} |
2000 | 2000 |
for subkey in block[key]: |
2001 |
newdict[force_text(subkey).lower()] = block[key][subkey]
|
|
2001 |
newdict[force_str(subkey).lower()] = block[key][subkey]
|
|
2002 | 2002 |
block[key] = newdict |
2003 | 2003 |
else: |
2004 | 2004 |
raise NotImplementedError( |
... | ... | |
2036 | 2036 |
for block in config: |
2037 | 2037 |
if block['authentication'] is False: |
2038 | 2038 |
continue |
2039 |
if user_external_id.source != force_text(block['realm']):
|
|
2039 |
if user_external_id.source != force_str(block['realm']):
|
|
2040 | 2040 |
continue |
2041 | 2041 |
for external_id_tuple in map_text(block['external_id_tuples']): |
2042 | 2042 |
conn = self.ldap_backend.get_connection(block) |
src/authentic2/csv_import.py | ||
---|---|---|
26 | 26 |
from django.core.validators import RegexValidator |
27 | 27 |
from django.db import IntegrityError, models |
28 | 28 |
from django.db.transaction import atomic |
29 |
from django.utils.encoding import force_bytes, force_text
|
|
29 |
from django.utils.encoding import force_bytes, force_str
|
|
30 | 30 |
from django.utils.translation import gettext as _ |
31 | 31 | |
32 | 32 |
from authentic2 import app_settings |
... | ... | |
79 | 79 |
return self |
80 | 80 | |
81 | 81 |
def __next__(self): |
82 |
return force_text(self.fd.__next__().encode('utf-8'))
|
|
82 |
return force_str(self.fd.__next__().encode('utf-8'))
|
|
83 | 83 | |
84 | 84 |
next = __next__ |
85 | 85 | |
... | ... | |
156 | 156 | |
157 | 157 |
def parse_csv(input_fd): |
158 | 158 |
try: |
159 |
content = force_text(input_fd.read().encode('utf-8'))
|
|
159 |
content = force_str(input_fd.read().encode('utf-8'))
|
|
160 | 160 |
except UnicodeDecodeError: |
161 | 161 |
self.error = Error('bad-encoding', _('Bad encoding')) |
162 | 162 |
return False |
src/authentic2/forms/authentication.py | ||
---|---|---|
22 | 22 |
from django.contrib.auth import forms as auth_forms |
23 | 23 |
from django.forms.widgets import Media |
24 | 24 |
from django.utils import html |
25 |
from django.utils.encoding import force_text
|
|
25 |
from django.utils.encoding import force_str
|
|
26 | 26 |
from django.utils.translation import gettext |
27 | 27 |
from django.utils.translation import gettext_lazy as _ |
28 | 28 | |
... | ... | |
159 | 159 |
invalid_login_message.append(_('Try again or use the forgotten password link below.')) |
160 | 160 |
elif getattr(settings, 'REGISTRATION_OPEN', True): |
161 | 161 |
invalid_login_message.append(_('Try again or create an account.')) |
162 |
error_messages['invalid_login'] = ' '.join([force_text(x) for x in invalid_login_message])
|
|
162 |
error_messages['invalid_login'] = ' '.join([force_str(x) for x in invalid_login_message])
|
|
163 | 163 |
return error_messages |
src/authentic2/forms/widgets.py | ||
---|---|---|
34 | 34 |
from django.forms.widgets import EmailInput as BaseEmailInput |
35 | 35 |
from django.forms.widgets import PasswordInput as BasePasswordInput |
36 | 36 |
from django.forms.widgets import TextInput, TimeInput |
37 |
from django.utils.encoding import force_text
|
|
37 |
from django.utils.encoding import force_str
|
|
38 | 38 |
from django.utils.formats import get_format, get_language |
39 | 39 |
from django.utils.safestring import mark_safe |
40 | 40 |
from django.utils.translation import gettext_lazy as _ |
... | ... | |
225 | 225 |
def format_value(self, value): |
226 | 226 |
if value is not None: |
227 | 227 |
if isinstance(value, datetime.datetime): |
228 |
return force_text(value.isoformat())
|
|
228 |
return force_str(value.isoformat())
|
|
229 | 229 |
return value |
230 | 230 | |
231 | 231 |
src/authentic2/hashers.py | ||
---|---|---|
23 | 23 |
from django.contrib.auth import hashers |
24 | 24 |
from django.contrib.auth.hashers import make_password |
25 | 25 |
from django.utils.crypto import constant_time_compare |
26 |
from django.utils.encoding import force_bytes, force_text
|
|
26 |
from django.utils.encoding import force_bytes, force_str
|
|
27 | 27 |
from django.utils.translation import gettext_noop as _ |
28 | 28 | |
29 | 29 | |
... | ... | |
160 | 160 |
algo_name, salt_offset, hex_encode = OPENLDAP_ALGO_MAPPING[algo] |
161 | 161 |
salt, password = (password[salt_offset:], password[:salt_offset]) if salt_offset else ('', password) |
162 | 162 |
if hex_encode: |
163 |
password = force_text(hexlify(password), encoding='ascii')
|
|
164 |
salt = force_text(hexlify(force_bytes(salt)), encoding='ascii')
|
|
163 |
password = force_str(hexlify(password), encoding='ascii')
|
|
164 |
salt = force_str(hexlify(force_bytes(salt)), encoding='ascii')
|
|
165 | 165 |
return '%s$%s$%s' % (algo_name, salt, password) |
166 | 166 |
else: |
167 | 167 |
return make_password(password) |
... | ... | |
172 | 172 |
assert password |
173 | 173 |
assert b'$' not in salt |
174 | 174 |
hash = self.digest(force_bytes(password + salt)).hexdigest() |
175 |
salt = force_text(hexlify(salt), encoding='ascii')
|
|
175 |
salt = force_str(hexlify(salt), encoding='ascii')
|
|
176 | 176 |
return "%s$%s$%s" % (self.algorithm, salt, hash) |
177 | 177 | |
178 | 178 |
def verify(self, password, encoded): |
... | ... | |
223 | 223 |
assert password |
224 | 224 |
assert b'$' not in salt |
225 | 225 |
hash = self.digest(force_bytes(password) + salt).hexdigest() |
226 |
salt = force_text(hexlify(force_bytes(salt)), encoding='ascii')
|
|
226 |
salt = force_str(hexlify(force_bytes(salt)), encoding='ascii')
|
|
227 | 227 |
return "%s$md5$%s$%s" % (self.algorithm, salt, hash) |
228 | 228 | |
229 | 229 |
def verify(self, password, encoded): |
... | ... | |
247 | 247 |
h, salt = encoded.split(':', 1) |
248 | 248 |
else: |
249 | 249 |
h, salt = encoded, '' |
250 |
salt = force_text(hexlify(force_bytes(salt)), encoding='ascii')
|
|
250 |
salt = force_str(hexlify(force_bytes(salt)), encoding='ascii')
|
|
251 | 251 | |
252 | 252 |
return '%s$md5$%s$%s' % (cls.algorithm, salt, h) |
253 | 253 | |
... | ... | |
259 | 259 |
if subalgo != 'md5': |
260 | 260 |
raise NotImplementedError |
261 | 261 |
if salt: |
262 |
return '%s:%s' % (_hash, force_text(unhexlify(force_bytes(salt))))
|
|
262 |
return '%s:%s' % (_hash, force_str(unhexlify(force_bytes(salt))))
|
|
263 | 263 |
else: |
264 | 264 |
return _hash |
265 | 265 | |
... | ... | |
283 | 283 |
salt = force_bytes(salt) |
284 | 284 | |
285 | 285 |
hashed = base64.b64encode(hashlib.sha1(password + salt).digest() + salt) |
286 |
return "%s$%s%s" % (self.algorithm, self._prefix, force_text(hashed))
|
|
286 |
return "%s$%s%s" % (self.algorithm, self._prefix, force_str(hashed))
|
|
287 | 287 | |
288 | 288 |
def verify(self, password, encoded): |
289 | 289 |
"""Verify the given password against the encoded string.""" |
src/authentic2/idp/saml/saml2_endpoints.py | ||
---|---|---|
50 | 50 |
from django.http import HttpResponse, HttpResponseBadRequest, HttpResponseForbidden, HttpResponseRedirect |
51 | 51 |
from django.shortcuts import redirect, render |
52 | 52 |
from django.urls import reverse |
53 |
from django.utils.encoding import force_bytes, force_str, force_text
|
|
53 |
from django.utils.encoding import force_bytes, force_str |
|
54 | 54 |
from django.utils.timezone import utc |
55 | 55 |
from django.utils.translation import gettext as _ |
56 | 56 |
from django.utils.translation import gettext_noop as N_ |
... | ... | |
271 | 271 |
seen = set() |
272 | 272 |
# Keep current attributes, mark string values as already added |
273 | 273 |
for attribute in attribute_statement.attribute: |
274 |
name = force_text(attribute.name)
|
|
275 |
name_format = force_text(attribute.nameFormat)
|
|
274 |
name = force_str(attribute.name)
|
|
275 |
name_format = force_str(attribute.nameFormat)
|
|
276 | 276 |
# sequence from lasso are always tuple, so convert to list |
277 | 277 |
attributes[(name, name_format)] = attribute, list(attribute.attributeValue) |
278 | 278 |
for atv in attribute.attributeValue: |
... | ... | |
282 | 282 |
and isinstance(atv.any[0], lasso.MiscTextNode) |
283 | 283 |
and atv.any[0].textChild |
284 | 284 |
): |
285 |
seen.add((name, name_format, force_text(atv.any[0].content)))
|
|
285 |
seen.add((name, name_format, force_str(atv.any[0].content)))
|
|
286 | 286 |
definitions = list(qs) |
287 | 287 | |
288 | 288 |
# special handling of nid format edupersontargetedid |
... | ... | |
360 | 360 |
elif value is None: |
361 | 361 |
value = '' |
362 | 362 |
else: |
363 |
value = force_text(value)
|
|
363 |
value = force_str(value)
|
|
364 | 364 |
tn = lasso.MiscTextNode.newWithString(force_str(value)) |
365 | 365 |
tn.textChild = True |
366 | 366 |
return tn |
... | ... | |
369 | 369 |
def saml2_add_attribute_values(assertion, attributes): |
370 | 370 |
if attributes: |
371 | 371 |
logger.debug('adding attributes') |
372 |
logger.debug('assertion before processing %s', force_text(assertion.dump()))
|
|
372 |
logger.debug('assertion before processing %s', force_str(assertion.dump()))
|
|
373 | 373 |
logger.debug('adding attributes %s', attributes) |
374 | 374 |
if not assertion.attributeStatement: |
375 | 375 |
assertion.attributeStatement = [lasso.Saml2AttributeStatement()] |
... | ... | |
404 | 404 |
attribute_value.any = [text_node] |
405 | 405 |
attribute_value_list.append(attribute_value) |
406 | 406 |
attribute.attributeValue = attribute_value_list |
407 |
logger.debug('assertion after processing %s', force_text(assertion.dump()))
|
|
407 |
logger.debug('assertion after processing %s', force_str(assertion.dump()))
|
|
408 | 408 | |
409 | 409 | |
410 | 410 |
def build_assertion(request, login, provider, nid_format='transient'): |
... | ... | |
463 | 463 |
expiry_date = request.session.get_expiry_date() |
464 | 464 |
session_not_on_or_after = timezone_now + (expiry_date - timezone_now) * 0.5 |
465 | 465 |
assertion.authnStatement[0].sessionNotOnOrAfter = datetime_to_xs_datetime(session_not_on_or_after) |
466 |
logger.debug('assertion building in progress %s', force_text(assertion.dump()))
|
|
466 |
logger.debug('assertion building in progress %s', force_str(assertion.dump()))
|
|
467 | 467 |
fill_assertion(request, login.request, assertion, login.remoteProviderId, nid_format) |
468 | 468 |
# Save federation and new session |
469 | 469 |
if nid_format == 'persistent': |
... | ... | |
605 | 605 |
and name_id_policy.format |
606 | 606 |
and name_id_policy.format != lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED |
607 | 607 |
): |
608 |
logger.debug('nameID policy is %s', force_text(name_id_policy.dump()))
|
|
608 |
logger.debug('nameID policy is %s', force_str(name_id_policy.dump()))
|
|
609 | 609 |
nid_format = saml2_urn_to_nidformat(name_id_policy.format, accepted=policy.accepted_name_id_format) |
610 | 610 |
logger.debug('nameID format %s', nid_format) |
611 | 611 |
default_nid_format = policy.default_name_id_format |
... | ... | |
635 | 635 |
the login form was submitted |
636 | 636 |
""" |
637 | 637 |
nonce = login.request.id or get_nonce() |
638 |
save_key_values(nonce, force_text(login.dump()), False, nid_format)
|
|
638 |
save_key_values(nonce, force_str(login.dump()), False, nid_format)
|
|
639 | 639 |
next_url = make_url(continue_sso, params={NONCE_FIELD_NAME: nonce}) |
640 | 640 |
logger.debug('redirect to login page with next url %s', next_url) |
641 | 641 |
return login_require( |
... | ... | |
653 | 653 | |
654 | 654 |
def need_consent_for_federation(request, login, nid_format): |
655 | 655 |
nonce = login.request.id or get_nonce() |
656 |
save_key_values(nonce, force_text(login.dump()), False, nid_format)
|
|
656 |
save_key_values(nonce, force_str(login.dump()), False, nid_format)
|
|
657 | 657 |
display_name = None |
658 | 658 |
try: |
659 | 659 |
provider = LibertyProvider.objects.get(entity_id=login.request.issuer.content) |
... | ... | |
943 | 943 |
logger.debug('ask the user consent now') |
944 | 944 |
return need_consent_for_federation(request, login, nid_format) |
945 | 945 | |
946 |
logger.debug('login dump before processing %s', force_text(login.dump()))
|
|
946 |
logger.debug('login dump before processing %s', force_str(login.dump()))
|
|
947 | 947 |
try: |
948 | 948 |
if needs_persistence(nid_format): |
949 | 949 |
logger.debug('load identity dump') |
950 | 950 |
load_federation(request, get_entity_id(request), login, user) |
951 | 951 |
login.validateRequestMsg(not user.is_anonymous, consent_obtained) |
952 |
logger.debug('validateRequestMsg %s', force_text(login.dump()))
|
|
952 |
logger.debug('validateRequestMsg %s', force_str(login.dump()))
|
|
953 | 953 |
except lasso.LoginRequestDeniedError: |
954 | 954 |
logger.warning('access denied due to LoginRequestDeniedError') |
955 | 955 |
set_saml2_response_responder_status_code(login.response, lasso.SAML2_STATUS_CODE_REQUEST_DENIED) |
... | ... | |
1003 | 1003 |
def save_artifact(request, login): |
1004 | 1004 |
'''Remember an artifact message for later retrieving''' |
1005 | 1005 |
LibertyArtifact( |
1006 |
artifact=login.artifact, content=force_text(login.artifactMessage), provider_id=login.remoteProviderId
|
|
1006 |
artifact=login.artifact, content=force_str(login.artifactMessage), provider_id=login.remoteProviderId
|
|
1007 | 1007 |
).save() |
1008 | 1008 |
logger.debug('artifact saved') |
1009 | 1009 | |
... | ... | |
1465 | 1465 |
provider_id=logout.remoteProviderId, django_session_key=request.session.session_key |
1466 | 1466 |
).delete() |
1467 | 1467 |
# Save some values for cleaning up |
1468 |
save_key_values(logout.request.id, force_text(logout.dump()), request.session.session_key)
|
|
1468 |
save_key_values(logout.request.id, force_str(logout.dump()), request.session.session_key)
|
|
1469 | 1469 | |
1470 | 1470 |
# Use the logout view and come back to the finish slo view |
1471 | 1471 |
next_url = make_url(finish_slo, params={'id': logout.request.id}) |
... | ... | |
1552 | 1552 |
return redirect_next(request, next) or ko_icon(request) |
1553 | 1553 |
return process_logout_response(request, logout, soap_response, next) |
1554 | 1554 |
else: |
1555 |
save_key_values(logout.request.id, force_text(logout.dump()), provider_id, next)
|
|
1555 |
save_key_values(logout.request.id, force_str(logout.dump()), provider_id, next)
|
|
1556 | 1556 |
return HttpResponseRedirect(logout.msgUrl) |
1557 | 1557 | |
1558 | 1558 |
src/authentic2/ldap_utils.py | ||
---|---|---|
19 | 19 | |
20 | 20 |
import ldap.dn |
21 | 21 |
import ldap.filter |
22 |
from django.utils.encoding import force_text
|
|
22 |
from django.utils.encoding import force_str
|
|
23 | 23 | |
24 | 24 | |
25 | 25 |
class DnFormatter(string.Formatter): |
... | ... | |
39 | 39 | |
40 | 40 |
def convert_field(self, value, conversion): |
41 | 41 |
if conversion == 's': |
42 |
return force_text(value)
|
|
42 |
return force_str(value)
|
|
43 | 43 |
return super().convert_field(value, conversion) |
44 | 44 | |
45 | 45 | |
... | ... | |
60 | 60 | |
61 | 61 |
def convert_field(self, value, conversion): |
62 | 62 |
if conversion == 's': |
63 |
return force_text(value)
|
|
63 |
return force_str(value)
|
|
64 | 64 |
return super().convert_field(value, conversion) |
src/authentic2/manager/views.py | ||
---|---|---|
27 | 27 |
from django.forms import MediaDefiningClass |
28 | 28 |
from django.http import Http404, HttpResponse |
29 | 29 |
from django.urls import reverse, reverse_lazy |
30 |
from django.utils.encoding import force_text
|
|
30 |
from django.utils.encoding import force_str
|
|
31 | 31 |
from django.utils.functional import cached_property |
32 | 32 |
from django.utils.timezone import now |
33 | 33 |
from django.utils.translation import gettext_lazy as _ |
... | ... | |
267 | 267 |
data['location'] = location |
268 | 268 |
if hasattr(response, 'render'): |
269 | 269 |
response.render() |
270 |
data['content'] = force_text(response.content)
|
|
270 |
data['content'] = force_str(response.content)
|
|
271 | 271 |
return HttpResponse(json.dumps(data), content_type='application/json') |
272 | 272 | |
273 | 273 | |
... | ... | |
710 | 710 |
# retrieve ldap uri, not directly visible in configuration block |
711 | 711 |
config['ldap_uri'] = conn.get_option(ldap.OPT_URI) |
712 | 712 |
# user filters need to be formatted to ldapsearch syntax |
713 |
config['user_filter'] = force_text(block.get('user_filter'), '').replace('%s', '*')
|
|
713 |
config['user_filter'] = force_str(block.get('user_filter'), '').replace('%s', '*')
|
|
714 | 714 |
config['sync_ldap_users_filter'] = ( |
715 |
force_text(block.get('sync_ldap_users_filter'), '').replace('%s', '*').replace('%s', '*')
|
|
715 |
force_str(block.get('sync_ldap_users_filter'), '').replace('%s', '*').replace('%s', '*')
|
|
716 | 716 |
) |
717 | 717 | |
718 | 718 |
kwargs['ldap_list'].append(config) |
src/authentic2/manager/widgets.py | ||
---|---|---|
20 | 20 |
import pickle |
21 | 21 | |
22 | 22 |
from django.contrib.auth import get_user_model |
23 |
from django.utils.encoding import force_text
|
|
23 |
from django.utils.encoding import force_str
|
|
24 | 24 |
from django_select2.forms import ModelSelect2MultipleWidget, ModelSelect2Widget |
25 | 25 | |
26 | 26 |
from authentic2.a2_rbac.models import Role |
... | ... | |
73 | 73 |
attrs = super().build_attrs(*args, **kwargs) |
74 | 74 |
field_data = { |
75 | 75 |
'class': self.__class__.__name__, |
76 |
'where_clause': force_text(base64.b64encode(pickle.dumps(self.queryset.query.where))),
|
|
76 |
'where_clause': force_str(base64.b64encode(pickle.dumps(self.queryset.query.where))),
|
|
77 | 77 |
} |
78 | 78 |
attrs['data-field_id'] = crypto.dumps(field_data) |
79 | 79 |
return attrs |
src/authentic2/saml/common.py | ||
---|---|---|
26 | 26 |
from django.http import Http404, HttpResponse, HttpResponseRedirect |
27 | 27 |
from django.shortcuts import render |
28 | 28 |
from django.urls import reverse |
29 |
from django.utils.encoding import force_text
|
|
29 |
from django.utils.encoding import force_str
|
|
30 | 30 | |
31 | 31 |
from authentic2.compat_lasso import lasso |
32 | 32 |
from authentic2.http_utils import get_url |
... | ... | |
393 | 393 |
return False |
394 | 394 |
if server: |
395 | 395 |
server.addProviderFromBuffer( |
396 |
lasso.PROVIDER_ROLE_SP, force_text(liberty_provider.metadata.encode('utf8'))
|
|
396 |
lasso.PROVIDER_ROLE_SP, force_str(liberty_provider.metadata.encode('utf8'))
|
|
397 | 397 |
) |
398 | 398 |
policy = get_sp_options_policy(liberty_provider) |
399 | 399 |
if policy: |
src/authentic2/saml/fields.py | ||
---|---|---|
27 | 27 |
from django.core.exceptions import ValidationError |
28 | 28 |
from django.db import models |
29 | 29 |
from django.template.defaultfilters import pluralize |
30 |
from django.utils.encoding import force_bytes, force_text
|
|
30 |
from django.utils.encoding import force_bytes, force_str
|
|
31 | 31 |
from django.utils.text import capfirst |
32 | 32 | |
33 | 33 | |
... | ... | |
39 | 39 | |
40 | 40 | |
41 | 41 |
def dumps(value): |
42 |
return PickledObject(force_text(base64.b64encode(pickle.dumps(value, protocol=0))))
|
|
42 |
return PickledObject(force_str(base64.b64encode(pickle.dumps(value, protocol=0))))
|
|
43 | 43 | |
44 | 44 | |
45 | 45 |
# This is a copy of http://djangosnippets.org/snippets/513/ |
src/authentic2/saml/forms.py | ||
---|---|---|
20 | 20 |
from django import forms |
21 | 21 |
from django.conf import settings |
22 | 22 |
from django.core.exceptions import ValidationError |
23 |
from django.utils.encoding import force_text
|
|
23 |
from django.utils.encoding import force_str
|
|
24 | 24 |
from django.utils.translation import gettext_lazy as _ |
25 | 25 | |
26 | 26 |
from authentic2.a2_rbac.models import OrganizationalUnit |
... | ... | |
52 | 52 |
try: |
53 | 53 |
response = requests.get(url, timeout=settings.REQUESTS_TIMEOUT) |
54 | 54 |
response.raise_for_status() |
55 |
content = force_text(response.content)
|
|
55 |
content = force_str(response.content)
|
|
56 | 56 |
except requests.RequestException as e: |
57 | 57 |
raise ValidationError( |
58 | 58 |
_('Retrieval of %(url)s failed: %(exception)s') % {'url': url, 'exception': e} |
src/authentic2/saml/models.py | ||
---|---|---|
26 | 26 |
from django.db import models |
27 | 27 |
from django.db.models import Q |
28 | 28 |
from django.db.models.query import QuerySet |
29 |
from django.utils.encoding import force_str, force_text
|
|
29 |
from django.utils.encoding import force_str |
|
30 | 30 |
from django.utils.translation import gettext_lazy as _ |
31 | 31 | |
32 | 32 |
from authentic2.compat_lasso import lasso |
... | ... | |
162 | 162 |
] |
163 | 163 |
) |
164 | 164 | |
165 |
NAME_ID_FORMATS_CHOICES = [(force_text(x), y['caption']) for x, y in NAME_ID_FORMATS.items()]
|
|
165 |
NAME_ID_FORMATS_CHOICES = [(force_str(x), y['caption']) for x, y in NAME_ID_FORMATS.items()]
|
|
166 | 166 | |
167 | 167 |
ACCEPTED_NAME_ID_FORMAT_LENGTH = sum(len(x) for x, y in NAME_ID_FORMATS.items()) + len(NAME_ID_FORMATS) - 1 |
168 | 168 | |
... | ... | |
397 | 397 | |
398 | 398 |
def clean(self): |
399 | 399 |
super().clean() |
400 |
p = lasso.Provider.newFromBuffer(lasso.PROVIDER_ROLE_ANY, force_text(self.metadata.encode('utf8')))
|
|
400 |
p = lasso.Provider.newFromBuffer(lasso.PROVIDER_ROLE_ANY, force_str(self.metadata.encode('utf8')))
|
|
401 | 401 |
if p is None: |
402 | 402 |
raise ValidationError(_('Invalid metadata file')) |
403 | 403 |
self.entity_id = p.providerId |
src/authentic2/utils/lazy.py | ||
---|---|---|
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 | |
18 |
from django.utils.encoding import force_text
|
|
18 |
from django.utils.encoding import force_str
|
|
19 | 19 |
from django.utils.functional import keep_lazy |
20 | 20 |
from django.utils.text import format_lazy |
21 | 21 | |
... | ... | |
34 | 34 | |
35 | 35 |
ex.: lazy_label(_('Default label'), lambda: app_settings.CUSTOM_LABEL) |
36 | 36 |
""" |
37 |
return force_text(func() or default) |
|
37 |
return force_str(func() or default) |
src/authentic2_auth_fc/utils.py | ||
---|---|---|
29 | 29 |
from django.conf import settings |
30 | 30 |
from django.shortcuts import resolve_url |
31 | 31 |
from django.urls import reverse |
32 |
from django.utils.encoding import force_bytes, force_text
|
|
32 |
from django.utils.encoding import force_bytes, force_str
|
|
33 | 33 |
from django.utils.http import urlencode |
34 | 34 |
from django.utils.timezone import now |
35 | 35 |
from django.utils.translation import gettext_lazy as _ |
... | ... | |
281 | 281 |
return None, 'hmac signature does not match' |
282 | 282 |
payload = base64url_decode(str(payload)) |
283 | 283 |
try: |
284 |
payload = json.loads(force_text(payload))
|
|
284 |
payload = json.loads(force_str(payload))
|
|
285 | 285 |
except ValueError: |
286 | 286 |
return None, 'invalid payload' |
287 | 287 |
if client_id and ('aud' not in payload or payload['aud'] != client_id): |
src/authentic2_idp_oidc/utils.py | ||
---|---|---|
23 | 23 | |
24 | 24 |
from django.conf import settings |
25 | 25 |
from django.core.exceptions import ImproperlyConfigured |
26 |
from django.utils.encoding import force_bytes, force_text
|
|
26 |
from django.utils.encoding import force_bytes, force_str
|
|
27 | 27 |
from jwcrypto.jwk import JWK, InvalidJWKValue, JWKSet |
28 | 28 |
from jwcrypto.jwt import JWT |
29 | 29 | |
... | ... | |
97 | 97 |
if client.idtoken_algo == client.ALGO_HMAC: |
98 | 98 |
header = {'alg': 'HS256'} |
99 | 99 |
k = base64url(client.client_secret.encode('utf-8')) |
100 |
jwk = JWK(kty='oct', k=force_text(k))
|
|
100 |
jwk = JWK(kty='oct', k=force_str(k))
|
|
101 | 101 |
elif client.idtoken_algo == client.ALGO_RSA: |
102 | 102 |
header = {'alg': 'RS256'} |
103 | 103 |
jwk = get_first_rsa_sig_key() |
... | ... | |
134 | 134 |
if client.identifier_policy in (client.POLICY_PAIRWISE, client.POLICY_PAIRWISE_REVERSIBLE): |
135 | 135 |
return make_pairwise_sub(client, user, profile=profile) |
136 | 136 |
elif client.identifier_policy == client.POLICY_UUID: |
137 |
return force_text(user.uuid)
|
|
137 |
return force_str(user.uuid)
|
|
138 | 138 |
elif client.identifier_policy == client.POLICY_EMAIL: |
139 | 139 |
return user.email |
140 | 140 |
else: |
src/authentic2_idp_oidc/views.py | ||
---|---|---|
28 | 28 |
from django.http import HttpResponse, HttpResponseNotAllowed, JsonResponse |
29 | 29 |
from django.shortcuts import render |
30 | 30 |
from django.urls import reverse |
31 |
from django.utils.encoding import force_text
|
|
31 |
from django.utils.encoding import force_str
|
|
32 | 32 |
from django.utils.http import urlencode |
33 | 33 |
from django.utils.timezone import now, utc |
34 | 34 |
from django.utils.translation import gettext as _ |
... | ... | |
185 | 185 |
error_description = _('Wrong client secret') |
186 | 186 | |
187 | 187 |
def __init__(self, *args, wrong_id, **kwargs): |
188 |
kwargs['extra_info'] = _('received %s') % force_text(wrong_id)
|
|
188 |
kwargs['extra_info'] = _('received %s') % force_str(wrong_id)
|
|
189 | 189 |
super().__init__(*args, **kwargs) |
190 | 190 | |
191 | 191 | |
... | ... | |
539 | 539 |
if authorization[0] != 'Basic' or len(authorization) != 2: |
540 | 540 |
return None, None |
541 | 541 |
try: |
542 |
decoded = force_text(base64.b64decode(authorization[1]))
|
|
542 |
decoded = force_str(base64.b64decode(authorization[1]))
|
|
543 | 543 |
except Base64Error: |
544 | 544 |
return None, None |
545 | 545 |
parts = decoded.split(':') |
tests/api/test_all.py | ||
---|---|---|
28 | 28 |
from django.contrib.contenttypes.models import ContentType |
29 | 29 |
from django.core import mail |
30 | 30 |
from django.urls import reverse |
31 |
from django.utils.encoding import force_text
|
|
31 |
from django.utils.encoding import force_str
|
|
32 | 32 |
from django.utils.text import slugify |
33 | 33 |
from requests.models import Response |
34 | 34 | |
... | ... | |
107 | 107 |
# login |
108 | 108 |
client.login(request=None, username='john.doe', password='password') |
109 | 109 |
response = client.get('/api/user/', HTTP_ORIGIN='http://testserver') |
110 |
data = json.loads(force_text(response.content))
|
|
110 |
data = json.loads(force_str(response.content))
|
|
111 | 111 |
assert isinstance(data, dict) |
112 | 112 |
assert set(data.keys()) == { |
113 | 113 |
'uuid', |
tests/idp_oidc/test_misc.py | ||
---|---|---|
30 | 30 |
from django.test.utils import override_settings |
31 | 31 |
from django.urls import reverse |
32 | 32 |
from django.utils.dateparse import parse_datetime |
33 |
from django.utils.encoding import force_text
|
|
33 |
from django.utils.encoding import force_str
|
|
34 | 34 |
from django.utils.timezone import now |
35 | 35 |
from jwcrypto.jwk import JWK, JWKSet |
36 | 36 |
from jwcrypto.jwt import JWT |
... | ... | |
325 | 325 |
algs = ['RS256', 'ES256'] |
326 | 326 |
elif oidc_client.idtoken_algo == oidc_client.ALGO_HMAC: |
327 | 327 |
k = base64.b64encode(oidc_client.client_secret.encode('utf-8')) |
328 |
key = JWK(kty='oct', k=force_text(k))
|
|
328 |
key = JWK(kty='oct', k=force_str(k))
|
|
329 | 329 |
algs = ['HS256'] |
330 | 330 |
else: |
331 | 331 |
raise NotImplementedError |
... | ... | |
1037 | 1037 |
algs = ['RS256', 'ES256'] |
1038 | 1038 |
elif oidc_client.idtoken_algo == oidc_client.ALGO_HMAC: |
1039 | 1039 |
k = base64.b64encode(oidc_client.client_secret.encode('utf-8')) |
1040 |
key = JWK(kty='oct', k=force_text(k))
|
|
1040 |
key = JWK(kty='oct', k=force_str(k))
|
|
1041 | 1041 |
algs = ['HS256'] |
1042 | 1042 |
else: |
1043 | 1043 |
raise NotImplementedError |
... | ... | |
1152 | 1152 |
id_token = response.json['id_token'] |
1153 | 1153 | |
1154 | 1154 |
k = base64.b64encode(oidc_client.client_secret.encode('utf-8')) |
1155 |
key = JWK(kty='oct', k=force_text(k))
|
|
1155 |
key = JWK(kty='oct', k=force_str(k))
|
|
1156 | 1156 |
jwt = JWT(jwt=id_token, key=key) |
1157 | 1157 |
claims = json.loads(jwt.claims) |
1158 | 1158 | |
... | ... | |
1255 | 1255 |
id_token = response.json['id_token'] |
1256 | 1256 | |
1257 | 1257 |
k = base64.b64encode(oidc_client.client_secret.encode('utf-8')) |
1258 |
key = JWK(kty='oct', k=force_text(k))
|
|
1258 |
key = JWK(kty='oct', k=force_str(k))
|
|
1259 | 1259 |
jwt = JWT(jwt=id_token, key=key) |
1260 | 1260 |
claims = json.loads(jwt.claims) |
1261 | 1261 | |
... | ... | |
1365 | 1365 |
token_url = make_url('oidc-token') |
1366 | 1366 |
if oidc_client.idtoken_algo == OIDCClient.ALGO_HMAC: |
1367 | 1367 |
k = base64url(oidc_client.client_secret.encode('utf-8')) |
1368 |
jwk = JWK(kty='oct', k=force_text(k))
|
|
1368 |
jwk = JWK(kty='oct', k=force_str(k))
|
|
1369 | 1369 |
elif oidc_client.idtoken_algo == OIDCClient.ALGO_RSA: |
1370 | 1370 |
jwk = get_first_rsa_sig_key() |
1371 | 1371 |
elif oidc_client.idtoken_algo == OIDCClient.ALGO_EC: |
tests/idp_oidc/test_user_profiles.py | ||
---|---|---|
23 | 23 |
import pytest |
24 | 24 |
from django.contrib.auth import get_user_model |
25 | 25 |
from django.urls import reverse |
26 |
from django.utils.encoding import force_text
|
|
26 |
from django.utils.encoding import force_str
|
|
27 | 27 |
from django.utils.timezone import now |
28 | 28 |
from jwcrypto.jwk import JWK |
29 | 29 |
from jwcrypto.jwt import JWT |
... | ... | |
197 | 197 |
assert access_token |
198 | 198 |
id_token = response.json['id_token'] |
199 | 199 |
k = base64.b64encode(oidc_client.client_secret.encode('utf-8')) |
200 |
key = JWK(kty='oct', k=force_text(k))
|
|
200 |
key = JWK(kty='oct', k=force_str(k))
|
|
201 | 201 |
algs = ['HS256'] |
202 | 202 |
jwt = JWT(jwt=id_token, key=key, algs=algs) |
203 | 203 |
claims = json.loads(jwt.claims) |
... | ... | |
256 | 256 |
access_token = query['access_token'][0] |
257 | 257 |
id_token = query['id_token'][0] |
258 | 258 |
k = base64.b64encode(oidc_client.client_secret.encode('utf-8')) |
259 |
key = JWK(kty='oct', k=force_text(k))
|
|
259 |
key = JWK(kty='oct', k=force_str(k))
|
|
260 | 260 |
algs = ['HS256'] |
261 | 261 |
jwt = JWT(jwt=id_token, key=key, algs=algs) |
262 | 262 |
claims = json.loads(jwt.claims) |
... | ... | |
420 | 420 |
) |
421 | 421 |
id_token = response.json['id_token'] |
422 | 422 |
k = base64.b64encode(oidc_client.client_secret.encode('utf-8')) |
423 |
key = JWK(kty='oct', k=force_text(k))
|
|
423 |
key = JWK(kty='oct', k=force_str(k))
|
|
424 | 424 |
algs = ['HS256'] |
425 | 425 |
jwt = JWT(jwt=id_token, key=key, algs=algs) |
426 | 426 |
claims = json.loads(jwt.claims) |
tests/test_all.py | ||
---|---|---|
28 | 28 |
from django.test.client import Client |
29 | 29 |
from django.test.utils import override_settings |
30 | 30 |
from django.urls import reverse |
31 |
from django.utils.encoding import force_text
|
|
31 |
from django.utils.encoding import force_str
|
|
32 | 32 |
from django.utils.translation import gettext as _ |
33 | 33 |
from rest_framework import status, test |
34 | 34 | |
... | ... | |
463 | 463 |
activation_url = get_link_from_mail(mail.outbox[-1]) |
464 | 464 |
response = client.get(activation_url, follow=True) |
465 | 465 |
self.assertEqual(response.status_code, status.HTTP_200_OK) |
466 |
assert utils_misc.make_url(return_url, params={'token': token}) in force_text(response.content)
|
|
466 |
assert utils_misc.make_url(return_url, params={'token': token}) in force_str(response.content)
|
|
467 | 467 |
self.assertEqual(User.objects.count(), user_count + 1) |
468 | 468 |
response = client.get(reverse('auth_homepage')) |
469 | 469 |
self.assertContains(response, username) |
... | ... | |
569 | 569 |
activation_url = get_link_from_mail(mail.outbox[0]) |
570 | 570 |
response = client.get(activation_url, follow=True) |
571 | 571 |
self.assertEqual(response.status_code, status.HTTP_200_OK) |
572 |
assert utils_misc.make_url(return_url, params={'token': token}) in force_text(response.content)
|
|
572 |
assert utils_misc.make_url(return_url, params={'token': token}) in force_str(response.content)
|
|
573 | 573 |
self.assertEqual(User.objects.count(), user_count + 1) |
574 | 574 |
response = client.get(reverse('auth_homepage')) |
575 | 575 |
self.assertContains(response, username) |
... | ... | |
669 | 669 |
activation_url = get_link_from_mail(activation_mail1) |
670 | 670 |
response = client.get(activation_url, follow=True) |
671 | 671 |
self.assertEqual(response.status_code, status.HTTP_200_OK) |
672 |
assert utils_misc.make_url(return_url, params={'token': token}) in force_text(response.content)
|
|
672 |
assert utils_misc.make_url(return_url, params={'token': token}) in force_str(response.content)
|
|
673 | 673 |
self.assertEqual(User.objects.count(), user_count + 1) |
674 | 674 |
response = client.get(reverse('auth_homepage')) |
675 | 675 |
self.assertContains(response, username) |
tests/test_auth_oidc.py | ||
---|---|---|
28 | 28 |
from django.db import IntegrityError, transaction |
29 | 29 |
from django.http import QueryDict |
30 | 30 |
from django.urls import reverse |
31 |
from django.utils.encoding import force_str, force_text
|
|
31 |
from django.utils.encoding import force_str |
|
32 | 32 |
from django.utils.timezone import now, utc |
33 | 33 |
from httmock import HTTMock, urlmatch |
34 | 34 |
from jwcrypto.common import base64url_decode, base64url_encode, json_encode |
... | ... | |
287 | 287 |
else: # hmac |
288 | 288 |
jwt = JWT(header={'alg': 'HS256'}, claims=id_token) |
289 | 289 |
k = base64url_encode(oidc_provider.client_secret.encode('utf-8')) |
290 |
jwt.make_signed_token(JWK(kty='oct', k=force_text(k)))
|
|
290 |
jwt.make_signed_token(JWK(kty='oct', k=force_str(k)))
|
|
291 | 291 | |
292 | 292 |
content = { |
293 | 293 |
'access_token': '1234', |
tests/test_idp_cas.py | ||
---|---|---|
19 | 19 |
from django.contrib.auth import get_user_model |
20 | 20 |
from django.test.client import Client, RequestFactory |
21 | 21 |
from django.test.utils import override_settings |
22 |
from django.utils.encoding import force_text
|
|
22 |
from django.utils.encoding import force_str
|
|
23 | 23 | |
24 | 24 |
from authentic2.a2_rbac.models import Role |
25 | 25 |
from authentic2.a2_rbac.utils import get_default_ou |
... | ... | |
106 | 106 |
client = Client() |
107 | 107 |
response = client.get('/idp/cas/login') |
108 | 108 |
self.assertEqual(response.status_code, 400) |
109 |
self.assertIn('no service', force_text(response.content))
|
|
109 |
self.assertIn('no service', force_str(response.content))
|
|
110 | 110 |
response = client.get('/idp/cas/login', {constants.SERVICE_PARAM: 'http://google.com/'}) |
111 | 111 |
self.assertRedirectsComplex(response, 'http://google.com/') |
112 | 112 |
response = client.get( |
... | ... | |
144 | 144 |
user=self.user, |
145 | 145 |
service=self.service, |
146 | 146 |
) |
147 |
self.assertIn('https://casclient.com/loser/', force_text(response.content))
|
|
147 |
self.assertIn('https://casclient.com/loser/', force_str(response.content))
|
|
148 | 148 | |
149 | 149 |
def test_role_access_control_granted(self): |
150 | 150 |
client = Client() |
... | ... | |
222 | 222 |
) |
223 | 223 |
self.assertEqual(response.status_code, 200) |
224 | 224 |
self.assertEqual(response['content-type'], 'text/plain') |
225 |
self.assertEqual(force_text(response.content), 'yes\n%s\n' % self.LOGIN)
|
|
225 |
self.assertEqual(force_str(response.content), 'yes\n%s\n' % self.LOGIN)
|
|
226 | 226 |
# Verify ticket has been deleted |
227 | 227 |
with self.assertRaises(Ticket.DoesNotExist): |
228 | 228 |
Ticket.objects.get() |
tests/test_idp_saml2.py | ||
---|---|---|
29 | 29 |
from django.core.files import File |
30 | 30 |
from django.template import Context, Template |
31 | 31 |
from django.urls import reverse |
32 |
from django.utils.encoding import force_bytes, force_str, force_text
|
|
32 |
from django.utils.encoding import force_bytes, force_str |
|
33 | 33 |
from django.utils.translation import gettext as _ |
34 | 34 | |
35 | 35 |
from authentic2.a2_rbac.models import OrganizationalUnit, Role |
... | ... | |
418 | 418 |
name_id = login.assertion.subject.nameID |
419 | 419 |
if self.sp.default_name_id_format == 'username': |
420 | 420 |
assert name_id.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED |
421 |
assert force_text(name_id.content) == user.username
|
|
421 |
assert force_str(name_id.content) == user.username
|
|
422 | 422 |
elif self.sp.default_name_id_format == 'uuid': |
423 | 423 |
assert name_id.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED |
424 |
assert force_text(name_id.content) == user.uuid
|
|
424 |
assert force_str(name_id.content) == user.uuid
|
|
425 | 425 |
else: |
426 | 426 |
raise NotImplementedError( |
427 | 427 |
'unknown default_name_id_format %s' % self.sp.default_name_id_format |
... | ... | |
677 | 677 |
func.nid_format, |
678 | 678 |
) |
679 | 679 |
return { |
680 |
at.name: {''.join(force_text(mtn.dump()) for mtn in atv.any) for atv in at.attributeValue}
|
|
680 |
at.name: {''.join(force_str(mtn.dump()) for mtn in atv.any) for atv in at.attributeValue}
|
|
681 | 681 |
for at in assertion.attributeStatement[0].attribute |
682 | 682 |
} |
683 | 683 | |
... | ... | |
820 | 820 |
assert edpt is not None |
821 | 821 |
node = lasso.Node.newFromXmlNode(force_str(ET.tostring(edpt))) |
822 | 822 |
assert isinstance(node, lasso.Saml2NameID) |
823 |
assert force_text(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
|
823 |
assert force_str(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
|
824 | 824 |
assert node.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT |
825 | 825 | |
826 | 826 |
assert node.nameQualifier == 'http://testserver/idp/saml2/metadata' |
... | ... | |
853 | 853 |
assert len(attributes[edu_name]) == 1 |
854 | 854 |
node = lasso.Node.newFromXmlNode(force_str(list(attributes[edu_name])[0])) |
855 | 855 |
assert isinstance(node, lasso.Saml2NameID) |
856 |
assert force_text(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
|
856 |
assert force_str(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
|
857 | 857 |
assert node.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT |
858 | 858 |
assert node.nameQualifier == 'http://testserver/idp/saml2/metadata' |
859 | 859 |
assert node.spNameQualifier == 'https://sp.com/' |
... | ... | |
885 | 885 |
assert len(attributes['edupersontargetedid']) == 1 |
886 | 886 |
node = lasso.Node.newFromXmlNode(force_str(list(attributes['edupersontargetedid'])[0])) |
887 | 887 |
assert isinstance(node, lasso.Saml2NameID) |
888 |
assert force_text(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
|
888 |
assert force_str(node.content) == '_A485C0ACEEF43A6D39145F5CFE25D9D3B6F15DC6443F412263C76D81C72DA8D5'
|
|
889 | 889 |
assert node.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT |
890 | 890 |
assert node.nameQualifier == 'http://testserver/idp/saml2/metadata' |
891 | 891 |
assert node.spNameQualifier == 'https://sp.com/' |
tests/test_ldap.py | ||
---|---|---|
29 | 29 |
from django.core.exceptions import ImproperlyConfigured |
30 | 30 |
from django.urls import reverse |
31 | 31 |
from django.utils import timezone |
32 |
from django.utils.encoding import force_bytes, force_text
|
|
32 |
from django.utils.encoding import force_bytes, force_str
|
|
33 | 33 |
from ldap.dn import escape_dn_chars |
34 | 34 |
from ldaptools.slapd import Slapd, has_slapd |
35 | 35 | |
... | ... | |
392 | 392 |
settings.LDAP_AUTH_SETTINGS = [ |
393 | 393 |
{ |
394 | 394 |
'url': [slapd.ldap_url], |
395 |
'binddn': force_text(DN),
|
|
395 |
'binddn': force_str(DN),
|
|
396 | 396 |
'bindpw': PASS, |
397 | 397 |
'basedn': 'o=ôrga', |
398 | 398 |
'use_tls': False, |
... | ... | |
1004 | 1004 |
settings.LDAP_AUTH_SETTINGS = [ |
1005 | 1005 |
{ |
1006 | 1006 |
'url': [slapd.ldap_url], |
1007 |
'binddn': force_text(slapd.root_bind_dn),
|
|
1008 |
'bindpw': force_text(slapd.root_bind_password),
|
|
1007 |
'binddn': force_str(slapd.root_bind_dn),
|
|
1008 |
'bindpw': force_str(slapd.root_bind_password),
|
|
1009 | 1009 |
'basedn': 'o=ôrga', |
1010 | 1010 |
'use_tls': False, |
1011 | 1011 |
'attributes': ['uid', 'carLicense'], |
... | ... | |
1069 | 1069 |
settings.LDAP_AUTH_SETTINGS = [ |
1070 | 1070 |
{ |
1071 | 1071 |
'url': [slapd.ldap_url], |
1072 |
'binddn': force_text(slapd.root_bind_dn),
|
|
1073 |
'bindpw': force_text(slapd.root_bind_password),
|
|
1072 |
'binddn': force_str(slapd.root_bind_dn),
|
|
1073 |
'bindpw': force_str(slapd.root_bind_password),
|
|
1074 | 1074 |
'basedn': 'o=ôrga', |
1075 | 1075 |
'use_tls': False, |
1076 | 1076 |
'attributes': ['uid', 'carLicense'], |
... | ... | |
1106 | 1106 |
settings.LDAP_AUTH_SETTINGS = [ |
1107 | 1107 |
{ |
1108 | 1108 |
'url': [slapd.ldap_url], |
1109 |
'binddn': force_text(slapd.root_bind_dn),
|
|
1110 |
'bindpw': force_text(slapd.root_bind_password),
|
|
1109 |
'binddn': force_str(slapd.root_bind_dn),
|
|
1110 |
'bindpw': force_str(slapd.root_bind_password),
|
|
1111 | 1111 |
'basedn': 'o=ôrga', |
1112 | 1112 |
'use_tls': False, |
1113 | 1113 |
'user_can_change_password': False, |
... | ... | |
1669 | 1669 |
settings.LDAP_AUTH_SETTINGS = [ |
1670 | 1670 |
{ |
1671 | 1671 |
'url': [slapd_ppolicy.ldap_url], |
1672 |
'binddn': force_text(slapd_ppolicy.root_bind_dn),
|
|
1673 |
'bindpw': force_text(slapd_ppolicy.root_bind_password),
|
|
1672 |
'binddn': force_str(slapd_ppolicy.root_bind_dn),
|
|
1673 |
'bindpw': force_str(slapd_ppolicy.root_bind_password),
|
|
1674 | 1674 |
'basedn': 'o=ôrga', |
1675 | 1675 |
'use_tls': False, |
1676 | 1676 |
'attributes': ['carLicense'], |
... | ... | |
1779 | 1779 |
settings.LDAP_AUTH_SETTINGS = [ |
1780 | 1780 |
{ |
1781 | 1781 |
'url': [slapd.ldap_url], |
1782 |
'binddn': force_text(DN),
|
|
1782 |
'binddn': force_str(DN),
|
|
1783 | 1783 |
'bindpw': PASS, |
1784 | 1784 |
'basedn': 'o=ôrga', |
1785 | 1785 |
'ou_slug': ou1.slug, |
... | ... | |
1810 | 1810 |
settings.LDAP_AUTH_SETTINGS = [ |
1811 | 1811 |
{ |
1812 | 1812 |
'url': [slapd.ldap_url], |
1813 |
'binddn': force_text(DN),
|
|
1813 |
'binddn': force_str(DN),
|
|
1814 | 1814 |
'bindpw': PASS, |
1815 | 1815 |
'basedn': 'o=ôrga', |
1816 | 1816 |
'use_tls': False, |
... | ... | |
2119 | 2119 |
settings.LDAP_AUTH_SETTINGS = [ |
2120 | 2120 |
{ |
2121 | 2121 |
'url': [slapd.ldap_url], |
2122 |
'binddn': force_text(slapd.root_bind_dn),
|
|
2123 |
'bindpw': force_text(slapd.root_bind_password),
|
|
2122 |
'binddn': force_str(slapd.root_bind_dn),
|
|
2123 |
'bindpw': force_str(slapd.root_bind_password),
|
|
2124 | 2124 |
'basedn': 'o=ôrga', |
2125 | 2125 |
'use_tls': False, |
2126 | 2126 |
'attributes': ['carLicense'], |
... | ... | |
2210 | 2210 |
settings.LDAP_AUTH_SETTINGS = [ |
2211 | 2211 |
{ |
2212 | 2212 |
'url': [slapd.ldap_url], |
2213 |
'binddn': force_text('cn=%s,o=ôrga' % escape_dn_chars('Étienne Michu')),
|
|
2213 |
'binddn': force_str('cn=%s,o=ôrga' % escape_dn_chars('Étienne Michu')),
|
|
2214 | 2214 |
'bindpw': 'passé', |
2215 | 2215 |
'basedn': 'o=ôrga', |
2216 | 2216 |
'use_tls': False, |
tests/test_role_manager.py | ||
---|---|---|
18 | 18 | |
19 | 19 |
import django_tables2 as tables |
20 | 20 |
from django.urls import reverse |
21 |
from django.utils.encoding import force_bytes, force_text
|
|
21 |
from django.utils.encoding import force_bytes, force_str
|
|
22 | 22 |
from webtest import Upload |
23 | 23 | |
24 | 24 |
from authentic2.a2_rbac.models import OrganizationalUnit, Role |
... | ... | |
44 | 44 | |
45 | 45 |
export_response = response.click('CSV', href='/export/') |
46 | 46 |
reader = csv.reader( |
47 |
[force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
|
|
47 |
[force_str(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
|
|
48 | 48 |
) |
49 | 49 |
rows = [row for row in reader] |
50 | 50 | |
... | ... | |
65 | 65 | |
66 | 66 |
export_response = search_response.click('CSV', href='/export/') |
67 | 67 |
reader = csv.reader( |
68 |
[force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
|
|
68 |
[force_str(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
|
|
69 | 69 |
) |
70 | 70 |
rows = [row for row in reader] |
71 | 71 |
tests/utils.py | ||
---|---|---|
48 | 48 |
from django.shortcuts import resolve_url |
49 | 49 |
from django.test import TestCase |
50 | 50 |
from django.urls import reverse |
51 |
from django.utils.encoding import force_text, iri_to_uri
|
|
51 |
from django.utils.encoding import force_str, iri_to_uri
|
|
52 | 52 |
from lxml import etree |
53 | 53 | |
54 | 54 |
from authentic2 import models |
... | ... | |
100 | 100 |
def basic_authorization_header(user, password=None): |
101 | 101 |
cred = '%s:%s' % (user.username, password or user.username) |
102 | 102 |
b64_cred = base64.b64encode(cred.encode('utf-8')) |
103 |
return {'Authorization': 'Basic %s' % str(force_text(b64_cred))}
|
|
103 |
return {'Authorization': 'Basic %s' % str(force_str(b64_cred))}
|
|
104 | 104 | |
105 | 105 | |
106 | 106 |
def get_response_form(response, form='form'): |
107 |
- |