0050-misc-fix-unused-variable-pylint-error-56982.patch
src/authentic2/a2_rbac/models.py | ||
---|---|---|
296 | 296 |
op = MANAGE_MEMBERS_OP |
297 | 297 |
Permission = rbac_utils.get_permission_model() |
298 | 298 |
operation = rbac_utils.get_operation(op) |
299 |
self_perm, created = Permission.objects.get_or_create(
|
|
299 |
self_perm, dummy = Permission.objects.get_or_create(
|
|
300 | 300 |
operation=operation, |
301 | 301 |
target_ct=ContentType.objects.get_for_model(self), |
302 | 302 |
target_id=self.pk, |
... | ... | |
310 | 310 |
op = MANAGE_MEMBERS_OP |
311 | 311 |
Permission = rbac_utils.get_permission_model() |
312 | 312 |
operation = rbac_utils.get_operation(op) |
313 |
self_perm, created = Permission.objects.get_or_create(
|
|
313 |
self_perm, dummy = Permission.objects.get_or_create(
|
|
314 | 314 |
operation=operation, target_ct=ContentType.objects.get_for_model(self), target_id=self.pk |
315 | 315 |
) |
316 | 316 |
self.permissions.through.objects.get_or_create(role=self, permission=self_perm) |
src/authentic2/a2_rbac/signal_handlers.py | ||
---|---|---|
34 | 34 |
if OrganizationalUnit.objects.exists(): |
35 | 35 |
return |
36 | 36 |
# Create a default OU if none exists currently |
37 |
default_ou, created = OrganizationalUnit.objects.get_or_create(
|
|
37 |
default_ou, dummy = OrganizationalUnit.objects.get_or_create(
|
|
38 | 38 |
slug='default', |
39 | 39 |
defaults={ |
40 | 40 |
'default': True, |
src/authentic2/a2_rbac/utils.py | ||
---|---|---|
33 | 33 |
def get_view_user_perm(ou=None): |
34 | 34 |
User = get_user_model() |
35 | 35 |
Permission = rbac_utils.get_permission_model() |
36 |
view_user_perm, created = Permission.objects.get_or_create(
|
|
36 |
view_user_perm, dummy = Permission.objects.get_or_create(
|
|
37 | 37 |
operation=rbac_utils.get_operation(VIEW_OP), |
38 | 38 |
target_ct=ContentType.objects.get_for_model(ContentType), |
39 | 39 |
target_id=ContentType.objects.get_for_model(User).pk, |
... | ... | |
46 | 46 |
def get_search_ou_perm(ou=None): |
47 | 47 |
if ou: |
48 | 48 |
Permission = rbac_utils.get_permission_model() |
49 |
view_ou_perm, created = Permission.objects.get_or_create(
|
|
49 |
view_ou_perm, dummy = Permission.objects.get_or_create(
|
|
50 | 50 |
operation=rbac_utils.get_operation(SEARCH_OP), |
51 | 51 |
target_ct=ContentType.objects.get_for_model(ou), |
52 | 52 |
target_id=ou.pk, |
... | ... | |
55 | 55 |
else: |
56 | 56 |
OU = rbac_utils.get_ou_model() |
57 | 57 |
Permission = rbac_utils.get_permission_model() |
58 |
view_ou_perm, created = Permission.objects.get_or_create(
|
|
58 |
view_ou_perm, dummy = Permission.objects.get_or_create(
|
|
59 | 59 |
operation=rbac_utils.get_operation(SEARCH_OP), |
60 | 60 |
target_ct=ContentType.objects.get_for_model(ContentType), |
61 | 61 |
target_id=ContentType.objects.get_for_model(OU).pk, |
... | ... | |
67 | 67 |
def get_manage_authorizations_user_perm(ou=None): |
68 | 68 |
User = get_user_model() |
69 | 69 |
Permission = rbac_utils.get_permission_model() |
70 |
manage_authorizations_user_perm, created = Permission.objects.get_or_create(
|
|
70 |
manage_authorizations_user_perm, dummy = Permission.objects.get_or_create(
|
|
71 | 71 |
operation=rbac_utils.get_operation(models.MANAGE_AUTHORIZATIONS_OP), |
72 | 72 |
target_ct=ContentType.objects.get_for_model(ContentType), |
73 | 73 |
target_id=ContentType.objects.get_for_model(User).pk, |
src/authentic2/api_views.py | ||
---|---|---|
1079 | 1079 |
for authenticator in self.get_authenticators(): |
1080 | 1080 |
if hasattr(authenticator, 'authenticate_credentials'): |
1081 | 1081 |
try: |
1082 |
user, oidc_client = authenticator.authenticate_credentials( |
|
1082 |
user, dummy_oidc_client = authenticator.authenticate_credentials(
|
|
1083 | 1083 |
username, password, request=request |
1084 | 1084 |
) |
1085 | 1085 |
result['result'] = 1 |
src/authentic2/apps/journal/models.py | ||
---|---|---|
166 | 166 | |
167 | 167 |
@GlobalCache |
168 | 168 |
def event_type_cache(name): |
169 |
event_type, created = EventType.objects.get_or_create(name=name)
|
|
169 |
event_type, dummy = EventType.objects.get_or_create(name=name)
|
|
170 | 170 |
return event_type |
171 | 171 | |
172 | 172 | |
... | ... | |
419 | 419 |
else: |
420 | 420 |
yield None |
421 | 421 |
count += 1 |
422 |
for i in range(len(reference_types) - count):
|
|
422 |
for dummy in range(len(reference_types) - count):
|
|
423 | 423 |
yield None |
424 | 424 | |
425 | 425 |
class Meta: |
src/authentic2/authenticators.py | ||
---|---|---|
80 | 80 |
.annotate(count=Count('ou')) |
81 | 81 |
.order_by('-count') |
82 | 82 |
) |
83 |
for ou_id, count in qs: |
|
83 |
for ou_id, dummy_count in qs:
|
|
84 | 84 |
if not ou_id: |
85 | 85 |
continue |
86 | 86 |
service_ou_ids.append(ou_id) |
src/authentic2/backends/ldap_backend.py | ||
---|---|---|
691 | 691 |
return |
692 | 692 |
for conn in cls.get_connections(block): |
693 | 693 |
existing_groups = cls.get_groups_dns(conn, block) |
694 |
for group_dn, role_slugs in group_to_role_mapping: |
|
694 |
for group_dn, dummy_role_slugs in group_to_role_mapping:
|
|
695 | 695 |
if group_dn in existing_groups: |
696 | 696 |
continue |
697 | 697 |
log.warning('ldap: unknown group "%s" mapped to a role', group_dn) |
... | ... | |
828 | 828 |
else: |
829 | 829 |
log.warning( |
830 | 830 |
'could not rebind after a bind failure, unable to attach the error to the' |
831 |
' user' |
|
831 |
' user (error %s)', |
|
832 |
error, |
|
832 | 833 |
) |
833 | 834 |
user_login_failure(authz_id) |
834 | 835 |
else: |
... | ... | |
1043 | 1044 |
if create is None: |
1044 | 1045 |
create = block['create_group'] |
1045 | 1046 |
if create: |
1046 |
group, created = Group.objects.get_or_create(name=group_name)
|
|
1047 |
group, dummy = Group.objects.get_or_create(name=group_name)
|
|
1047 | 1048 |
return group |
1048 | 1049 |
else: |
1049 | 1050 |
try: |
... | ... | |
1176 | 1177 |
attributes.add(block[field]) |
1177 | 1178 |
for external_id_tuple in map_text(block['external_id_tuples']): |
1178 | 1179 |
attributes.update(cls.attribute_name_from_external_id_tuple(external_id_tuple)) |
1179 |
for from_at, to_at in map_text(block['attribute_mappings']): |
|
1180 |
for dummy_from_at, to_at in map_text(block['attribute_mappings']):
|
|
1180 | 1181 |
attributes.add(to_at) |
1181 | 1182 |
for mapping in block['user_attributes']: |
1182 | 1183 |
from_ldap = mapping.get('from_ldap') |
... | ... | |
1467 | 1468 |
user._changed = False |
1468 | 1469 |
external_id = self.build_external_id(map_text(block['external_id_tuples'][0]), attributes) |
1469 | 1470 |
if external_id: |
1470 |
new, created = UserExternalId.objects.get_or_create(
|
|
1471 |
new, dummy = UserExternalId.objects.get_or_create(
|
|
1471 | 1472 |
user=user, external_id=external_id, source=force_text(block['realm']) |
1472 | 1473 |
) |
1473 | 1474 |
if block['clean_external_id_on_update']: |
... | ... | |
1538 | 1539 |
while first_pass or pg_ctrl.cookie: |
1539 | 1540 |
first_pass = False |
1540 | 1541 |
msgid = conn.search_ext(*args, serverctrls=[pg_ctrl], **kwargs) |
1541 |
result_type, data, msgid, serverctrls = conn.result3(msgid) |
|
1542 |
dummy_result_type, data, msgid, serverctrls = conn.result3(msgid)
|
|
1542 | 1543 |
pg_ctrl.cookie = serverctrls[0].cookie |
1543 | 1544 |
yield from cls.normalize_ldap_results(data) |
1544 | 1545 | |
... | ... | |
1785 | 1786 |
context = ssl.create_default_context() |
1786 | 1787 |
try: |
1787 | 1788 |
with socket.create_connection((hostname, port), timeout=2) as sock: |
1788 |
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
|
|
1789 |
with context.wrap_socket(sock, server_hostname=hostname): |
|
1789 | 1790 |
pass |
1790 | 1791 |
except (socket.herror, socket.gaierror) as e: |
1791 | 1792 |
return False, 'socket address error on host %s: %s' % (hostname, e) |
src/authentic2/backends/models_backend.py | ||
---|---|---|
54 | 54 |
queries.append(models.Q(**{username_field: username})) |
55 | 55 |
if '@' not in username: |
56 | 56 |
if app_settings.REALMS: |
57 |
for realm, desc in app_settings.REALMS:
|
|
57 |
for realm, dummy in app_settings.REALMS:
|
|
58 | 58 |
queries.append(models.Q(**{username_field: upn(username, realm)})) |
59 | 59 |
else: |
60 | 60 |
queries.append(models.Q(**{username_field: upn(username, realm)})) |
src/authentic2/csv_import.py | ||
---|---|---|
268 | 268 |
def clean_password_hash(self): |
269 | 269 |
password_hash = self.cleaned_data['password_hash'] |
270 | 270 |
try: |
271 |
hasher = identify_hasher(password_hash)
|
|
271 |
identify_hasher(password_hash) |
|
272 | 272 |
except ValueError: |
273 | 273 |
raise ValidationError(_('Invalid password format or unknown hashing algorithm.')) |
274 | 274 |
return password_hash |
... | ... | |
485 | 485 |
pass |
486 | 486 |
if not header.field: |
487 | 487 |
try: |
488 |
attribute = Attribute.objects.get(name=header.name) # NOQA: F841
|
|
488 |
Attribute.objects.get(name=header.name)
|
|
489 | 489 |
header.attribute = True |
490 | 490 |
except Attribute.DoesNotExist: |
491 | 491 |
pass |
src/authentic2/custom_user/apps.py | ||
---|---|---|
50 | 50 |
content_type = ContentType.objects.get_for_model(User) |
51 | 51 | |
52 | 52 |
attrs = {} |
53 |
attrs['first_name'], created = Attribute.objects.get_or_create(
|
|
53 |
attrs['first_name'], dummy = Attribute.objects.get_or_create(
|
|
54 | 54 |
name='first_name', |
55 | 55 |
defaults={ |
56 | 56 |
'kind': 'string', |
... | ... | |
61 | 61 |
'user_visible': True, |
62 | 62 |
}, |
63 | 63 |
) |
64 |
attrs['last_name'], created = Attribute.objects.get_or_create(
|
|
64 |
attrs['last_name'], dummy = Attribute.objects.get_or_create(
|
|
65 | 65 |
name='last_name', |
66 | 66 |
defaults={ |
67 | 67 |
'kind': 'string', |
... | ... | |
76 | 76 |
serialize = get_kind('string').get('serialize') |
77 | 77 |
for user in User.objects.all(): |
78 | 78 |
for attr_name, value in attrs.items(): |
79 |
av, created = AttributeValue.objects.get_or_create(
|
|
79 |
AttributeValue.objects.get_or_create( |
|
80 | 80 |
content_type=content_type, |
81 | 81 |
object_id=user.id, |
82 | 82 |
attribute=value, |
src/authentic2/custom_user/migrations/0001_initial.py | ||
---|---|---|
43 | 43 |
assert old_user.id == new_user.id |
44 | 44 |
for group in old_user.groups.all(): |
45 | 45 |
new_groups.append(GroupThrough(user_id=new_user.id, group_id=group.id)) |
46 |
for permission in old_user.user_permissions.all():
|
|
46 |
for dummy in old_user.user_permissions.all():
|
|
47 | 47 |
new_permissions.append(PermissionThrough(user_id=new_user.id, group_id=group.id)) |
48 | 48 |
# mass create group and permission relations |
49 | 49 |
GroupThrough.objects.bulk_create(new_groups) |
src/authentic2/forms/registration.py | ||
---|---|---|
144 | 144 |
if commit and app_settings.A2_REGISTRATION_GROUPS: |
145 | 145 |
groups = [] |
146 | 146 |
for name in app_settings.A2_REGISTRATION_GROUPS: |
147 |
group, created = Group.objects.get_or_create(name=name)
|
|
147 |
group, dummy = Group.objects.get_or_create(name=name)
|
|
148 | 148 |
groups.append(group) |
149 | 149 |
user.groups = groups |
150 | 150 |
return user |
src/authentic2/hashers.py | ||
---|---|---|
74 | 74 |
return '%s$%s$%s$%s' % (self.algorithm, count, salt, h) |
75 | 75 | |
76 | 76 |
def to_drupal(self, encoded): |
77 |
algo, count, salt, h = encoded.split('$', 3) |
|
77 |
dummy_algo, count, salt, h = encoded.split('$', 3)
|
|
78 | 78 |
count = self.i64toa(math.ceil(math.log(int(count), 2))) |
79 | 79 |
return '$S$%s%s%s' % (count, salt, h) |
80 | 80 | |
... | ... | |
83 | 83 |
assert salt and '$' not in salt |
84 | 84 |
h = salt.encode() |
85 | 85 |
password = password.encode() |
86 |
for i in range(iterations + 1):
|
|
86 |
for dummy in range(iterations + 1):
|
|
87 | 87 |
h = self.digest(h + password).digest() |
88 | 88 |
return "%s$%d$%s$%s" % (self.algorithm, iterations, salt, self.b64encode(h)[:43]) |
89 | 89 | |
90 | 90 |
def verify(self, password, encoded): |
91 |
algorithm, iterations, salt, hash = encoded.split('$', 3)
|
|
91 |
algorithm, iterations, salt, dummy = encoded.split('$', 3)
|
|
92 | 92 |
assert algorithm == self.algorithm |
93 | 93 |
encoded_2 = self.encode(password, salt, int(iterations)) |
94 | 94 |
return constant_time_compare(encoded, encoded_2) |
... | ... | |
121 | 121 |
return "%s$%s$%s" % (self.algorithm, salt, hash) |
122 | 122 | |
123 | 123 |
def verify(self, password, encoded): |
124 |
algorithm, salt, hash = encoded.split('$', 2) |
|
124 |
algorithm, salt, dummy_hash = encoded.split('$', 2)
|
|
125 | 125 |
assert algorithm == self.algorithm |
126 | 126 |
encoded_2 = self.encode(password, salt) |
127 | 127 |
return constant_time_compare(encoded, encoded_2) |
... | ... | |
227 | 227 |
return "%s$md5$%s$%s" % (self.algorithm, salt, hash) |
228 | 228 | |
229 | 229 |
def verify(self, password, encoded): |
230 |
algorithm, subalgo, salt, hash = encoded.split('$', 3)
|
|
230 |
algorithm, dummy_subalgo, salt, dummy_hash = encoded.split('$', 3)
|
|
231 | 231 |
salt = unhexlify(salt) |
232 | 232 |
if algorithm != self.algorithm: |
233 | 233 |
raise ValueError('not a joomla encoded password') |
src/authentic2/management/commands/check-and-repair.py | ||
---|---|---|
121 | 121 |
with tenant_context(tenant): |
122 | 122 |
self.tenant_shown = False |
123 | 123 |
self.output = False |
124 |
with fake_atomic(faked=self.fake) as faked:
|
|
124 |
with fake_atomic(faked=self.fake): |
|
125 | 125 |
self.check_and_repair(options) |
126 | 126 |
if self.fake and self.output: |
127 | 127 |
self.success('Faked!') |
... | ... | |
233 | 233 |
if targets_with_duplicates: |
234 | 234 |
self.warning('Found %d manage members permissions with duplicates', len(targets_with_duplicates)) |
235 | 235 |
if self.repair: |
236 |
for operation_id, target_ct_id, target_id in targets_with_duplicates: |
|
236 |
for dummy_operation_id, target_ct_id, target_id in targets_with_duplicates:
|
|
237 | 237 |
qs = Permission.objects.filter(target_ct_id=target_ct_id, target_id=target_id) |
238 | 238 |
for perm in qs: |
239 | 239 |
linked_admin_role = Role.objects.get( |
... | ... | |
245 | 245 |
self.notice(' - %s: [%s]', target, '; '.join(map(str, qs))) |
246 | 246 |
if self.do_repair(): |
247 | 247 |
self.notice('Deleting duplicate manage members permissions...', ending=' ') |
248 |
for operation_id, target_ct_id, target_id in targets_with_duplicates: |
|
248 |
for dummy_operation_id, target_ct_id, target_id in targets_with_duplicates:
|
|
249 | 249 |
qs = Permission.objects.filter(target_ct_id=target_ct_id, target_id=target_id).order_by( |
250 | 250 |
'id' |
251 | 251 |
) |
src/authentic2/management/commands/sync-ldap-users.py | ||
---|---|---|
55 | 55 |
elif verbosity == 3: |
56 | 56 |
ldap_logger.setLevel(logging.DEBUG) |
57 | 57 | |
58 |
for user in LDAPBackend.get_users():
|
|
58 |
for dummy in LDAPBackend.get_users():
|
|
59 | 59 |
continue |
src/authentic2/manager/forms.py | ||
---|---|---|
97 | 97 |
def __init__(self, *args, **kwargs): |
98 | 98 |
super().__init__(*args, **kwargs) |
99 | 99 |
if self.request and not self.request.user.is_anonymous: |
100 |
for name, field in self.fields.items():
|
|
100 |
for field in self.fields.values():
|
|
101 | 101 |
qs = getattr(field, 'queryset', None) |
102 | 102 |
if not qs: |
103 | 103 |
continue |
src/authentic2/manager/role_views.py | ||
---|---|---|
323 | 323 |
action = form.cleaned_data.get('action') |
324 | 324 |
Permission = get_permission_model() |
325 | 325 |
if action == 'add' and operation and target: |
326 |
perm, created = Permission.objects.get_or_create(
|
|
326 |
perm, dummy = Permission.objects.get_or_create(
|
|
327 | 327 |
operation=operation, |
328 | 328 |
ou=ou, |
329 | 329 |
target_ct=ContentType.objects.get_for_model(target), |
src/authentic2/manager/user_views.py | ||
---|---|---|
966 | 966 |
qs = OIDCAuthorization.objects.filter(user=self.get_object()) |
967 | 967 |
qs = qs.filter(id=auth_id.pk) |
968 | 968 |
oidc_authorization = qs.first() |
969 |
count, cascade = qs.delete()
|
|
969 |
count, dummy = qs.delete()
|
|
970 | 970 |
if count: |
971 | 971 |
self.request.journal.record( |
972 | 972 |
'manager.user.sso.authorization.deletion', |
src/authentic2/migrations/0031_add_search_vector_to_attributes.py | ||
---|---|---|
6 | 6 |
def create_trigger(apps, schema_editor): |
7 | 7 |
with schema_editor.connection.cursor() as cursor: |
8 | 8 |
cursor.execute('SHOW default_text_search_config') |
9 |
(default_text_search_config,) = cursor.fetchone()
|
|
9 |
assert cursor.fetchone()
|
|
10 | 10 |
cursor.execute( |
11 | 11 |
'''CREATE OR REPLACE FUNCTION authentic2_update_atv_search_vector() RETURNS TRIGGER AS $$ |
12 | 12 |
BEGIN |
src/authentic2/migrations/__init__.py | ||
---|---|---|
82 | 82 |
def database_backwards(self, app_label, schema_editor, from_state, to_state): |
83 | 83 |
if not self.allowed(app_label, schema_editor, to_state): |
84 | 84 |
return |
85 |
for i, (null_columns, non_null_columns) in enumerate(self.indexes()):
|
|
85 |
for i in range(self.indexes()):
|
|
86 | 86 |
schema_editor.execute('DROP INDEX IF EXISTS "%s_%s"' % (self.index_name, i)) |
87 | 87 | |
88 | 88 |
def describe(self): |
src/authentic2/models.py | ||
---|---|---|
421 | 421 |
raise ServiceAccessDenied(service=self) |
422 | 422 | |
423 | 423 |
def add_authorized_role(self, role): |
424 |
authorization, created = AuthorizedRole.objects.get_or_create(service=self, role=role)
|
|
424 |
authorization, dummy = AuthorizedRole.objects.get_or_create(service=self, role=role)
|
|
425 | 425 |
return authorization |
426 | 426 | |
427 | 427 |
def remove_authorized_role(self, role): |
src/authentic2/saml/management/commands/mapping.py | ||
---|---|---|
43 | 43 | |
44 | 44 | |
45 | 45 |
def get_definition_from_oid(oid): |
46 |
for key, value in ATTRIBUTE_MAPPING.items():
|
|
46 |
for value in ATTRIBUTE_MAPPING.values():
|
|
47 | 47 |
if value['oid'] == oid: |
48 | 48 |
return value |
49 | 49 | |
... | ... | |
55 | 55 | |
56 | 56 | |
57 | 57 |
def get_definition_from_alias(alias): |
58 |
for key, value in ATTRIBUTE_MAPPING.items():
|
|
58 |
for value in ATTRIBUTE_MAPPING.values():
|
|
59 | 59 |
if 'alias' in value and alias in value['alias']: |
60 | 60 |
return value |
61 | 61 |
src/authentic2/saml/management/commands/sync-metadata.py | ||
---|---|---|
126 | 126 |
is_required = ra.get(IS_REQUIRED, 'false') == 'true' |
127 | 127 |
if name_format != lasso.SAML2_ATTRIBUTE_NAME_FORMAT_URI: |
128 | 128 |
continue |
129 |
def_name, defn = resolve_urn_oid(oid)
|
|
129 |
def_name, dummy = resolve_urn_oid(oid)
|
|
130 | 130 |
if def_name is None: |
131 | 131 |
warnings.warn( |
132 | 132 |
'attribute %s/%s unsupported on service provider %s' |
src/authentic2/saml/managers.py | ||
---|---|---|
49 | 49 |
def cleanup(self): |
50 | 50 |
for federation in self.filter(user__isnull=True): |
51 | 51 |
results = federation_delete.send_robust(sender=federation) |
52 |
for callback, result in results: |
|
52 |
for dummy_callback, result in results:
|
|
53 | 53 |
if not result: |
54 | 54 |
return |
55 | 55 |
federation.delete() |
src/authentic2/utils/misc.py | ||
---|---|---|
526 | 526 |
composition = ((2, '23456789'), (6, 'ABCDEFGHJKLMNPQRSTUVWXYZ'), (1, '%$/\\#@!')) |
527 | 527 |
parts = [] |
528 | 528 |
for cnt, alphabet in composition: |
529 |
for i in range(cnt):
|
|
529 |
for dummy in range(cnt):
|
|
530 | 530 |
parts.append(random.SystemRandom().choice(alphabet)) |
531 | 531 |
random.shuffle(parts, random.SystemRandom().random) |
532 | 532 |
return ''.join(parts) |
... | ... | |
1293 | 1293 |
return [] |
1294 | 1294 | |
1295 | 1295 |
values = [] |
1296 |
for i, v in zip(range(count), parsed):
|
|
1296 |
for dummy, v in zip(range(count), parsed):
|
|
1297 | 1297 |
try: |
1298 | 1298 |
values.append(int(v)) |
1299 | 1299 |
except ValueError: |
src/authentic2/views.py | ||
---|---|---|
85 | 85 | |
86 | 86 |
@classmethod |
87 | 87 |
def can_edit_profile(cls): |
88 |
fields, labels = cls.get_fields() |
|
88 |
fields, dummy_labels = cls.get_fields()
|
|
89 | 89 |
return bool(fields) and app_settings.A2_PROFILE_CAN_EDIT_PROFILE |
90 | 90 | |
91 | 91 |
@classmethod |
src/authentic2_idp_oidc/apps.py | ||
---|---|---|
27 | 27 |
fragments = [] |
28 | 28 | |
29 | 29 |
oidc_sessions = get_oidc_sessions(request) |
30 |
for key, value in oidc_sessions.items():
|
|
30 |
for value in oidc_sessions.values():
|
|
31 | 31 |
if 'frontchannel_logout_uri' not in value: |
32 | 32 |
continue |
33 | 33 |
ctx = { |
src/authentic2_idp_oidc/views.py | ||
---|---|---|
824 | 824 |
if state: |
825 | 825 |
post_logout_redirect_uri = make_url(post_logout_redirect_uri, params={'state': state}) |
826 | 826 |
# FIXME: do something with id_token_hint |
827 |
id_token_hint = request.GET.get('id_token_hint')
|
|
827 |
request.GET.get('id_token_hint') |
|
828 | 828 |
return a2_logout(request, next_url=post_logout_redirect_uri, do_local=False, check_referer=False) |
src/django_rbac/backends.py | ||
---|---|---|
194 | 194 |
q.append(Q(ou_id=int(key[3:]))) |
195 | 195 |
continue |
196 | 196 |
elif perm_or_perms & value: |
197 |
ct_id, fk = key.split('.') |
|
197 |
dummy_ct_id, fk = key.split('.')
|
|
198 | 198 |
q.append(Q(pk=int(fk))) |
199 | 199 |
if q: |
200 | 200 |
return functools.reduce(Q.__or__, q) |
src/django_rbac/utils.py | ||
---|---|---|
79 | 79 |
def get_operation(operation_tpl): |
80 | 80 |
from . import models |
81 | 81 | |
82 |
operation, created = models.Operation.objects.get_or_create(slug=operation_tpl.slug)
|
|
82 |
operation, dummy = models.Operation.objects.get_or_create(slug=operation_tpl.slug)
|
|
83 | 83 |
return operation |
84 | 84 | |
85 | 85 |
tests/auth_fc/test_auth_fc.py | ||
---|---|---|
541 | 541 | |
542 | 542 |
def test_registration_page(settings, app, franceconnect, hooks): |
543 | 543 |
assert User.objects.count() == 0 |
544 |
response = app.get('/accounts/register/?service=portail&next=/idp/')
|
|
545 |
response = franceconnect.login_with_fc_fixed_params(app)
|
|
544 |
assert app.get('/accounts/register/?service=portail&next=/idp/')
|
|
545 |
franceconnect.login_with_fc_fixed_params(app) |
|
546 | 546 | |
547 | 547 |
# a new user has been created |
548 | 548 |
assert User.objects.count() == 1 |
tests/conftest.py | ||
---|---|---|
121 | 121 |
def create_user(**kwargs): |
122 | 122 |
User = get_user_model() |
123 | 123 |
password = kwargs.pop('password', None) or kwargs['username'] |
124 |
user, created = User.objects.get_or_create(**kwargs)
|
|
124 |
user, dummy = User.objects.get_or_create(**kwargs)
|
|
125 | 125 |
if password: |
126 | 126 |
user.clear_password = password |
127 | 127 |
user.set_password(password) |
tests/idp_oidc/test_misc.py | ||
---|---|---|
1290 | 1290 |
response = app.post(token_url, params=params) |
1291 | 1291 |
assert 'id_token' in response.json |
1292 | 1292 |
token = response.json['id_token'] |
1293 |
header, payload, signature = token.split('.')
|
|
1293 |
assert len(token.split('.')) == 3
|
|
1294 | 1294 |
jwt = JWT() |
1295 | 1295 |
# jwt deserialization implicitly checks the token signature: |
1296 | 1296 |
jwt.deserialize(token, key=jwk) |
... | ... | |
1305 | 1305 |
response = app.post(token_url, params=params, headers=client_authentication_headers(oidc_client)) |
1306 | 1306 |
assert 'id_token' in response.json |
1307 | 1307 |
token = response.json['id_token'] |
1308 |
header, payload, signature = token.split('.')
|
|
1308 |
assert len(token.split('.')) == 3
|
|
1309 | 1309 |
jwt = JWT() |
1310 | 1310 |
# jwt deserialization implicitly checks the token signature: |
1311 | 1311 |
jwt.deserialize(token, key=jwk) |
... | ... | |
1329 | 1329 |
'username': simple_user.username, |
1330 | 1330 |
'password': simple_user.username, |
1331 | 1331 |
} |
1332 |
for i in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])):
|
|
1332 |
for _ in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])):
|
|
1333 | 1333 |
response = app.post(token_url, params=params, status=400) |
1334 | 1334 |
assert response.json['error'] == 'invalid_client' |
1335 | 1335 |
assert 'Wrong client secret' in response.json['error_description'] |
... | ... | |
1353 | 1353 |
'username': simple_user.username, |
1354 | 1354 |
'password': simple_user.username, |
1355 | 1355 |
} |
1356 |
for i in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])):
|
|
1356 |
for _ in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])):
|
|
1357 | 1357 |
app.post(token_url, params=params) |
1358 | 1358 |
response = app.post(token_url, params=params, status=400) |
1359 | 1359 |
assert response.json['error'] == 'invalid_client' |
tests/test_a2_rbac.py | ||
---|---|---|
527 | 527 |
def test_update_self_admin_perm_migration(migration, new_perm_exists): |
528 | 528 |
old_apps = migration.before([('a2_rbac', '0022_auto_20200402_1101')]) |
529 | 529 |
Role = old_apps.get_model('a2_rbac', 'Role') |
530 |
OrganizationalUnit = old_apps.get_model('a2_rbac', 'OrganizationalUnit')
|
|
530 |
old_apps.get_model('a2_rbac', 'OrganizationalUnit') |
|
531 | 531 |
Permission = old_apps.get_model('a2_rbac', 'Permission') |
532 | 532 |
Operation = old_apps.get_model('django_rbac', 'Operation') |
533 | 533 |
ContentType = old_apps.get_model('contenttypes', 'ContentType') |
tests/test_all.py | ||
---|---|---|
305 | 305 |
# with cache the same value will come back |
306 | 306 |
g = GlobalCache(f, hostname_vary=False) |
307 | 307 |
values = set() |
308 |
for x in range(10):
|
|
308 |
for _ in range(10):
|
|
309 | 309 |
values.add(g()) |
310 | 310 |
self.assertEqual(len(values), 1) |
311 | 311 |
# with and hostname vary 10 values will come back |
312 | 312 |
g = GlobalCache(f, hostname_vary=True) |
313 | 313 |
values = set() |
314 |
for x in range(10):
|
|
314 |
for _ in range(10):
|
|
315 | 315 |
values.add(g()) |
316 | 316 |
self.assertEqual(len(values), 10) |
317 | 317 |
# null timeout, no cache |
tests/test_api.py | ||
---|---|---|
300 | 300 |
'last_name': 'Doeny', |
301 | 301 |
} |
302 | 302 |
headers = basic_authorization_header(admin) |
303 |
resp = app.put_json(f'/api/users/{simple_user.uuid}/', params=payload, headers=headers, status=200)
|
|
304 |
user = User.objects.get(id=simple_user.id)
|
|
303 |
app.put_json(f'/api/users/{simple_user.uuid}/', params=payload, headers=headers, status=200) |
|
304 |
User.objects.get(id=simple_user.id) |
|
305 | 305 | |
306 |
resp = app.patch_json(f'/api/users/{simple_user.uuid}/', params=payload, headers=headers, status=200)
|
|
306 |
app.patch_json(f'/api/users/{simple_user.uuid}/', params=payload, headers=headers, status=200) |
|
307 | 307 | |
308 | 308 | |
309 | 309 |
def test_api_users_create_with_email_verified(settings, app, admin): |
... | ... | |
344 | 344 |
'email': 'john.doe@nowhere.null', |
345 | 345 |
} |
346 | 346 |
headers = basic_authorization_header(admin) |
347 |
resp = app.post_json(f'/api/users/{simple_user.uuid}/email/', params=payload, headers=headers, status=200)
|
|
347 |
app.post_json(f'/api/users/{simple_user.uuid}/email/', params=payload, headers=headers, status=200) |
|
348 | 348 |
user = User.objects.get(id=simple_user.id) |
349 | 349 |
assert not user.email_verified |
350 | 350 | |
351 | 351 | |
352 | 352 |
def test_api_users_boolean_attribute(app, superuser): |
353 |
at = Attribute.objects.create(kind='boolean', name='boolean', label='boolean', required=True)
|
|
353 |
Attribute.objects.create(kind='boolean', name='boolean', label='boolean', required=True) |
|
354 | 354 |
superuser.attributes.boolean = True |
355 | 355 |
app.authorization = ('Basic', (superuser.username, superuser.username)) |
356 | 356 |
resp = app.get('/api/users/%s/' % superuser.uuid) |
... | ... | |
358 | 358 | |
359 | 359 | |
360 | 360 |
def test_api_users_boolean_attribute_optional(app, superuser): |
361 |
at = Attribute.objects.create(kind='boolean', name='boolean', label='boolean', required=False)
|
|
361 |
Attribute.objects.create(kind='boolean', name='boolean', label='boolean', required=False) |
|
362 | 362 |
superuser.attributes.boolean = True |
363 | 363 |
app.authorization = ('Basic', (superuser.username, superuser.username)) |
364 | 364 |
resp = app.get('/api/users/%s/' % superuser.uuid) |
... | ... | |
370 | 370 | |
371 | 371 |
user1 = User.objects.create(username='user1') |
372 | 372 |
user2 = User.objects.create(username='user2') |
373 |
user3 = User.objects.create(username='user3')
|
|
373 |
User.objects.create(username='user3') |
|
374 | 374 | |
375 | 375 |
role1 = Role.objects.create(name='role1') |
376 | 376 |
role2 = Role.objects.create(name='role2') |
... | ... | |
381 | 381 |
service1 = Service.objects.create(ou=get_default_ou(), name='service1', slug='service1') |
382 | 382 |
service1.add_authorized_role(role1) |
383 | 383 | |
384 |
service2 = Service.objects.create(ou=get_default_ou(), name='service2', slug='service2')
|
|
384 |
Service.objects.create(ou=get_default_ou(), name='service2', slug='service2') |
|
385 | 385 | |
386 | 386 |
resp = app.get('/api/users/') |
387 | 387 |
assert len(resp.json['results']) == 4 |
... | ... | |
411 | 411 |
service1 = Service.objects.create(ou=get_default_ou(), name='service1', slug='service1') |
412 | 412 |
service1.add_authorized_role(role1) |
413 | 413 | |
414 |
service2 = Service.objects.create(ou=get_default_ou(), name='service2', slug='service2')
|
|
414 |
Service.objects.create(ou=get_default_ou(), name='service2', slug='service2') |
|
415 | 415 | |
416 | 416 |
for user in superuser, user1, user2, user3: |
417 | 417 |
simple_role.members.add(user) |
... | ... | |
958 | 958 |
if not api_user.has_perm('a2_rbac.manage_members_role', role): |
959 | 959 |
status = 403 |
960 | 960 | |
961 |
resp = app.put_json(f'/api/roles/{role.uuid}/relationships/members/', params={'data': []}, status=status)
|
|
961 |
app.put_json(f'/api/roles/{role.uuid}/relationships/members/', params={'data': []}, status=status) |
|
962 | 962 |
if api_user.has_perm('a2_rbac.manage_members_role', role): |
963 | 963 |
assert len(role.members.all()) == 0 |
964 | 964 |
else: |
... | ... | |
1128 | 1128 |
def test_user_synchronization(app, simple_user): |
1129 | 1129 |
headers = basic_authorization_header(simple_user) |
1130 | 1130 |
uuids = [] |
1131 |
for i in range(100):
|
|
1131 |
for _ in range(100):
|
|
1132 | 1132 |
user = User.objects.create(first_name='ben', last_name='dauve') |
1133 | 1133 |
uuids.append(user.uuid) |
1134 | 1134 |
unknown_uuids = [uuid.uuid4().hex for i in range(100)] |
... | ... | |
1532 | 1532 |
def test_no_opened_session_cookie_on_api(app, user, settings): |
1533 | 1533 |
settings.A2_OPENED_SESSION_COOKIE_DOMAIN = 'testserver.local' |
1534 | 1534 |
app.authorization = ('Basic', (user.username, user.username)) |
1535 |
resp = app.get('/api/users/')
|
|
1535 |
app.get('/api/users/') |
|
1536 | 1536 |
assert 'A2_OPENED_SESSION' not in app.cookies |
1537 | 1537 | |
1538 | 1538 | |
... | ... | |
1904 | 1904 |
'last_name': 'Doe', |
1905 | 1905 |
} |
1906 | 1906 |
headers = basic_authorization_header(admin) |
1907 |
resp = app.put_json(f'/api/users/{simple_user.uuid}/', params=payload, headers=headers, status=200)
|
|
1908 |
resp = app.get(f'/api/users/{simple_user.uuid}/', headers=headers, status=200)
|
|
1907 |
app.put_json(f'/api/users/{simple_user.uuid}/', params=payload, headers=headers, status=200) |
|
1908 |
app.get(f'/api/users/{simple_user.uuid}/', headers=headers, status=200) |
|
1909 | 1909 |
payload['prefered_color'] = None |
1910 | 1910 |
payload['date'] = None |
1911 | 1911 |
payload['birthdate'] = None |
... | ... | |
1913 | 1913 |
payload['prefered_color'] = '' |
1914 | 1914 |
payload['date'] = '' |
1915 | 1915 |
payload['birthdate'] = '' |
1916 |
resp = app.put_json(f'/api/users/{simple_user.uuid}/', params=payload, headers=headers, status=200)
|
|
1917 |
resp = app.get(f'/api/users/{simple_user.uuid}/', headers=headers, status=200)
|
|
1916 |
app.put_json(f'/api/users/{simple_user.uuid}/', params=payload, headers=headers, status=200) |
|
1917 |
app.get(f'/api/users/{simple_user.uuid}/', headers=headers, status=200) |
|
1918 | 1918 |
payload['prefered_color'] = None |
1919 | 1919 |
payload['date'] = None |
1920 | 1920 |
payload['birthdate'] = None |
... | ... | |
2104 | 2104 |
app.authorization = ('Basic', (admin.username, admin.username)) |
2105 | 2105 | |
2106 | 2106 |
user = User.objects.create(first_name='Jean', last_name='Dupont', ou=get_default_ou()) |
2107 |
homonym = User.objects.create(first_name='Jean', last_name='Dupont', ou=ou1)
|
|
2107 |
User.objects.create(first_name='Jean', last_name='Dupont', ou=ou1) |
|
2108 | 2108 | |
2109 | 2109 |
params = { |
2110 | 2110 |
'first_name': 'Jeanne', |
... | ... | |
2293 | 2293 |
'new_password': 'password2', |
2294 | 2294 |
} |
2295 | 2295 |
url = reverse('a2-api-password-change') |
2296 |
response = app.post_json(url, params=payload, status=400)
|
|
2296 |
app.post_json(url, params=payload, status=400) |
|
2297 | 2297 |
user2.delete() |
2298 |
response = app.post_json(url, params=payload)
|
|
2298 |
app.post_json(url, params=payload) |
|
2299 | 2299 |
assert User.objects.get(username='john.doe').check_password('password2') |
2300 | 2300 | |
2301 | 2301 | |
2302 | 2302 |
def test_api_users_delete(settings, app, admin, simple_user): |
2303 | 2303 |
headers = basic_authorization_header(admin) |
2304 |
resp = app.delete_json(f'/api/users/{simple_user.uuid}/', headers=headers)
|
|
2304 |
app.delete_json(f'/api/users/{simple_user.uuid}/', headers=headers) |
|
2305 | 2305 |
assert not User.objects.filter(pk=simple_user.pk).exists() |
2306 | 2306 |
assert_event('manager.user.deletion', user=admin, api=True) |
2307 | 2307 | |
... | ... | |
2382 | 2382 |
assert login_stats in resp.json['data'] |
2383 | 2383 | |
2384 | 2384 |
# same goes with users |
2385 |
user = User.objects.create(username='john.doe', email='john.doe@example.com', ou=ou)
|
|
2385 |
User.objects.create(username='john.doe', email='john.doe@example.com', ou=ou) |
|
2386 | 2386 |
login_stats['filters'].append( |
2387 | 2387 |
{ |
2388 | 2388 |
'id': 'users_ou', |
... | ... | |
2419 | 2419 |
event_type = EventType.objects.get_for_name(event_type_name) |
2420 | 2420 | |
2421 | 2421 |
freezer.move_to('2020-02-03 12:00') |
2422 |
event = Event.objects.create(type=event_type, references=[portal], data=method)
|
|
2423 |
event = Event.objects.create(type=event_type, references=[agendas, user], user=user, data=method)
|
|
2422 |
Event.objects.create(type=event_type, references=[portal], data=method) |
|
2423 |
Event.objects.create(type=event_type, references=[agendas, user], user=user, data=method) |
|
2424 | 2424 | |
2425 | 2425 |
freezer.move_to('2020-03-04 13:00') |
2426 |
event = Event.objects.create(type=event_type, references=[agendas], data=method)
|
|
2427 |
event = Event.objects.create(type=event_type, references=[portal], data=method2)
|
|
2426 |
Event.objects.create(type=event_type, references=[agendas], data=method) |
|
2427 |
Event.objects.create(type=event_type, references=[portal], data=method2) |
|
2428 | 2428 | |
2429 | 2429 |
resp = app.get('/api/statistics/%s/?time_interval=month' % event_name, headers=headers) |
2430 | 2430 |
data = resp.json['data'] |
... | ... | |
2531 | 2531 |
def test_api_statistics_no_crash_older_drf(app, admin): |
2532 | 2532 |
headers = basic_authorization_header(admin) |
2533 | 2533 |
expected_status = 200 if drf_version > '3.9' else 404 |
2534 |
resp = app.get('/api/statistics/login/?time_interval=month', headers=headers, status=expected_status)
|
|
2534 |
app.get('/api/statistics/login/?time_interval=month', headers=headers, status=expected_status) |
|
2535 | 2535 | |
2536 | 2536 | |
2537 | 2537 |
def test_find_duplicates_put(app, admin, settings): |
tests/test_attribute_kinds.py | ||
---|---|---|
473 | 473 |
'last_name': 'Doe', |
474 | 474 |
'birthdate': '2017-12-31', |
475 | 475 |
} |
476 |
response = app.post_json('/api/users/', params=payload)
|
|
476 |
app.post_json('/api/users/', params=payload) |
|
477 | 477 |
assert qs.get().attributes.birthdate == datetime.date(2017, 12, 31) |
478 | 478 |
qs.delete() |
479 | 479 |
tests/test_commands.py | ||
---|---|---|
285 | 285 |
default_ou = get_default_ou() |
286 | 286 |
admin_op = get_operation(ADMIN_OP) |
287 | 287 | |
288 |
ou1 = get_ou_model().objects.create(name='Orgunit1', slug='orgunit1')
|
|
288 |
get_ou_model().objects.create(name='Orgunit1', slug='orgunit1') |
|
289 | 289 |
role1 = Role.objects.create(name='Role 1', slug='role-1', ou=default_ou) |
290 | 290 |
perm1 = Permission.objects.create( |
291 | 291 |
operation=admin_op, |
... | ... | |
362 | 362 |
ou.email_is_unique = True |
363 | 363 |
ou.save() |
364 | 364 | |
365 |
user1 = User.objects.create( |
|
366 |
username='foo', email='foo@example.net', first_name='Toto', last_name='Foo', ou=ou |
|
367 |
) |
|
368 |
user2 = User.objects.create( |
|
369 |
username='foo', email='bar@example.net', first_name='Bar', last_name='Foo', ou=ou |
|
370 |
) |
|
371 |
user3 = User.objects.create( |
|
372 |
username='bar', email='bar@example.net', first_name='Tutu', last_name='Bar', ou=ou |
|
373 |
) |
|
365 |
User.objects.create(username='foo', email='foo@example.net', first_name='Toto', last_name='Foo', ou=ou) |
|
366 |
User.objects.create(username='foo', email='bar@example.net', first_name='Bar', last_name='Foo', ou=ou) |
|
367 |
User.objects.create(username='bar', email='bar@example.net', first_name='Tutu', last_name='Bar', ou=ou) |
|
374 | 368 | |
375 | 369 |
settings.A2_EMAIL_IS_UNIQUE = True |
376 | 370 |
settings.A2_USERNAME_IS_UNIQUE = True |
tests/test_concurrency.py | ||
---|---|---|
37 | 37 | |
38 | 38 |
AttributeValue.objects.all().delete() |
39 | 39 | |
40 |
for i in range(10):
|
|
40 |
for _ in range(10):
|
|
41 | 41 | |
42 | 42 |
def map_threads(f, l): |
43 | 43 |
threads = [] |
tests/test_crypto.py | ||
---|---|---|
26 | 26 | |
27 | 27 | |
28 | 28 |
def test_idempotency(): |
29 |
for i in range(10):
|
|
29 |
for _ in range(10):
|
|
30 | 30 |
s = force_bytes(str(random.getrandbits(1024))) |
31 | 31 |
assert crypto.aes_base64_decrypt(key, crypto.aes_base64_encrypt(key, s)) == s |
32 | 32 |
tests/test_csv_import.py | ||
---|---|---|
523 | 523 |
thomas.roles.remove(role) |
524 | 524 |
content_only_key = '''email key,_role_name |
525 | 525 |
tnoel@entrouvert.com,test_name''' |
526 |
importer = user_csv_importer_factory(content_slug_add)
|
|
526 |
importer = user_csv_importer_factory(content_only_key)
|
|
527 | 527 |
assert importer.run() |
528 | 528 |
thomas.refresh_from_db() |
529 | 529 |
assert thomas in role.members.all() |
tests/test_data_transfer.py | ||
---|---|---|
168 | 168 |
}, |
169 | 169 |
ImportContext(), |
170 | 170 |
) |
171 |
role, status = rd.deserialize()
|
|
171 |
role, dummy = rd.deserialize()
|
|
172 | 172 |
assert role.ou == ou |
173 | 173 | |
174 | 174 | |
... | ... | |
198 | 198 |
{'uuid': uuid, 'name': 'some-role', 'slug': 'some-role', 'ou': {'slug': 'ou-2'}, 'service': None}, |
199 | 199 |
ImportContext(), |
200 | 200 |
) |
201 |
role, status = rd.deserialize()
|
|
201 |
assert rd.deserialize()
|
|
202 | 202 |
existing_role.refresh_from_db() |
203 | 203 |
assert existing_role.ou == ou2 |
204 | 204 |
assert Role.objects.exclude(slug__startswith='_').count() == 1 |
... | ... | |
212 | 212 |
{'uuid': uuid, 'slug': 'some-role', 'name': 'some role changed', 'ou': None, 'service': None}, |
213 | 213 |
ImportContext(), |
214 | 214 |
) |
215 |
role, status = rd.deserialize()
|
|
215 |
role, dummy = rd.deserialize()
|
|
216 | 216 |
existing_role.refresh_from_db() |
217 | 217 |
assert role == existing_role |
218 | 218 |
assert existing_role.name == 'some role changed' |
... | ... | |
238 | 238 |
ImportContext(), |
239 | 239 |
) |
240 | 240 |
role, status = rd.deserialize() |
241 |
created, deleted = rd.attributes()
|
|
241 |
created, dummy = rd.attributes()
|
|
242 | 242 |
assert status == 'created' |
243 | 243 |
assert role.attributes.count() == 2 |
244 | 244 |
assert len(created) == 2 |
... | ... | |
284 | 284 | |
285 | 285 |
rd = RoleDeserializer(child_role_dict, ImportContext()) |
286 | 286 |
child_role, status = rd.deserialize() |
287 |
created, deleted = rd.parentings()
|
|
287 |
created, dummy = rd.parentings()
|
|
288 | 288 | |
289 | 289 |
assert status == 'created' |
290 | 290 |
assert len(created) == 1 |
... | ... | |
395 | 395 |
import_context = ImportContext() |
396 | 396 |
rd = RoleDeserializer(some_role_dict, import_context) |
397 | 397 |
rd.deserialize() |
398 |
perm_created, perm_deleted = rd.permissions()
|
|
398 |
perm_created, dummy = rd.permissions()
|
|
399 | 399 |
assert len(perm_created) == 1 |
400 | 400 |
perm = perm_created[0] |
401 | 401 |
assert perm.target == perm_role |
... | ... | |
418 | 418 |
import_context = ImportContext() |
419 | 419 |
rd = RoleDeserializer(some_role_dict, import_context) |
420 | 420 |
rd.deserialize() |
421 |
perm_created, perm_deleted = rd.permissions()
|
|
421 |
perm_created, dummy = rd.permissions()
|
|
422 | 422 |
assert len(perm_created) == 1 |
423 | 423 |
perm = perm_created[0] |
424 | 424 |
assert perm.target.app_label == 'admin' |
... | ... | |
534 | 534 | |
535 | 535 |
def test_roles_import_no_slug(db): |
536 | 536 |
roles = [{'name': 'some role', 'description': 'some role description', 'ou': None}] |
537 |
res = import_site({'roles': roles}, ImportContext())
|
|
537 |
import_site({'roles': roles}, ImportContext()) |
|
538 | 538 |
role = Role.objects.get(name='some role') |
539 | 539 |
assert role.slug == 'some-role' |
540 | 540 |
tests/test_idp_cas.py | ||
---|---|---|
130 | 130 |
response = client.get('/idp/cas/login', {constants.SERVICE_PARAM: self.URL}) |
131 | 131 |
location = response['Location'] |
132 | 132 |
query = urllib.parse.parse_qs(location.split('?')[1]) |
133 |
next_url, next_url_query = query['next'][0].split('?') |
|
133 |
dummy_next_url, next_url_query = query['next'][0].split('?')
|
|
134 | 134 |
next_url_query = urllib.parse.parse_qs(next_url_query) |
135 | 135 |
response = client.get(location) |
136 | 136 |
response = client.post( |
... | ... | |
150 | 150 |
response = client.get('/idp/cas/login', {constants.SERVICE_PARAM: self.URL}) |
151 | 151 |
location = response['Location'] |
152 | 152 |
query = urllib.parse.parse_qs(location.split('?')[1]) |
153 |
next_url, next_url_query = query['next'][0].split('?') |
|
153 |
dummy_next_url, next_url_query = query['next'][0].split('?')
|
|
154 | 154 |
next_url_query = urllib.parse.parse_qs(next_url_query) |
155 | 155 |
response = client.get(location) |
156 | 156 |
response = client.post( |
tests/test_idp_saml2.py | ||
---|---|---|
968 | 968 |
uGnhj8v6XwvbjKZrL9kA+xf8ziazZfvvw/VGTm+IVFYB7d1x457jY5zjjXJvNyso |
969 | 969 |
owIDAQAB |
970 | 970 |
-----END PUBLIC KEY-----''' |
971 |
response = app.get('/idp/saml2/metadata')
|
|
971 |
app.get('/idp/saml2/metadata') |
|
972 | 972 | |
973 | 973 | |
974 | 974 |
def test_null_character_nonce(app, db): |
tests/test_import_export_site_cmd.py | ||
---|---|---|
49 | 49 | |
50 | 50 |
monkeypatch.setattr(authentic2.management.commands.export_site, 'export_site', dummy_export_site) |
51 | 51 |
management.call_command('export_site') |
52 |
out, err = capsys.readouterr()
|
|
52 |
out, dummy = capsys.readouterr()
|
|
53 | 53 |
assert json.loads(out) == dummy_export_site() |
54 | 54 | |
55 | 55 | |
... | ... | |
82 | 82 | |
83 | 83 |
management.call_command('import_site', json_fixture(content)) |
84 | 84 | |
85 |
out, err = capsys.readouterr()
|
|
85 |
out, dummy = capsys.readouterr()
|
|
86 | 86 |
assert "Real run" in out |
87 | 87 |
assert "1 roles created" in out |
88 | 88 |
assert "0 roles updated" in out |
... | ... | |
175 | 175 | |
176 | 176 |
def test_import_site_update_roles(db, json_fixture): |
177 | 177 |
r1 = Role.objects.create(name='Role1', slug='role1') |
178 |
r2 = Role.objects.create(name='Role2', slug='role2')
|
|
178 |
Role.objects.create(name='Role2', slug='role2') |
|
179 | 179 | |
180 | 180 |
content = { |
181 | 181 |
'roles': [ |
tests/test_journal.py | ||
---|---|---|
169 | 169 |
assert EventTypeDefinition.get_for_name('user.sso') is SSO |
170 | 170 | |
171 | 171 |
with pytest.raises(AssertionError, match='already registered'): |
172 | ||
172 |
# pylint: disable=unused-variable |
|
173 | 173 |
class SSO2(UserEventTypes): |
174 | 174 |
name = 'user.sso' |
175 | 175 |
label = 'Single Sign On' |
... | ... | |
209 | 209 |
for name in 'abcdef': |
210 | 210 |
event_types.append(EventType.objects.create(name=name)) |
211 | 211 | |
212 |
for i in range(count):
|
|
212 |
for _ in range(count):
|
|
213 | 213 |
events.append( |
214 | 214 |
Event( |
215 | 215 |
type=random.choice(event_types), |
... | ... | |
371 | 371 |
journal.record('user.registration', how='fc') |
372 | 372 |
journal.record('user.logout') |
373 | 373 | |
374 |
for i in range(count):
|
|
374 |
for _ in range(count):
|
|
375 | 375 |
journal.record('user.login', how='fc') |
376 | 376 |
journal.record('user.logout') |
377 | 377 | |
... | ... | |
480 | 480 |
assert stats == {'series': [], 'x_labels': []} |
481 | 481 | |
482 | 482 |
freezer.move_to('2020-02-03 12:00') |
483 |
event = Event.objects.create(type=event_type, references=[user, portal], user=user, data=method)
|
|
484 |
event = Event.objects.create(type=event_type, references=[user2, portal], user=user2, data=method)
|
|
483 |
Event.objects.create(type=event_type, references=[user, portal], user=user, data=method) |
|
484 |
Event.objects.create(type=event_type, references=[user2, portal], user=user2, data=method) |
|
485 | 485 | |
486 | 486 |
freezer.move_to('2020-02-03 13:00') |
487 |
event = Event.objects.create(type=event_type, references=[user, portal], user=user, data=method2)
|
|
488 |
event = Event.objects.create(type=event_type, references=[user2, portal], user=user2, data=method2)
|
|
487 |
Event.objects.create(type=event_type, references=[user, portal], user=user, data=method2) |
|
488 |
Event.objects.create(type=event_type, references=[user2, portal], user=user2, data=method2) |
|
489 | 489 | |
490 | 490 |
freezer.move_to('2020-03-03 12:00') |
491 |
event = Event.objects.create(type=event_type, references=[user, portal], user=user, data=method)
|
|
492 |
event = Event.objects.create(type=event_type, references=[user, agendas], user=user, data=method)
|
|
493 |
event = Event.objects.create(type=event_type, references=[user, forms], user=user, data=method)
|
|
494 |
event = Event.objects.create(type=event_type, user=user)
|
|
491 |
Event.objects.create(type=event_type, references=[user, portal], user=user, data=method) |
|
492 |
Event.objects.create(type=event_type, references=[user, agendas], user=user, data=method) |
|
493 |
Event.objects.create(type=event_type, references=[user, forms], user=user, data=method) |
|
494 |
Event.objects.create(type=event_type, user=user) |
|
495 | 495 | |
496 | 496 |
stats = event_type_definition.get_method_statistics('timestamp') |
497 | 497 |
stats['series'].sort(key=lambda x: x['label']) |
... | ... | |
597 | 597 | |
598 | 598 | |
599 | 599 |
def test_statistics_fill_date_gaps(db, freezer): |
600 |
user = User.objects.create(username='john.doe', email='john.doe@example.com')
|
|
600 |
User.objects.create(username='john.doe', email='john.doe@example.com') |
|
601 | 601 |
method = {'how': 'password-on-https'} |
602 | 602 |
event_type = EventType.objects.get_for_name('user.login') |
603 | 603 | |
604 | 604 |
freezer.move_to('2020-12-29 12:00') |
605 |
event = Event.objects.create(type=event_type, data=method)
|
|
605 |
Event.objects.create(type=event_type, data=method) |
|
606 | 606 |
freezer.move_to('2021-01-02 13:00') |
607 |
event = Event.objects.create(type=event_type, data=method)
|
|
607 |
Event.objects.create(type=event_type, data=method) |
|
608 | 608 | |
609 | 609 |
event_type_definition = event_type.definition |
610 | 610 | |
... | ... | |
616 | 616 | |
617 | 617 |
Event.objects.all().delete() |
618 | 618 |
freezer.move_to('2020-11-29 12:00') |
619 |
event = Event.objects.create(type=event_type, data=method)
|
|
619 |
Event.objects.create(type=event_type, data=method) |
|
620 | 620 |
freezer.move_to('2022-02-02 13:00') |
621 |
event = Event.objects.create(type=event_type, data=method)
|
|
621 |
Event.objects.create(type=event_type, data=method) |
|
622 | 622 |
stats = event_type_definition.get_method_statistics('month') |
623 | 623 |
assert stats == { |
624 | 624 |
'x_labels': ['2020-11', '2020-12'] + ['2021-%02d' % i for i in range(1, 13)] + ['2022-01', '2022-02'], |
... | ... | |
627 | 627 | |
628 | 628 |
Event.objects.all().delete() |
629 | 629 |
freezer.move_to('2020-11-29 12:00') |
630 |
event = Event.objects.create(type=event_type, data=method)
|
|
630 |
Event.objects.create(type=event_type, data=method) |
|
631 | 631 |
freezer.move_to('2025-02-02 13:00') |
632 |
event = Event.objects.create(type=event_type, data=method)
|
|
632 |
Event.objects.create(type=event_type, data=method) |
|
633 | 633 |
stats = event_type_definition.get_method_statistics('year') |
634 | 634 |
assert stats == { |
635 | 635 |
'x_labels': ['2020', '2021', '2022', '2023', '2024', '2025'], |
... | ... | |
647 | 647 |
event_type_definition = event_type.definition |
648 | 648 | |
649 | 649 |
freezer.move_to('2020-02-03 12:00') |
650 |
event = Event.objects.create(type=event_type, references=[user, portal], user=user, data=method)
|
|
651 |
event = Event.objects.create(type=event_type, references=[user], user=user, data=method)
|
|
650 |
Event.objects.create(type=event_type, references=[user, portal], user=user, data=method) |
|
651 |
Event.objects.create(type=event_type, references=[user], user=user, data=method) |
|
652 | 652 | |
653 | 653 |
stats = event_type_definition.get_service_statistics('month') |
654 | 654 |
stats['series'].sort(key=lambda x: x['label']) |
... | ... | |
671 | 671 |
event_type = EventType.objects.get_for_name('user.login') |
672 | 672 |
event_type_definition = event_type.definition |
673 | 673 | |
674 |
event = Event.objects.create(type=event_type, references=[user, portal], user=user, data=method)
|
|
674 |
Event.objects.create(type=event_type, references=[user, portal], user=user, data=method) |
|
675 | 675 | |
676 | 676 |
ou_with_no_service = OU.objects.create(name='Second OU') |
677 | 677 |
stats = event_type_definition.get_method_statistics('month', services_ou=ou_with_no_service) |
... | ... | |
680 | 680 | |
681 | 681 |
def test_prefetcher(db): |
682 | 682 |
event_type = EventType.objects.get_for_name('user.login') |
683 |
for i in range(10):
|
|
683 |
for _ in range(10):
|
|
684 | 684 |
user = User.objects.create() |
685 | 685 |
Event.objects.create(type=event_type, user=user, references=[user]) |
686 | 686 |
Event.objects.create(type=event_type, user=user, references=[user]) |
tests/test_large_userbase.py | ||
---|---|---|
38 | 38 |
fake = faker.Faker(locale='fr_FR') |
39 | 39 |
faker.Faker.seed(0) |
40 | 40 | |
41 |
for i in range(100):
|
|
41 |
for _ in range(100):
|
|
42 | 42 |
User.objects.bulk_create( |
43 | 43 |
User( |
44 | 44 |
first_name=fake.first_name() + ' ' + fake.first_name(), |
... | ... | |
73 | 73 |
'last_name': user.last_name, |
74 | 74 |
} |
75 | 75 | |
76 |
for i in range(100):
|
|
76 |
for _ in range(100):
|
|
77 | 77 |
resp = app.get('/api/users/find_duplicates/', params=params) |
78 | 78 |
assert len(resp.json['data']) >= 1 |
79 | 79 | |
... | ... | |
88 | 88 |
'birthdate': str(user.attributes.birthdate), |
89 | 89 |
} |
90 | 90 | |
91 |
for i in range(100):
|
|
91 |
for _ in range(100):
|
|
92 | 92 |
resp = app.get('/api/users/find_duplicates/', params=params) |
93 | 93 |
assert len(resp.json['data']) >= 1 |
tests/test_ldap.py | ||
---|---|---|
1808 | 1808 |
} |
1809 | 1809 |
] |
1810 | 1810 |
with utils.check_log(caplog, "account name authentication filter doesn't contain '%s'"): |
1811 |
response = client.post(
|
|
1811 |
client.post( |
|
1812 | 1812 |
'/login/', {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}, follow=True |
1813 | 1813 |
) |
1814 | 1814 |
tests/test_manager.py | ||
---|---|---|
178 | 178 | |
179 | 179 |
assert len(mailoutbox) == 0 |
180 | 180 |
assert User.objects.filter(ou_id=new_ou.id).count() == 0 |
181 |
for i in range(100):
|
|
181 |
for _ in range(100):
|
|
182 | 182 |
ou_add = login(app, superuser_or_admin, url) |
183 | 183 |
form = ou_add.form |
184 | 184 |
form.set('first_name', 'John') |
... | ... | |
426 | 426 |
field = response.form['search-ou'] |
427 | 427 |
options = field.options |
428 | 428 |
assert len(options) == 4 |
429 |
for key, checked, label in options:
|
|
429 |
for key, checked, _ in options:
|
|
430 | 430 |
assert not checked or key == 'all' |
431 | 431 |
assert 'all' in [o[0] for o in options] |
432 | 432 |
assert 'none' in [o[0] for o in options] |
... | ... | |
441 | 441 |
field = response.form['search-ou'] |
442 | 442 |
options = field.options |
443 | 443 |
assert len(options) == 4 |
444 |
for key, checked, label in options:
|
|
444 |
for key, checked, dummy in options:
|
|
445 | 445 |
assert not checked or key == str(get_default_ou().pk) |
446 | 446 |
q = response.pyquery.remove_namespaces() |
447 | 447 |
assert len(q('table tbody tr')) == 1 |
... | ... | |
487 | 487 |
field = response.form['search-ou'] |
488 | 488 |
options = field.options |
489 | 489 |
assert len(options) == 4 |
490 |
for key, checked, label in options:
|
|
490 |
for key, checked, _ in options:
|
|
491 | 491 |
if key == 'all': |
492 | 492 |
assert checked |
493 | 493 |
else: |
... | ... | |
536 | 536 |
options = field.options |
537 | 537 |
assert len(options) == 1 |
538 | 538 |
# ou1 is selected |
539 |
key, checked, label = options[0]
|
|
539 |
key, checked, _ = options[0]
|
|
540 | 540 |
assert checked |
541 | 541 |
assert key == str(ou1.pk) |
542 | 542 |
# verify table shown |
... | ... | |
551 | 551 |
field = response.form['search-ou'] |
552 | 552 |
options = field.options |
553 | 553 |
assert len(options) == 1 |
554 |
key, checked, label = options[0]
|
|
554 |
key, checked, dummy = options[0]
|
|
555 | 555 |
assert checked |
556 | 556 |
assert key == str(ou1.pk) |
557 | 557 |
q = response.pyquery.remove_namespaces() |
... | ... | |
572 | 572 |
field = response.form['search-ou'] |
573 | 573 |
options = field.options |
574 | 574 |
assert len(options) == 1 |
575 |
key, checked, label = options[0]
|
|
575 |
key, checked, _ = options[0]
|
|
576 | 576 |
assert checked |
577 | 577 |
assert key == str(ou1.pk) |
578 | 578 |
q = response.pyquery.remove_namespaces() |
... | ... | |
604 | 604 |
field = response.form['search-ou'] |
605 | 605 |
options = field.options |
606 | 606 |
assert len(options) == 1 |
607 |
key, checked, label = options[0]
|
|
607 |
key, checked, _ = options[0]
|
|
608 | 608 |
assert checked |
609 | 609 |
assert key == str(ou1.pk) |
610 | 610 |
q = response.pyquery.remove_namespaces() |
... | ... | |
625 | 625 |
field = response.form['search-ou'] |
626 | 626 |
options = field.options |
627 | 627 |
assert len(options) == 1 |
628 |
key, checked, label = options[0]
|
|
628 |
key, checked, dummy = options[0]
|
|
629 | 629 |
assert checked |
630 | 630 |
assert key == str(ou1.pk) |
631 | 631 |
q = response.pyquery.remove_namespaces() |
... | ... | |
1128 | 1128 |
def test_manager_role_inheritance_list_search_permission(app, admin, simple_user, simple_role, ou1): |
1129 | 1129 |
visible_role = Role.objects.create(name='visible_role', ou=simple_user.ou) |
1130 | 1130 |
visible_role_2 = Role.objects.create(name='visible_role_2', ou=ou1) |
1131 |
invisible_role = Role.objects.create(name='invisible_role', ou=simple_user.ou)
|
|
1131 |
Role.objects.create(name='invisible_role', ou=simple_user.ou) |
|
1132 | 1132 |
admin_of_simple_role = simple_role.get_admin_role() |
1133 | 1133 | |
1134 | 1134 |
admin_of_simple_role.members.add(simple_user) |
tests/test_role_manager.py | ||
---|---|---|
151 | 151 |
assert Role.objects.filter(name=role_ou2.name, ou=get_default_ou()).exists() |
152 | 152 | |
153 | 153 |
response.form.set('search-text', 'role_ou1') |
154 |
search_response = response.form.submit()
|
|
154 |
response.form.submit() |
|
155 | 155 | |
156 | 156 |
export_response = response.click('Export') |
157 | 157 |
new_export = export_response.json |
tests/test_template.py | ||
---|---|---|
121 | 121 | |
122 | 122 |
resp.form.set('username', simple_user.username) |
123 | 123 |
resp.form.set('password', simple_user.username) |
124 |
response = resp.form.submit(name='login-password-submit')
|
|
124 |
resp.form.submit(name='login-password-submit') |
|
125 | 125 | |
126 | 126 |
resp = app.get(reverse('account_management')) |
127 | 127 |
assert resp.pyquery('body').attr('data-service-slug') == service.slug |
tests/test_user_manager.py | ||
---|---|---|
361 | 361 |
users = [User(username='user%s' % i) for i in range(10)] |
362 | 362 |
User.objects.bulk_create(users) |
363 | 363 | |
364 |
response = login(app, superuser)
|
|
364 |
login(app, superuser) |
|
365 | 365 |
resp = app.get('/manage/users/?search-text=user1') |
366 | 366 |
resp = resp.click('CSV').follow() |
367 | 367 |
resp = resp.click('Download CSV') |
tests/utils.py | ||
---|---|---|
116 | 116 | |
117 | 117 |
def assert_redirects_complex(response, expected_url, **kwargs): |
118 | 118 |
assert response.status_code == 302, 'code should be 302' |
119 |
scheme, netloc, path, query, fragment = urllib.parse.urlsplit(response.url)
|
|
119 |
scheme, netloc, _, _, _ = urllib.parse.urlsplit(response.url)
|
|
120 | 120 |
e_scheme, e_netloc, e_path, e_query, e_fragment = urllib.parse.urlsplit(expected_url) |
121 | 121 |
e_scheme = e_scheme if e_scheme else scheme |
122 | 122 |
e_netloc = e_netloc if e_netloc else netloc |
... | ... | |
249 | 249 |
current_run_on_commit = connection.run_on_commit |
250 | 250 |
connection.run_on_commit = [] |
251 | 251 |
while current_run_on_commit: |
252 |
sids, func = current_run_on_commit.pop(0)
|
|
252 |
_, func = current_run_on_commit.pop(0)
|
|
253 | 253 |
func() |
254 | 254 | |
255 | 255 | |
256 |
- |