1 |
1 |
import logging
|
2 |
2 |
|
3 |
|
from rest_framework import authentication, exceptions
|
|
3 |
from rest_framework import authentication, exceptions, status
|
4 |
4 |
|
5 |
5 |
from hobo import signature
|
6 |
6 |
|
... | ... | |
61 |
61 |
return 'Publik Service Admin'
|
62 |
62 |
|
63 |
63 |
|
|
64 |
class PublikAuthenticationFailed(exceptions.APIException):
    """API exception raised when Publik signature authentication fails.

    The exception carries HTTP status 401 and its detail is a mapping of
    the form ``{'err': <code>}``, where ``<code>`` is the short error
    identifier given by the caller (e.g. ``'invalid-signature'``).
    """

    status_code = status.HTTP_401_UNAUTHORIZED
    default_code = 'invalid-signature'

    def __init__(self, code):
        # Wrap the short error code in a dict and hand it to the base
        # class as the exception detail.
        detail = {'err': code}
        super(PublikAuthenticationFailed, self).__init__(detail)
|
|
70 |
|
|
71 |
|
64 |
72 |
class PublikAuthentication(authentication.BaseAuthentication):
|
65 |
73 |
def __init__(self, *args, **kwargs):
|
66 |
74 |
self.logger = logging.getLogger(__name__)
|
... | ... | |
80 |
88 |
try:
|
81 |
89 |
return User.objects.get(uuid=name_id)
|
82 |
90 |
except User.DoesNotExist:
|
83 |
|
raise exceptions.AuthenticationFailed('No user matches uuid=%r' % name_id)
|
|
91 |
raise PublikAuthenticationFailed('user-not-found')
|
|
92 |
|
84 |
93 |
elif UserSAMLIdentifier:
|
85 |
94 |
try:
|
86 |
95 |
return UserSAMLIdentifier.objects.get(name_id=name_id).user
|
87 |
96 |
except UserSAMLIdentifier.DoesNotExist:
|
88 |
|
raise exceptions.AuthenticationFailed(
|
89 |
|
'No user matches nameid=%r' % name_id)
|
|
97 |
raise PublikAuthenticationFailed('user-not-found')
|
90 |
98 |
else:
|
91 |
|
raise exceptions.AuthenticationFailed(
|
92 |
|
'No usable model to match nameid=%r' % name_id)
|
|
99 |
raise PublikAuthenticationFailed('no-usable-model')
|
93 |
100 |
else:
|
94 |
101 |
orig = request.GET['orig']
|
95 |
102 |
try:
|
... | ... | |
100 |
107 |
klass = import_string(settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS)
|
101 |
108 |
self.logger.info('anonymous signature validated')
|
102 |
109 |
return klass()
|
103 |
|
raise exceptions.AuthenticationFailed('Anonymous service user is unsupported')
|
|
110 |
raise PublikAuthenticationFailed('no-user-for-orig')
|
104 |
111 |
|
105 |
112 |
def get_orig_key(self, orig):
    """Return the shared secret configured for the calling service `orig`.

    Every service described in ``settings.KNOWN_SERVICES`` is inspected;
    the ``secret`` of the first one whose ``verif_orig`` equals `orig`
    and whose secret is non-empty is returned.

    Raises:
        PublikAuthenticationFailed: with code
            ``'no-known-services-setting'`` when the ``KNOWN_SERVICES``
            setting is absent, or ``'no-secret-found-for-orig'`` when no
            service matches `orig`.
    """
    if not hasattr(settings, 'KNOWN_SERVICES'):
        self.logger.warning('no known services')
        raise PublikAuthenticationFailed('no-known-services-setting')

    # Neither the service type key nor the slug is needed for the lookup,
    # so walk the nested mappings through their values only.
    for services in settings.KNOWN_SERVICES.values():
        for service in services.values():
            if service.get('verif_orig') == orig and service.get('secret'):
                return service['secret']

    self.logger.warning('no secret found for origin %r', orig)
    raise PublikAuthenticationFailed('no-secret-found-for-orig')
|
115 |
122 |
|
116 |
123 |
def authenticate(self, request):
|
117 |
124 |
full_path = request.get_full_path()
|
... | ... | |
120 |
127 |
key = self.get_orig_key(request.GET['orig'])
|
121 |
128 |
if not signature.check_url(full_path, key):
|
122 |
129 |
self.logger.warning('invalid signature')
|
123 |
|
raise exceptions.AuthenticationFailed('Invalid signature')
|
|
130 |
raise PublikAuthenticationFailed('invalid-signature')
|
124 |
131 |
user = self.resolve_user(request)
|
125 |
132 |
self.logger.info('user authenticated with signature %s', user)
|
126 |
133 |
return (user, None)
|