Projet

Général

Profil

0001-general-don-t-use-session-for-after_url-persistence-.patch

(wcs) - Frédéric Péters, 13 janvier 2016 22:11

Télécharger (14,3 ko)

Voir les différences:

Subject: [PATCH] general: don't use session for after_url persistence (#5637)

 tests/test_admin_pages.py      |  4 ++--
 tests/test_backoffice_pages.py |  3 +--
 tests/test_form_pages.py       |  6 +++---
 tests/test_saml_auth.py        | 22 ++++++++++++++++++----
 wcs/qommon/errors.py           |  7 ++++---
 wcs/qommon/ident/idp.py        | 15 ++++++++-------
 wcs/qommon/ident/password.py   | 12 ++++++++----
 wcs/qommon/saml2.py            | 18 +++++++++---------
 wcs/qommon/sessions.py         |  3 +--
 wcs/root.py                    |  9 +++++++--
 10 files changed, 61 insertions(+), 38 deletions(-)
tests/test_admin_pages.py
83 83
def test_with_user(pub):
84 84
    create_superuser(pub)
85 85
    resp = get_app(pub).get('/backoffice/', status=302)
86
    resp = resp.follow()
87
    assert resp.location == 'http://example.net/login/'
86
    assert resp.location == 'http://example.net/login/?next=http%3A%2F%2Fexample.net%2Fbackoffice%2F'
88 87

  
89 88
def test_with_superuser(pub):
90 89
    app = login(get_app(pub))
......
2318 2317

  
2319 2318
    from test_saml_auth import setup_environment
2320 2319
    setup_environment(pub)
2320
    pub.user_class.wipe() # makes sure there are no users
2321 2321

  
2322 2322
    resp = resp.click('Identity Providers')
2323 2323
    assert 'http://sso.example.net/' in resp.body
tests/test_backoffice_pages.py
139 139
def test_backoffice_unlogged(pub):
140 140
    create_superuser(pub)
141 141
    resp = get_app(pub).get('/backoffice/', status=302)
142
    resp = resp.follow()
143
    assert resp.location == 'http://example.net/login/'
142
    assert resp.location == 'http://example.net/login/?next=http%3A%2F%2Fexample.net%2Fbackoffice%2F'
144 143

  
145 144
def test_backoffice_home(pub):
146 145
    create_superuser(pub)
tests/test_form_pages.py
183 183
    formdef.store()
184 184
    home = get_app(pub).get('/')
185 185
    assert home.status_int == 302
186
    assert home.location == 'http://example.net/login'
186
    assert home.location == 'http://example.net/login/?next=http%3A%2F%2Fexample.net%2F'
187 187

  
188 188
def test_home_always_advertise(pub):
189 189
    formdef = create_formdef()
......
552 552
    formdata_user.store()
553 553

  
554 554
    resp = get_app(pub).get('/test/%s/' % formdata.id)
555
    assert resp.location == 'http://example.net/login'
555
    assert resp.location.startswith('http://example.net/login/?next=')
556 556

  
557 557
    resp = get_app(pub).get('/test/%s/' % formdata_user.id)
558
    assert resp.location == 'http://example.net/login'
558
    assert resp.location.startswith('http://example.net/login/?next=')
559 559

  
560 560
    resp = login(get_app(pub), username='foo', password='foo').get('/test/%s/' % formdata_user.id)
561 561
    assert 'The form has been recorded on' in resp
tests/test_saml_auth.py
2 2
import os
3 3
import sys
4 4
import shutil
5
import urlparse
5 6

  
6 7
try:
7 8
    import lasso
......
80 81

  
81 82
    pub.write_cfg()
82 83

  
84
    pub.user_class.wipe()
85
    pub.user_class().store()
86

  
83 87
def teardown_module(module):
84 88
    shutil.rmtree(pub.APP_DIR)
85 89

  
......
159 163
    body = saml2.assertionConsumerPost()
160 164

  
161 165
    assert req.response.status_code == 303
162
    assert req.response.headers['location'] == 'http://example.net/'
166
    assert req.response.headers['location'] == 'http://example.net'
163 167
    assert req.session.user is not None
164 168

  
165 169
def test_assertion_consumer_existing_federation():
......
205 209
def test_assertion_consumer_redirect_after_url():
206 210
    setup_environment(pub)
207 211
    req = get_assertion_consumer_request()
208
    req.session.after_url = '/foobar'
212
    req.form['RelayState'] = '/foobar'
209 213
    saml2 = Saml2Directory()
210 214
    saml_response_body = req.form['SAMLResponse']
211 215
    body = saml2.assertionConsumerPost()
212
    assert req.response.headers['location'] == 'http://example.net/foobar'
216
    assert req.response.headers['location'] == '/foobar'
213 217

  
214 218
def test_saml_login_page():
215 219
    setup_environment(pub)
......
225 229
    assert resp.status_int == 302
226 230
    assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')
227 231

  
232
def test_saml_backoffice_redirect():
233
    setup_environment(pub)
234
    resp = get_app(pub).get('/backoffice/')
235
    assert resp.status_int == 302
236
    assert resp.location.startswith('http://example.net/login/?next=')
237
    resp = resp.follow()
238
    assert resp.location.startswith('http://sso.example.net/saml2/sso')
239
    assert urlparse.parse_qs(urlparse.urlparse(resp.location).query)['SAMLRequest']
240
    assert urlparse.parse_qs(urlparse.urlparse(resp.location).query)['RelayState'] == ['http://example.net/backoffice/']
241

  
228 242
def test_saml_register():
229 243
    setup_environment(pub)
230 244
    get_app(pub).get('/register/', status=404)
......
234 248
    # if there's no specific registration URL, this initiates a SSO and there
235 249
    # should be a registration link on the identity provider
236 250
    resp = get_app(pub).get('/register/')
237
    assert resp.location == 'http://example.net/login/'
251
    assert resp.location == 'http://example.net/login/?next=http%3A%2F%2Fexample.net%2Fregister%2F'
238 252
    resp = resp.follow()
239 253
    assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=')
240 254

  
wcs/qommon/errors.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
from quixote import get_publisher
17
import urllib
18 18

  
19
from quixote import get_publisher
19 20
import quixote
20 21
from quixote.errors import *
21 22
from quixote.html import TemplateIO, htmltext
......
50 51
        if request.user:
51 52
            return AccessForbiddenError.render(self)
52 53

  
53
        session.after_url = request.get_frontoffice_url()
54 54
        if self.public_msg:
55 55
            session.message = ('error', self.public_msg)
56
        login_url = get_publisher().get_root_url() + 'login'
56
        login_url = get_publisher().get_root_url() + 'login/'
57
        login_url += '?' + urllib.urlencode({'next': request.get_frontoffice_url()})
57 58
        quixote.redirect(login_url)
58 59

  
59 60
class EmailError(Exception):
wcs/qommon/ident/idp.py
20 20
    lasso = None
21 21

  
22 22
from cStringIO import StringIO
23
import urllib
23 24
import urllib2
24 25
import urlparse
25 26

  
......
146 147
                                get_cfg('saml_identities', {}).get('registration-url'),
147 148
                                vars)
148 149
                return redirect(registration_url)
149
            get_session().after_url = get_request().get_frontoffice_url()
150 150
            ident_methods = get_cfg('identification', {}).get('methods', [])
151 151
            if len(ident_methods) > 1:
152
                return redirect(get_publisher().get_root_url() + 'login/idp/')
152
                login_url = get_publisher().get_root_url() + 'login/idp/'
153 153
            else:
154
                return redirect(get_publisher().get_root_url() + 'login/')
154
                login_url = get_publisher().get_root_url() + 'login/'
155
            login_url += '?' + urllib.urlencode({'next': get_request().get_frontoffice_url()})
156
            return redirect(login_url)
155 157

  
156 158
        if not get_request().user.anonymous:
157 159
            raise errors.AccessForbiddenError()
......
1058 1060
    def login(self):
1059 1061
        idps = get_cfg('idp', {})
1060 1062

  
1061
        if get_response().iframe_mode and not get_session().after_url:
1062
            t = get_request().get_url()
1063
            get_session().after_url = str('%s://%s%s' % (
1063
        if get_response().iframe_mode and not get_request().form.get('next'):
1064
            get_request().form = {'next': '%s://%s%s' % (
1064 1065
                    get_request().get_scheme(),
1065 1066
                    get_request().get_server(False),
1066
                    get_publisher().get_root_url()))
1067
                    get_publisher().get_root_url())}
1067 1068

  
1068 1069
        # there is only one visible IdP, perform login automatically on
1069 1070
        # this one.
wcs/qommon/ident/password.py
195 195
        return Directory._q_traverse(self, path)
196 196

  
197 197
    def login(self):
198
        next_url = get_request().form.get('next')
199
        if get_request().get_method() == 'GET':
200
            get_request().form = {}
198 201
        identities_cfg = get_cfg('identities', {})
199
        form = Form(enctype = 'multipart/form-data', id = 'login-form', use_tokens = False)
202
        form = Form(enctype='multipart/form-data', id='login-form',
203
                use_tokens=False, action=get_request().get_url())
204
        form.add_hidden('next', next_url)
200 205
        if identities_cfg.get('email-as-username', False):
201 206
            form.add(StringWidget, 'username', title = _('Email'), size=25, required=True)
202 207
        else:
......
285 290
            account.warned_about_unused_account = False
286 291
            account.store()
287 292

  
288
        if session.after_url:
289
            after_url = session.after_url
290
            session.after_url = None
293
        if form and form.get_widget('next').parse():
294
            after_url = form.get_widget('next').parse()
291 295
            return redirect(after_url)
292 296
        else:
293 297
            return redirect(get_publisher().get_root_url() + get_publisher().after_login_url)
wcs/qommon/saml2.py
226 226
        login.request.forceAuthn = False
227 227
        login.request.isPassive = get_request().form.get('IsPassive') == 'true'
228 228
        login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:current-implicit'
229
        login.msgRelayState = get_request().form.get('next')
230
        print 'relay state:', login.msgRelayState
229 231
        login.buildAuthnRequestMsg()
230 232
        return redirect(login.msgUrl)
231 233

  
......
385 387
        relay_state = request.form.get('RelayState', None)
386 388
        session = get_session()
387 389
        response = get_response()
388
        if session.after_url:
389
            after_url = session.after_url
390
            session.after_url = None
391
            return redirect(after_url)
392
        response.set_status(303)
393
        if relay_state == 'backoffice' and get_publisher().backoffice_directory_class:
394
            response.headers['location'] = urlparse.urljoin(
395
                    get_request().get_url(), str('../backoffice/'))
390
        if relay_state == 'backoffice':
391
            after_url = get_publisher().get_backoffice_url()
392
        elif relay_state:
393
            after_url = relay_state
396 394
        else:
397
            response.headers['location'] = urlparse.urljoin(get_request().get_url(), str('..'))
395
            after_url = get_publisher().get_frontoffice_url()
396
        response.set_status(303)
397
        response.headers['location'] = after_url
398 398
        response.content_type = 'text/plain'
399 399
        return "Your browser should redirect you"
400 400

  
wcs/qommon/sessions.py
69 69
    _names = 'sessions'
70 70

  
71 71
    name_identifier = None
72
    after_url = None
73 72
    lasso_session_dump = None
74 73
    lasso_session_index = None
75 74
    lasso_anonymous_identity_dump = None
......
104 103
        return (time.time() - self.get_access_time()) > duration
105 104

  
106 105
    def has_info(self):
107
        return self.name_identifier or self.after_url or \
106
        return self.name_identifier or \
108 107
            self.lasso_session_dump or self.message or \
109 108
            self.lasso_anonymous_identity_dump or \
110 109
            self.lasso_manage_name_id_dump or \
wcs/root.py
17 17
import json
18 18
import os
19 19
import re
20
import urllib
20 21

  
21 22
try:
22 23
    from hobo.scrutiny.wsgi.middleware import VersionMiddleware
......
75 76
        ident_methods = get_cfg('identification', {}).get('methods', [])
76 77

  
77 78
        if get_request().form.get('ReturnUrl'):
78
            get_session().after_url = get_request().form.get('ReturnUrl')
79
            get_request().form['next'] = get_request().form.pop('ReturnUrl')
79 80

  
80 81
        if len(ident_methods) == 0:
81 82
            get_logger().info('no ident method defined')
......
109 110
            if form.is_submitted() and not form.has_errors():
110 111
                method = form.get_widget('method').parse()
111 112
                if get_publisher().ident_methods.get(method)().is_interactive():
112
                    return redirect('../ident/%s/login' % method)
113
                    login_url = '../ident/%s/login' % method
114
                    if get_request().form.get('next'):
115
                        login_url += '?' + urllib.urlencode(
116
                                {'next': get_request().form.get('next')})
117
                    return redirect(login_url)
113 118
                else:
114 119
                    try:
115 120
                        return qommon.ident.login(method)
116
-