Projet

Général

Profil

inbetween.diff

Paul Marillonnet, 11 janvier 2018 18:49

Télécharger (9,08 ko)

Voir les différences:


  

mellon/adapters.py
36 36
        else:
37 37
            for extra_idp in self.get_identity_providers_setting():
38 38
                if extra_idp.get('ENTITY_ID') == entity_id or \
39
                        idp_metadata_extract_entity_id(extra_idp) == entity_id:
39
                        idp_metadata_extract_entity_id(extra_idp.get('METADATA')) == entity_id:
40 40
                    idp = extra_idp.copy()
41 41

  
42 42
        extra_idp_settings = idp_settings_load(entity_id)
mellon/federation_utils.py
38 38
        path = os.path.join('metadata-cache', filename)
39 39

  
40 40
        unix_path = default_storage.path(path)
41
        if not os.path.exists('metadata-cache'):
42
            os.makedirs('metadata-cache')
41
        dirname = os.path.dirname(unix_path)
42
        if not os.path.exists(dirname):
43
            os.makedirs(dirname)
43 44
        f = open(unix_path, 'w')
44 45
        try:
45 46
            fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
......
74 75
def get_federation_from_url(url, update_cache=False):
75 76
    logger = logging.getLogger(__name__)
76 77
    filename = url2filename(url)
77
    path = os.path.join('metadata-cache', filename)
78
    if not default_storage.exists(path) or update_cache or \
79
            default_storage.created_time(path) < datetime.now() - timedelta(days=1):
78
    filepath = os.path.join('metadata-cache', filename)
79
    if not default_storage.exists(filepath) or update_cache or \
80
            default_storage.created_time(filepath) < datetime.now() - timedelta(days=1):
80 81
        load_federation_cache(url)
81 82
    else:
82 83
        logger.warning('federation %s has not been loaded', url)
83
    return path
84
    return default_storage.path(filepath)
84 85

  
85 86

  
86 87
def idp_metadata_filepath(entity_id):
87 88
    filename = url2filename(entity_id)
88
    return os.path.join('./metadata-cache', filename)
89
    filepath = os.path.join('./metadata-cache', filename)
90
    return filepath
89 91

  
90 92

  
91 93
def idp_settings_filepath(entity_id):
92 94
    filename = url2filename(entity_id) + "_settings.json"
93
    return os.path.join('./metadata-cache', filename)
95
    filepath = os.path.join('./metadata-cache', filename)
96
    return filepath
94 97

  
95 98

  
96 99
def idp_metadata_is_cached(entity_id):
......
132 135
        return
133 136
    logger = logging.getLogger(__name__)
134 137
    filepath = idp_metadata_filepath(entity_id)
138

  
139
    dirname = os.path.dirname(filepath)
140
    if not default_storage.exists(dirname):
141
        os.makedirs(default_storage.path(dirname))
142

  
135 143
    if idp_metadata_needs_refresh(entity_id):
136
        with open(filepath, 'w') as f:
144
        with open(default_storage.path(filepath), 'w') as f:
137 145
            try:
138 146
                fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
139 147
                f.write(metadata_content)
......
142 150
                logger.error('Couldn\'t store metadata for EntityID %r',
143 151
                        entity_id)
144 152
                return
145
    return filepath
153
    return default_storage.path(filepath)
146 154

  
147 155

  
148 156
def idp_metadata_load(entity_id):
......
150 158
    filepath = idp_metadata_filepath(entity_id)
151 159
    if default_storage.exists(filepath):
152 160
        logger.info('Loading metadata for EntityID %r', entity_id)
153
        with open(filepath, 'r') as f:
161
        with open(default_storage.path(filepath), 'r') as f:
154 162
            return f.read()
155 163
    else:
156 164
        logger.warning('No metadata file for EntityID %r', entity_id)
......
168 176
    if not entity_id:
169 177
        return
170 178

  
179
    dirname = os.path.dirname(filepath)
180
    if not default_storage.exists(dirname):
181
        os.makedirs(default_storage.path(dirname))
182

  
171 183
    for key, value in idp.items():
172 184
        if key not in ('METADATA', 'ENTITY_ID'):
173 185
            idp_settings.update({key: value})
174 186

  
175 187
    if idp_settings_needs_refresh(entity_id) and idp_settings:
176
        with open(filepath, 'w') as f:
188
        with open(default_storage.path(filepath), 'w') as f:
177 189
            try:
178 190
                fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
179 191
                f.write(json.dumps(idp_settings))
......
181 193
            except:
182 194
                logger.error('Couldn\'t store settings for EntityID %r',
183 195
                        entity_id)
184
                return
185
    return filepath
186 196

  
187 197

  
188 198
def idp_settings_load(entity_id):
......
190 200
    filepath = idp_settings_filepath(entity_id)
191 201
    if default_storage.exists(filepath):
192 202
        logger.info('Loading JSON settings for EntityID %r', entity_id)
193
        with open(filepath, 'r') as f:
203
        with open(default_storage.path(filepath), 'r') as f:
194 204
            try:
195 205
                idp_settings = json.loads(f.read())
196 206
            except:
tests/conftest.py
42 42
    caplog.handler.stream = py.io.TextIO()
43 43
    caplog.handler.records = []
44 44
    return caplog
45

  
46

  
47
# XXX temporary workaround
48
#     non-federated IdPs shouldn't have their MD cached
49
@pytest.fixture(autouse=True)
50
def mellon_settings(settings, tmpdir):
51
        settings.MEDIA_ROOT = str(tmpdir)
tests/test_default_adapter.py
89 89
    assert user.email == 'test@example.net'
90 90
    assert user.is_superuser is False
91 91
    assert user.is_staff is False
92
    assert len(caplog.records) == 6
92
    assert len(caplog.records) == 5
93 93
    assert 'created new user' in caplog.text
94 94
    assert 'set field first_name' in caplog.text
95 95
    assert 'set field last_name' in caplog.text
......
102 102
    user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
103 103
    assert user.groups.count() == 3
104 104
    assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes['group'])
105
    assert len(caplog.records) == 6
105
    assert len(caplog.records) == 5
106 106
    assert 'created new user' in caplog.text
107 107
    assert 'adding group GroupA' in caplog.text
108 108
    assert 'adding group GroupB' in caplog.text
......
112 112
    user = SAMLBackend().authenticate(saml_attributes=saml_attributes2)
113 113
    assert user.groups.count() == 2
114 114
    assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes2['group'])
115
    assert len(caplog.records) == 9
115
    assert len(caplog.records) == 7
116 116
    assert 'removing group GroupA' in caplog.records[-1].message
117 117

  
118 118

  
......
142 142
    del local_saml_attributes['email']
143 143
    user = SAMLBackend().authenticate(saml_attributes=local_saml_attributes)
144 144
    assert not user.email
145
    assert len(caplog.records) == 6
145
    assert len(caplog.records) == 5
146 146
    assert 'created new user' in caplog.text
147 147
    assert re.search(r'invalid reference.*email', caplog.text)
148 148
    assert 'set field first_name' in caplog.text
......
160 160
    local_saml_attributes['first_name'] = [('y' * 32)]
161 161
    user = SAMLBackend().authenticate(saml_attributes=local_saml_attributes)
162 162
    assert user.first_name == 'y' * 30
163
    assert len(caplog.records) == 6
163
    assert len(caplog.records) == 5
164 164
    assert 'created new user' in caplog.text
165 165
    assert 'set field first_name' in caplog.text
166 166
    assert 'to value %r ' % (u'y' * 30) in caplog.text
tests/test_federations_utils.py
1 1
import os
2 2
import time
3 3

  
4
from django.core.files.storage import default_storage
4 5
from django.utils.text import slugify
5 6
from httmock import HTTMock
6 7

  
......
10 11

  
11 12
def test_mock_fedmd_caching():
12 13
    url = u'https://dummy.mdserver/metadata.xml'
13
    filepath = os.path.join('metadata-cache/', truncate_unique(slugify(url)))
14

  
15
    if os.path.isfile(filepath):
16
        os.remove(filepath)
14
    filepath = default_storage.path(os.path.join('metadata-cache/', truncate_unique(slugify(url))))
17 15

  
18 16
    with HTTMock(sample_federation_response):
19 17
        tmp = get_federation_from_url(url)
20 18

  
21
    assert tmp == filepath
19
    assert default_storage.path(tmp) == filepath
22 20

  
23 21
    st = os.stat(filepath)
24 22

  
......
35 33
    storig = os.stat(os.path.join('tests', 'federation-sample.xml'))
36 34

  
37 35
    assert storig.st_size == st.st_size
38

  
39
    os.remove(filepath)