0003-add-rp-remote-idp-authentication-16842.patch
fargo/oauth2/authentication.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import urlparse |
|
18 |
import logging |
|
19 | ||
20 |
import requests |
|
21 | ||
22 |
from django.conf import settings |
|
23 | ||
17 | 24 |
from django.utils.translation import ugettext_lazy as _ |
18 | 25 | |
19 | 26 |
from rest_framework.authentication import BasicAuthentication |
... | ... | |
51 | 58 | |
52 | 59 |
class FargoOAUTH2Authentication(BasicAuthentication): |
53 | 60 | |
61 |
def authenticate_through_idp(self, client_id, client_secret): |
|
62 |
'''Check client_id and client_secret with configured IdP, and verify it is an OIDC |
|
63 |
client. |
|
64 |
''' |
|
65 |
logger = logging.getLogger(__name__) |
|
66 | ||
67 |
authentic_idp = getattr(settings, 'FARGO_IDP_URL', None) |
|
68 |
if not authentic_idp: |
|
69 |
logger.warning(u'idp check-password not configured') |
|
70 |
return False, '' |
|
71 | ||
72 |
url = urlparse.urljoin(authentic_idp, 'api/check-password/') |
|
73 |
try: |
|
74 |
response = requests.post(url, json={ |
|
75 |
'username': client_id, |
|
76 |
'password': client_secret}, auth=(client_id, client_secret), verify=False) |
|
77 |
response.raise_for_status() |
|
78 |
except requests.RequestException as e: |
|
79 |
logger.warning(u'idp check-password API failed: %s', e) |
|
80 |
return False, 'idp is down' |
|
81 |
try: |
|
82 |
response = response.json() |
|
83 |
except ValueError as e: |
|
84 |
logger.warning(u'idp check-password API failed: %s, %r', e, response.content) |
|
85 |
return False, 'idp is down' |
|
86 | ||
87 |
if response.get('result') == 0: |
|
88 |
logger.warning(u'idp check-password API failed') |
|
89 |
return False, response.get('errors', [''])[0] |
|
90 | ||
91 |
return True, None |
|
92 | ||
54 | 93 |
def authenticate_credentials(self, client_id, client_secret): |
55 | 94 |
try: |
56 | 95 |
client = OAuth2Client.objects.get( |
57 | 96 |
client_id=client_id, client_secret=client_secret) |
58 | 97 |
except OAuth2Client.DoesNotExist: |
59 |
raise AuthenticationFailed(_('Invalid client_id/client_secret.')) |
|
98 |
success, error = self.authenticate_through_idp(client_id, client_secret) |
|
99 |
if not success: |
|
100 |
raise AuthenticationFailed(error or _('Invalid client_id/client_secret.')) |
|
101 |
client = OAuth2Client.objects.get(client_id=client_id) |
|
102 |
client.client_secret = client_secret |
|
103 |
client.save() |
|
60 | 104 | |
61 | 105 |
user = OAuth2User(client) |
62 | 106 |
user.authenticated = True |
tests/test_oauth2.py | ||
---|---|---|
1 |
import json |
|
2 |
import mock |
|
1 | 3 |
import pytest |
2 | 4 |
from urllib import quote |
3 | 5 |
import urlparse |
... | ... | |
14 | 16 |
pytestmark = pytest.mark.django_db |
15 | 17 | |
16 | 18 | |
19 |
class FakedResponse(mock.Mock): |
|
20 | ||
21 |
def json(self): |
|
22 |
return json.loads(self.content) |
|
23 | ||
24 | ||
17 | 25 |
@pytest.fixture |
18 | 26 |
def oauth2_client(): |
19 | 27 |
return OAuth2Client.objects.create( |
... | ... | |
160 | 168 |
url += '?%s' % urlencode({'redirect_uri': 'https://example.com'}) |
161 | 169 |
resp = app.get(url) |
162 | 170 |
assert 'This document is already in your portfolio' in resp.content |
171 | ||
172 | ||
173 |
@mock.patch('fargo.oauth2.authentication.requests.post') |
|
174 |
def test_idp_authentication(mocked_post, settings, app, oauth2_client, john_doe, user_doc): |
|
175 |
login(app, user=john_doe) |
|
176 |
url = reverse('oauth2-authorize') |
|
177 |
params = { |
|
178 |
'client_id': oauth2_client.client_id, |
|
179 |
'client_secret': 'fake', |
|
180 |
'response_type': 'code', |
|
181 |
'state': 'achipeachope', |
|
182 |
'redirect': 'https://example.com/' |
|
183 |
} |
|
184 |
params['redirect_uri'] = 'https://example.com' |
|
185 |
resp = app.get(url, params=params) |
|
186 |
resp.forms[0]['document'].select('1') |
|
187 |
resp = resp.forms[0].submit() |
|
188 |
auth = OAuth2Authorize.objects.filter(user_document__user=john_doe)[0] |
|
189 |
params.pop('response_type') |
|
190 |
params.pop('state') |
|
191 |
params['grant_type'] = 'authorization_code' |
|
192 |
params['code'] = auth.code |
|
193 | ||
194 |
url = reverse('oauth2-get-token') |
|
195 |
# when remote remote idp not set |
|
196 |
app.authorization = ('Basic', ('client-id', 'fake')) |
|
197 |
resp = app.post(url, params=params, status=401) |
|
198 |
resp.json['detail'] == 'Invalid client_id/client_secret.' |
|
199 |
# when remote idp fails to authenticate rp |
|
200 |
settings.FARGO_IDP_URL = 'https://idp.example.org' |
|
201 |
response = { |
|
202 |
"result": 0, "errors": ["Invalid username/password."] |
|
203 |
} |
|
204 |
mocked_post.return_value = FakedResponse(content=json.dumps(response)) |
|
205 |
resp = app.post(url, params=params, status=401) |
|
206 |
resp.json['detail'] == 'Invalid client_id/client_secret.' |
|
207 |
# when remote idp authenticates rp |
|
208 |
response = {"result": 1, "errors": []} |
|
209 |
mocked_post.return_value = FakedResponse(content=json.dumps(response)) |
|
210 |
resp = app.post(url, params=params, status=200) |
|
211 |
assert resp.json['access_token'] == auth.access_token |
|
212 |
url = reverse('oauth2-get-document') |
|
213 |
app.authorization = ('Bearer', str(auth.access_token)) |
|
214 |
resp = app.get(url, status=200) |
|
163 |
- |