inbetween.diff
mellon/adapters.py | ||
---|---|---|
36 | 36 |
else: |
37 | 37 |
for extra_idp in self.get_identity_providers_setting(): |
38 | 38 |
if extra_idp.get('ENTITY_ID') == entity_id or \ |
39 |
idp_metadata_extract_entity_id(extra_idp) == entity_id: |
|
39 |
idp_metadata_extract_entity_id(extra_idp.get('METADATA')) == entity_id:
|
|
40 | 40 |
idp = extra_idp.copy() |
41 | 41 | |
42 | 42 |
extra_idp_settings = idp_settings_load(entity_id) |
mellon/federation_utils.py | ||
---|---|---|
38 | 38 |
path = os.path.join('metadata-cache', filename) |
39 | 39 | |
40 | 40 |
unix_path = default_storage.path(path) |
41 |
if not os.path.exists('metadata-cache'): |
|
42 |
os.makedirs('metadata-cache') |
|
41 |
dirname = os.path.dirname(unix_path) |
|
42 |
if not os.path.exists(dirname): |
|
43 |
os.makedirs(dirname) |
|
43 | 44 |
f = open(unix_path, 'w') |
44 | 45 |
try: |
45 | 46 |
fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB) |
... | ... | |
74 | 75 |
def get_federation_from_url(url, update_cache=False): |
75 | 76 |
logger = logging.getLogger(__name__) |
76 | 77 |
filename = url2filename(url) |
77 |
path = os.path.join('metadata-cache', filename) |
|
78 |
if not default_storage.exists(path) or update_cache or \ |
|
79 |
default_storage.created_time(path) < datetime.now() - timedelta(days=1): |
|
78 |
filepath = os.path.join('metadata-cache', filename)
|
|
79 |
if not default_storage.exists(filepath) or update_cache or \
|
|
80 |
default_storage.created_time(filepath) < datetime.now() - timedelta(days=1):
|
|
80 | 81 |
load_federation_cache(url) |
81 | 82 |
else: |
82 | 83 |
logger.warning('federation %s has not been loaded', url) |
83 |
return path
|
|
84 |
return default_storage.path(filepath)
|
|
84 | 85 | |
85 | 86 | |
86 | 87 |
def idp_metadata_filepath(entity_id): |
87 | 88 |
filename = url2filename(entity_id) |
88 |
return os.path.join('./metadata-cache', filename) |
|
89 |
filepath = os.path.join('./metadata-cache', filename) |
|
90 |
return filepath |
|
89 | 91 | |
90 | 92 | |
91 | 93 |
def idp_settings_filepath(entity_id): |
92 | 94 |
filename = url2filename(entity_id) + "_settings.json" |
93 |
return os.path.join('./metadata-cache', filename) |
|
95 |
filepath = os.path.join('./metadata-cache', filename) |
|
96 |
return filepath |
|
94 | 97 | |
95 | 98 | |
96 | 99 |
def idp_metadata_is_cached(entity_id): |
... | ... | |
132 | 135 |
return |
133 | 136 |
logger = logging.getLogger(__name__) |
134 | 137 |
filepath = idp_metadata_filepath(entity_id) |
138 | ||
139 |
dirname = os.path.dirname(filepath) |
|
140 |
if not default_storage.exists(dirname): |
|
141 |
os.makedirs(default_storage.path(dirname)) |
|
142 | ||
135 | 143 |
if idp_metadata_needs_refresh(entity_id): |
136 |
with open(filepath, 'w') as f:
|
|
144 |
with open(default_storage.path(filepath), 'w') as f:
|
|
137 | 145 |
try: |
138 | 146 |
fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB) |
139 | 147 |
f.write(metadata_content) |
... | ... | |
142 | 150 |
logger.error('Couldn\'t store metadata for EntityID %r', |
143 | 151 |
entity_id) |
144 | 152 |
return |
145 |
return filepath
|
|
153 |
return default_storage.path(filepath)
|
|
146 | 154 | |
147 | 155 | |
148 | 156 |
def idp_metadata_load(entity_id): |
... | ... | |
150 | 158 |
filepath = idp_metadata_filepath(entity_id) |
151 | 159 |
if default_storage.exists(filepath): |
152 | 160 |
logger.info('Loading metadata for EntityID %r', entity_id) |
153 |
with open(filepath, 'r') as f:
|
|
161 |
with open(default_storage.path(filepath), 'r') as f:
|
|
154 | 162 |
return f.read() |
155 | 163 |
else: |
156 | 164 |
logger.warning('No metadata file for EntityID %r', entity_id) |
... | ... | |
168 | 176 |
if not entity_id: |
169 | 177 |
return |
170 | 178 | |
179 |
dirname = os.path.dirname(filepath) |
|
180 |
if not default_storage.exists(dirname): |
|
181 |
os.makedirs(default_storage.path(dirname)) |
|
182 | ||
171 | 183 |
for key, value in idp.items(): |
172 | 184 |
if key not in ('METADATA', 'ENTITY_ID'): |
173 | 185 |
idp_settings.update({key: value}) |
174 | 186 | |
175 | 187 |
if idp_settings_needs_refresh(entity_id) and idp_settings: |
176 |
with open(filepath, 'w') as f:
|
|
188 |
with open(default_storage.path(filepath), 'w') as f:
|
|
177 | 189 |
try: |
178 | 190 |
fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB) |
179 | 191 |
f.write(json.dumps(idp_settings)) |
... | ... | |
181 | 193 |
except: |
182 | 194 |
logger.error('Couldn\'t store settings for EntityID %r', |
183 | 195 |
entity_id) |
184 |
return |
|
185 |
return filepath |
|
186 | 196 | |
187 | 197 | |
188 | 198 |
def idp_settings_load(entity_id): |
... | ... | |
190 | 200 |
filepath = idp_settings_filepath(entity_id) |
191 | 201 |
if default_storage.exists(filepath): |
192 | 202 |
logger.info('Loading JSON settings for EntityID %r', entity_id) |
193 |
with open(filepath, 'r') as f:
|
|
203 |
with open(default_storage.path(filepath), 'r') as f:
|
|
194 | 204 |
try: |
195 | 205 |
idp_settings = json.loads(f.read()) |
196 | 206 |
except: |
tests/conftest.py | ||
---|---|---|
42 | 42 |
caplog.handler.stream = py.io.TextIO() |
43 | 43 |
caplog.handler.records = [] |
44 | 44 |
return caplog |
45 | ||
46 | ||
47 |
# XXX temporary workaround |
|
48 |
# non-federated IdPs shouldn't have their MD cached |
|
49 |
@pytest.fixture(autouse=True) |
|
50 |
def mellon_settings(settings, tmpdir): |
|
51 |
settings.MEDIA_ROOT = str(tmpdir) |
tests/test_default_adapter.py | ||
---|---|---|
89 | 89 |
assert user.email == 'test@example.net' |
90 | 90 |
assert user.is_superuser is False |
91 | 91 |
assert user.is_staff is False |
92 |
assert len(caplog.records) == 6
|
|
92 |
assert len(caplog.records) == 5
|
|
93 | 93 |
assert 'created new user' in caplog.text |
94 | 94 |
assert 'set field first_name' in caplog.text |
95 | 95 |
assert 'set field last_name' in caplog.text |
... | ... | |
102 | 102 |
user = SAMLBackend().authenticate(saml_attributes=saml_attributes) |
103 | 103 |
assert user.groups.count() == 3 |
104 | 104 |
assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes['group']) |
105 |
assert len(caplog.records) == 6
|
|
105 |
assert len(caplog.records) == 5
|
|
106 | 106 |
assert 'created new user' in caplog.text |
107 | 107 |
assert 'adding group GroupA' in caplog.text |
108 | 108 |
assert 'adding group GroupB' in caplog.text |
... | ... | |
112 | 112 |
user = SAMLBackend().authenticate(saml_attributes=saml_attributes2) |
113 | 113 |
assert user.groups.count() == 2 |
114 | 114 |
assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes2['group']) |
115 |
assert len(caplog.records) == 9
|
|
115 |
assert len(caplog.records) == 7
|
|
116 | 116 |
assert 'removing group GroupA' in caplog.records[-1].message |
117 | 117 | |
118 | 118 | |
... | ... | |
142 | 142 |
del local_saml_attributes['email'] |
143 | 143 |
user = SAMLBackend().authenticate(saml_attributes=local_saml_attributes) |
144 | 144 |
assert not user.email |
145 |
assert len(caplog.records) == 6
|
|
145 |
assert len(caplog.records) == 5
|
|
146 | 146 |
assert 'created new user' in caplog.text |
147 | 147 |
assert re.search(r'invalid reference.*email', caplog.text) |
148 | 148 |
assert 'set field first_name' in caplog.text |
... | ... | |
160 | 160 |
local_saml_attributes['first_name'] = [('y' * 32)] |
161 | 161 |
user = SAMLBackend().authenticate(saml_attributes=local_saml_attributes) |
162 | 162 |
assert user.first_name == 'y' * 30 |
163 |
assert len(caplog.records) == 6
|
|
163 |
assert len(caplog.records) == 5
|
|
164 | 164 |
assert 'created new user' in caplog.text |
165 | 165 |
assert 'set field first_name' in caplog.text |
166 | 166 |
assert 'to value %r ' % (u'y' * 30) in caplog.text |
tests/test_federations_utils.py | ||
---|---|---|
1 | 1 |
import os |
2 | 2 |
import time |
3 | 3 | |
4 |
from django.core.files.storage import default_storage |
|
4 | 5 |
from django.utils.text import slugify |
5 | 6 |
from httmock import HTTMock |
6 | 7 | |
... | ... | |
10 | 11 | |
11 | 12 |
def test_mock_fedmd_caching(): |
12 | 13 |
url = u'https://dummy.mdserver/metadata.xml' |
13 |
filepath = os.path.join('metadata-cache/', truncate_unique(slugify(url))) |
|
14 | ||
15 |
if os.path.isfile(filepath): |
|
16 |
os.remove(filepath) |
|
14 |
filepath = default_storage.path(os.path.join('metadata-cache/', truncate_unique(slugify(url)))) |
|
17 | 15 | |
18 | 16 |
with HTTMock(sample_federation_response): |
19 | 17 |
tmp = get_federation_from_url(url) |
20 | 18 | |
21 |
assert tmp == filepath
|
|
19 |
assert default_storage.path(tmp) == filepath
|
|
22 | 20 | |
23 | 21 |
st = os.stat(filepath) |
24 | 22 | |
... | ... | |
35 | 33 |
storig = os.stat(os.path.join('tests', 'federation-sample.xml')) |
36 | 34 | |
37 | 35 |
assert storig.st_size == st.st_size |
38 | ||
39 |
os.remove(filepath) |