0001-general-don-t-use-session-for-after_url-persistence-.patch
tests/test_admin_pages.py | ||
---|---|---|
83 | 83 |
def test_with_user(pub): |
84 | 84 |
create_superuser(pub) |
85 | 85 |
resp = get_app(pub).get('/backoffice/', status=302) |
86 |
resp = resp.follow() |
|
87 |
assert resp.location == 'http://example.net/login/' |
|
86 |
assert resp.location == 'http://example.net/login/?next=http%3A%2F%2Fexample.net%2Fbackoffice%2F' |
|
88 | 87 | |
89 | 88 |
def test_with_superuser(pub): |
90 | 89 |
app = login(get_app(pub)) |
... | ... | |
2318 | 2317 | |
2319 | 2318 |
from test_saml_auth import setup_environment |
2320 | 2319 |
setup_environment(pub) |
2320 |
pub.user_class.wipe() # makes sure there are no users |
|
2321 | 2321 | |
2322 | 2322 |
resp = resp.click('Identity Providers') |
2323 | 2323 |
assert 'http://sso.example.net/' in resp.body |
tests/test_backoffice_pages.py | ||
---|---|---|
139 | 139 |
def test_backoffice_unlogged(pub): |
140 | 140 |
create_superuser(pub) |
141 | 141 |
resp = get_app(pub).get('/backoffice/', status=302) |
142 |
resp = resp.follow() |
|
143 |
assert resp.location == 'http://example.net/login/' |
|
142 |
assert resp.location == 'http://example.net/login/?next=http%3A%2F%2Fexample.net%2Fbackoffice%2F' |
|
144 | 143 | |
145 | 144 |
def test_backoffice_home(pub): |
146 | 145 |
create_superuser(pub) |
tests/test_form_pages.py | ||
---|---|---|
183 | 183 |
formdef.store() |
184 | 184 |
home = get_app(pub).get('/') |
185 | 185 |
assert home.status_int == 302 |
186 |
assert home.location == 'http://example.net/login' |
|
186 |
assert home.location == 'http://example.net/login/?next=http%3A%2F%2Fexample.net%2F'
|
|
187 | 187 | |
188 | 188 |
def test_home_always_advertise(pub): |
189 | 189 |
formdef = create_formdef() |
... | ... | |
552 | 552 |
formdata_user.store() |
553 | 553 | |
554 | 554 |
resp = get_app(pub).get('/test/%s/' % formdata.id) |
555 |
assert resp.location == 'http://example.net/login'
|
|
555 |
assert resp.location.startswith('http://example.net/login/?next=')
|
|
556 | 556 | |
557 | 557 |
resp = get_app(pub).get('/test/%s/' % formdata_user.id) |
558 |
assert resp.location == 'http://example.net/login'
|
|
558 |
assert resp.location.startswith('http://example.net/login/?next=')
|
|
559 | 559 | |
560 | 560 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/%s/' % formdata_user.id) |
561 | 561 |
assert 'The form has been recorded on' in resp |
tests/test_saml_auth.py | ||
---|---|---|
2 | 2 |
import os |
3 | 3 |
import sys |
4 | 4 |
import shutil |
5 |
import urlparse |
|
5 | 6 | |
6 | 7 |
try: |
7 | 8 |
import lasso |
... | ... | |
80 | 81 | |
81 | 82 |
pub.write_cfg() |
82 | 83 | |
84 |
pub.user_class.wipe() |
|
85 |
pub.user_class().store() |
|
86 | ||
83 | 87 |
def teardown_module(module): |
84 | 88 |
shutil.rmtree(pub.APP_DIR) |
85 | 89 | |
... | ... | |
159 | 163 |
body = saml2.assertionConsumerPost() |
160 | 164 | |
161 | 165 |
assert req.response.status_code == 303 |
162 |
assert req.response.headers['location'] == 'http://example.net/'
|
|
166 |
assert req.response.headers['location'] == 'http://example.net' |
|
163 | 167 |
assert req.session.user is not None |
164 | 168 | |
165 | 169 |
def test_assertion_consumer_existing_federation(): |
... | ... | |
205 | 209 |
def test_assertion_consumer_redirect_after_url(): |
206 | 210 |
setup_environment(pub) |
207 | 211 |
req = get_assertion_consumer_request() |
208 |
req.session.after_url = '/foobar'
|
|
212 |
req.form['RelayState'] = '/foobar'
|
|
209 | 213 |
saml2 = Saml2Directory() |
210 | 214 |
saml_response_body = req.form['SAMLResponse'] |
211 | 215 |
body = saml2.assertionConsumerPost() |
212 |
assert req.response.headers['location'] == 'http://example.net/foobar'
|
|
216 |
assert req.response.headers['location'] == '/foobar' |
|
213 | 217 | |
214 | 218 |
def test_saml_login_page(): |
215 | 219 |
setup_environment(pub) |
... | ... | |
225 | 229 |
assert resp.status_int == 302 |
226 | 230 |
assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=') |
227 | 231 | |
232 |
def test_saml_backoffice_redirect(): |
|
233 |
setup_environment(pub) |
|
234 |
resp = get_app(pub).get('/backoffice/') |
|
235 |
assert resp.status_int == 302 |
|
236 |
assert resp.location.startswith('http://example.net/login/?next=') |
|
237 |
resp = resp.follow() |
|
238 |
assert resp.location.startswith('http://sso.example.net/saml2/sso') |
|
239 |
assert urlparse.parse_qs(urlparse.urlparse(resp.location).query)['SAMLRequest'] |
|
240 |
assert urlparse.parse_qs(urlparse.urlparse(resp.location).query)['RelayState'] == ['http://example.net/backoffice/'] |
|
241 | ||
228 | 242 |
def test_saml_register(): |
229 | 243 |
setup_environment(pub) |
230 | 244 |
get_app(pub).get('/register/', status=404) |
... | ... | |
234 | 248 |
# if there's no specific registration URL, this initiates a SSO and there |
235 | 249 |
# should be a registration link on the identity provider |
236 | 250 |
resp = get_app(pub).get('/register/') |
237 |
assert resp.location == 'http://example.net/login/' |
|
251 |
assert resp.location == 'http://example.net/login/?next=http%3A%2F%2Fexample.net%2Fregister%2F'
|
|
238 | 252 |
resp = resp.follow() |
239 | 253 |
assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=') |
240 | 254 |
wcs/qommon/errors.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from quixote import get_publisher |
|
17 |
import urllib |
|
18 | 18 | |
19 |
from quixote import get_publisher |
|
19 | 20 |
import quixote |
20 | 21 |
from quixote.errors import * |
21 | 22 |
from quixote.html import TemplateIO, htmltext |
... | ... | |
50 | 51 |
if request.user: |
51 | 52 |
return AccessForbiddenError.render(self) |
52 | 53 | |
53 |
session.after_url = request.get_frontoffice_url() |
|
54 | 54 |
if self.public_msg: |
55 | 55 |
session.message = ('error', self.public_msg) |
56 |
login_url = get_publisher().get_root_url() + 'login' |
|
56 |
login_url = get_publisher().get_root_url() + 'login/' |
|
57 |
login_url += '?' + urllib.urlencode({'next': request.get_frontoffice_url()}) |
|
57 | 58 |
quixote.redirect(login_url) |
58 | 59 | |
59 | 60 |
class EmailError(Exception): |
wcs/qommon/ident/idp.py | ||
---|---|---|
20 | 20 |
lasso = None |
21 | 21 | |
22 | 22 |
from cStringIO import StringIO |
23 |
import urllib |
|
23 | 24 |
import urllib2 |
24 | 25 |
import urlparse |
25 | 26 | |
... | ... | |
146 | 147 |
get_cfg('saml_identities', {}).get('registration-url'), |
147 | 148 |
vars) |
148 | 149 |
return redirect(registration_url) |
149 |
get_session().after_url = get_request().get_frontoffice_url() |
|
150 | 150 |
ident_methods = get_cfg('identification', {}).get('methods', []) |
151 | 151 |
if len(ident_methods) > 1: |
152 |
return redirect(get_publisher().get_root_url() + 'login/idp/')
|
|
152 |
login_url = get_publisher().get_root_url() + 'login/idp/'
|
|
153 | 153 |
else: |
154 |
return redirect(get_publisher().get_root_url() + 'login/') |
|
154 |
login_url = get_publisher().get_root_url() + 'login/' |
|
155 |
login_url += '?' + urllib.urlencode({'next': get_request().get_frontoffice_url()}) |
|
156 |
return redirect(login_url) |
|
155 | 157 | |
156 | 158 |
if not get_request().user.anonymous: |
157 | 159 |
raise errors.AccessForbiddenError() |
... | ... | |
1058 | 1060 |
def login(self): |
1059 | 1061 |
idps = get_cfg('idp', {}) |
1060 | 1062 | |
1061 |
if get_response().iframe_mode and not get_session().after_url: |
|
1062 |
t = get_request().get_url() |
|
1063 |
get_session().after_url = str('%s://%s%s' % ( |
|
1063 |
if get_response().iframe_mode and not get_request().form.get('next'): |
|
1064 |
get_request().form = {'next': '%s://%s%s' % ( |
|
1064 | 1065 |
get_request().get_scheme(), |
1065 | 1066 |
get_request().get_server(False), |
1066 |
get_publisher().get_root_url()))
|
|
1067 |
get_publisher().get_root_url())}
|
|
1067 | 1068 | |
1068 | 1069 |
# there is only one visible IdP, perform login automatically on |
1069 | 1070 |
# this one. |
wcs/qommon/ident/password.py | ||
---|---|---|
195 | 195 |
return Directory._q_traverse(self, path) |
196 | 196 | |
197 | 197 |
def login(self): |
198 |
next_url = get_request().form.get('next') |
|
199 |
if get_request().get_method() == 'GET': |
|
200 |
get_request().form = {} |
|
198 | 201 |
identities_cfg = get_cfg('identities', {}) |
199 |
form = Form(enctype = 'multipart/form-data', id = 'login-form', use_tokens = False) |
|
202 |
form = Form(enctype='multipart/form-data', id='login-form', |
|
203 |
use_tokens=False, action=get_request().get_url()) |
|
204 |
form.add_hidden('next', next_url) |
|
200 | 205 |
if identities_cfg.get('email-as-username', False): |
201 | 206 |
form.add(StringWidget, 'username', title = _('Email'), size=25, required=True) |
202 | 207 |
else: |
... | ... | |
285 | 290 |
account.warned_about_unused_account = False |
286 | 291 |
account.store() |
287 | 292 | |
288 |
if session.after_url: |
|
289 |
after_url = session.after_url |
|
290 |
session.after_url = None |
|
293 |
if form and form.get_widget('next').parse(): |
|
294 |
after_url = form.get_widget('next').parse() |
|
291 | 295 |
return redirect(after_url) |
292 | 296 |
else: |
293 | 297 |
return redirect(get_publisher().get_root_url() + get_publisher().after_login_url) |
wcs/qommon/saml2.py | ||
---|---|---|
226 | 226 |
login.request.forceAuthn = False |
227 | 227 |
login.request.isPassive = get_request().form.get('IsPassive') == 'true' |
228 | 228 |
login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:current-implicit' |
229 |
login.msgRelayState = get_request().form.get('next') |
|
230 |
print 'relay state:', login.msgRelayState |
|
229 | 231 |
login.buildAuthnRequestMsg() |
230 | 232 |
return redirect(login.msgUrl) |
231 | 233 | |
... | ... | |
385 | 387 |
relay_state = request.form.get('RelayState', None) |
386 | 388 |
session = get_session() |
387 | 389 |
response = get_response() |
388 |
if session.after_url: |
|
389 |
after_url = session.after_url |
|
390 |
session.after_url = None |
|
391 |
return redirect(after_url) |
|
392 |
response.set_status(303) |
|
393 |
if relay_state == 'backoffice' and get_publisher().backoffice_directory_class: |
|
394 |
response.headers['location'] = urlparse.urljoin( |
|
395 |
get_request().get_url(), str('../backoffice/')) |
|
390 |
if relay_state == 'backoffice': |
|
391 |
after_url = get_publisher().get_backoffice_url() |
|
392 |
elif relay_state: |
|
393 |
after_url = relay_state |
|
396 | 394 |
else: |
397 |
response.headers['location'] = urlparse.urljoin(get_request().get_url(), str('..')) |
|
395 |
after_url = get_publisher().get_frontoffice_url() |
|
396 |
response.set_status(303) |
|
397 |
response.headers['location'] = after_url |
|
398 | 398 |
response.content_type = 'text/plain' |
399 | 399 |
return "Your browser should redirect you" |
400 | 400 |
wcs/qommon/sessions.py | ||
---|---|---|
69 | 69 |
_names = 'sessions' |
70 | 70 | |
71 | 71 |
name_identifier = None |
72 |
after_url = None |
|
73 | 72 |
lasso_session_dump = None |
74 | 73 |
lasso_session_index = None |
75 | 74 |
lasso_anonymous_identity_dump = None |
... | ... | |
104 | 103 |
return (time.time() - self.get_access_time()) > duration |
105 | 104 | |
106 | 105 |
def has_info(self): |
107 |
return self.name_identifier or self.after_url or \
|
|
106 |
return self.name_identifier or \ |
|
108 | 107 |
self.lasso_session_dump or self.message or \ |
109 | 108 |
self.lasso_anonymous_identity_dump or \ |
110 | 109 |
self.lasso_manage_name_id_dump or \ |
wcs/root.py | ||
---|---|---|
17 | 17 |
import json |
18 | 18 |
import os |
19 | 19 |
import re |
20 |
import urllib |
|
20 | 21 | |
21 | 22 |
try: |
22 | 23 |
from hobo.scrutiny.wsgi.middleware import VersionMiddleware |
... | ... | |
75 | 76 |
ident_methods = get_cfg('identification', {}).get('methods', []) |
76 | 77 | |
77 | 78 |
if get_request().form.get('ReturnUrl'): |
78 |
get_session().after_url = get_request().form.get('ReturnUrl')
|
|
79 |
get_request().form['next'] = get_request().form.pop('ReturnUrl')
|
|
79 | 80 | |
80 | 81 |
if len(ident_methods) == 0: |
81 | 82 |
get_logger().info('no ident method defined') |
... | ... | |
109 | 110 |
if form.is_submitted() and not form.has_errors(): |
110 | 111 |
method = form.get_widget('method').parse() |
111 | 112 |
if get_publisher().ident_methods.get(method)().is_interactive(): |
112 |
return redirect('../ident/%s/login' % method) |
|
113 |
login_url = '../ident/%s/login' % method |
|
114 |
if get_request().form.get('next'): |
|
115 |
login_url += '?' + urllib.urlencode( |
|
116 |
{'next': get_request().form.get('next')}) |
|
117 |
return redirect(login_url) |
|
113 | 118 |
else: |
114 | 119 |
try: |
115 | 120 |
return qommon.ident.login(method) |
116 |
- |