0002-admin-manage-api-access-48751.patch
tests/admin_pages/test_all.py | ||
---|---|---|
1394 | 1394 |
pub.cfg['postgresql']['port'] = '5432' # from an old convert-to-sql (before #10170) |
1395 | 1395 |
pub.write_cfg() |
1396 | 1396 |
resp = app.get('/backoffice/settings/postgresql') |
1397 | 1397 |
assert resp.form['port'].value == '5432' |
1398 | 1398 |
resp = resp.form.submit() |
1399 | 1399 |
assert pub.cfg['postgresql']['port'] == 5432 |
1400 | 1400 | |
1401 | 1401 | |
1402 |
def test_settings_api_access(pub):
    """Walk through creation, edition, cancellation and deletion of API accesses."""
    index_path = '/backoffice/settings/api-access/'
    index_url = 'http://example.net/backoffice/settings/api-access/'

    def save_access(page, user, key):
        # fill the first form of the page and submit it with the "Save" button
        access_form = page.forms[0]
        access_form['user'] = user
        access_form['key'] = key
        return access_form.submit('submit')

    create_superuser(pub)
    app = login(get_app(pub))

    # nothing is defined at first; create a first access from the listing page
    listing = app.get(index_path)
    assert 'There are no API access defined at the moment.' in listing.text
    redirection = save_access(listing, 'jhon', 'xxx')
    assert redirection.location == index_url
    listing = redirection.follow()
    assert 'jhon' in listing.text
    assert pub.get_site_parameter('jhon', 'api-secrets') == 'xxx'

    # edit it: both the user and the key are replaced
    edit_page = listing.click('jhon')
    assert edit_page.forms[0]['user'].value == 'jhon'
    redirection = save_access(edit_page, 'jdoe', 'yyy')
    assert redirection.location == index_url
    listing = redirection.follow()
    assert 'jhon' not in listing.text
    assert 'jdoe' in listing.text
    assert not pub.get_site_parameter('jhon', 'api-secrets')
    assert pub.get_site_parameter('jdoe', 'api-secrets') == 'yyy'

    # add a second access, open its form and cancel: it must still be listed
    listing = save_access(listing, 'smith', 'zzz').follow()
    edit_page = listing.click('smith')
    redirection = edit_page.forms[0].submit('cancel')
    assert redirection.location == index_url
    listing = redirection.follow()
    assert 'smith' in listing.text

    # open its form again and delete it
    edit_page = listing.click('smith')
    redirection = edit_page.forms[0].submit('delete')
    assert redirection.location == index_url
    listing = redirection.follow()
    assert 'smith' not in listing.text
|
1443 | ||
1444 | ||
1402 | 1445 |
def test_studio_home(pub, studio): |
1403 | 1446 |
create_superuser(pub) |
1404 | 1447 |
app = login(get_app(pub)) |
1405 | 1448 |
resp = app.get('/backoffice/') |
1406 | 1449 |
assert 'studio' in resp.text |
1407 | 1450 |
resp = app.get('/backoffice/studio/') |
1408 | 1451 |
assert '../forms/' in resp.text |
1409 | 1452 |
assert '../cards/' in resp.text |
wcs/admin/settings.py | ||
---|---|---|
417 | 417 |
# restore original branding in case it has been changed |
418 | 418 |
get_publisher().cfg['branding'] = original_branding |
419 | 419 |
get_publisher().write_cfg() |
420 | 420 |
response.filter['raw'] = True |
421 | 421 | |
422 | 422 |
return theme_preview |
423 | 423 | |
424 | 424 | |
425 |
class ApiAccessDirectory(Directory):
    """Backoffice pages to list, create, edit and delete API accesses.

    An API access is a (user, key) pair stored in the "api-secrets" section
    of the site parameters. The edit page of an access is addressed by the
    position of the access in that section.
    """

    # Only the index is exported: every other path component goes through
    # _q_lookup(). ('add' used to be listed here without any matching
    # attribute on the class.)
    _q_exports = ['']

    def get_form(self, user=None, key=None):
        """Return the user/key form, prefilled with the given values."""
        form = Form(enctype='multipart/form-data')
        form.add(StringWidget, 'user', title=_('User'), required=True,
                 value=user)
        form.add(StringWidget, 'key', title=_('Key'),
                 required=True, size=70,
                 value=key)
        form.add_submit('submit', _('Save'))
        return form

    def _q_index(self):
        """List existing API accesses and handle the "new access" form."""
        html_top('settings', title=_('API Access'))

        access_cfg = get_publisher().get_site_parameter('secrets', 'api')

        form = self.get_form()
        # do not store anything when a required field is missing, the form
        # is rendered again below with its error messages.
        if form.get_submit() == 'submit' and not form.has_errors():
            user = form.get_widget('user').parse()
            key = form.get_widget('key').parse()
            get_publisher().set_site_parameter(user, key, 'api-secrets')
            get_publisher().save_site_parameters()
            return redirect('.')

        r = TemplateIO(html=True)
        r += htmltext('<h2>%s</h2>') % _('API access')
        if access_cfg:
            r += htmltext('<ul class="objects-list biglist api-access">')
            # links are built from the position of the access, this is the
            # number _q_lookup() gets back as path component.
            for access_id, user in enumerate(access_cfg):
                r += htmltext('<li>')
                r += htmltext(' <a href="%s">%s <span class="extra-info"></span></a>') % (
                    access_id,
                    user,
                )
                r += htmltext('</li>')
            r += htmltext('</ul>')
        else:
            r += htmltext('<div class="infonotice"><p>')
            r += _('There are no API access defined at the moment.')
            r += htmltext('</p></div>')

        r += htmltext('<h3>%s</h3>') % _('New API access')
        r += form.render()

        return r.getvalue()

    def _q_traverse(self, path):
        """Add the section to the breadcrumb then traverse as usual."""
        get_response().breadcrumb.append(('api-access/', _('API Access')))
        return Directory._q_traverse(self, path)

    def _q_lookup(self, component):
        """Display and handle the edit form of the access at a given position.

        component is the position of the access, as a string; anything that
        is not the position of an existing access raises TraversalError.
        """
        # no access defined yet: behave as with an empty section.
        access_cfg = get_publisher().get_site_parameter('secrets', 'api') or {}
        try:
            access_id = int(component)
            if access_id < 0:
                # negative numbers would silently address entries from the end
                raise IndexError(access_id)
            old_user, old_key = list(access_cfg.items())[access_id]
        except (ValueError, IndexError):
            raise errors.TraversalError()

        form = self.get_form(old_user, old_key)
        form.add_submit('cancel', _('Cancel'))
        form.add_submit('delete', _('Delete'))

        if form.get_widget('cancel').parse():
            return redirect('.')

        # validate before touching the existing entry, so an invalid
        # submission cannot drop it.
        if form.get_submit() == 'submit' and not form.has_errors():
            user = form.get_widget('user').parse()
            key = form.get_widget('key').parse()
            # the user name is the storage key: remove the previous entry
            # before recording the (possibly renamed) one.
            del access_cfg[old_user]
            get_publisher().set_site_parameter(user, key, 'api-secrets')
            get_publisher().save_site_parameters()
            return redirect('.')

        if form.get_submit() == 'delete':
            del access_cfg[old_user]
            get_publisher().save_site_parameters()
            return redirect('.')

        get_response().breadcrumb.append((component + '/', old_user))
        html_top('settings', title=_('API Access'))
        r = TemplateIO(html=True)
        r += htmltext('<h2>%s - %s</h2>') % (_('API Access'), old_user)
        r += form.render()
        return r.getvalue()
|
512 | ||
513 | ||
425 | 514 |
class SettingsDirectory(QommonSettingsDirectory): |
426 | 515 |
_q_exports = ['', 'themes', 'users', |
427 | 516 |
'template', 'emails', 'debug_options', 'language', |
428 | 517 |
('import', 'p_import'), 'export', 'identification', 'sitename', |
429 | 518 |
'sms', 'certificates', 'texts', 'install_theme', |
430 | 519 |
'session', 'download_theme', 'smstest', 'postgresql', |
431 | 520 |
('admin-permissions', 'admin_permissions'), 'geolocation', |
432 | 521 |
'theme_preview', 'filetypes', |
433 | 522 |
('user-template', 'user_template'), |
434 |
('data-sources', 'data_sources'), 'wscalls', 'logs'] |
|
523 |
('data-sources', 'data_sources'), 'wscalls', 'logs', |
|
524 |
('api-access', 'api_access')] |
|
435 | 525 | |
436 | 526 |
emails = EmailsDirectory() |
437 | 527 |
identification = IdentificationDirectory() |
438 | 528 |
users = UsersDirectory() |
439 | 529 |
texts = TextsDirectory() |
440 | 530 |
theme_preview = ThemePreviewDirectory() |
441 | 531 |
filetypes = FileTypesDirectory() |
532 |
api_access = ApiAccessDirectory() |
|
442 | 533 |
data_sources = NamedDataSourcesDirectory() |
443 | 534 |
wscalls = NamedWsCallsDirectory() |
444 | 535 |
logs = LoggerDirectory() |
445 | 536 | |
446 | 537 |
def _q_index(self): |
447 | 538 |
html_top('settings', title = _('Settings')) |
448 | 539 |
r = TemplateIO(html=True) |
449 | 540 | |
... | ... | |
486 | 577 |
_('Session'), _('Configure session management')) |
487 | 578 | |
488 | 579 |
if enabled('permissions'): |
489 | 580 |
roles = list(Role.select()) |
490 | 581 |
if roles: |
491 | 582 |
r += htmltext('<dt><a href="admin-permissions">%s</a></dt> <dd>%s</dd>') % ( |
492 | 583 |
_('Admin Permissions'), _('Configure access to the administration interface')) |
493 | 584 | |
585 |
if enabled('api-access'): |
|
586 |
r += htmltext('<dt><a href="api-access">%s</a></dt> <dd>%s</dd>') % ( |
|
587 |
_('API Access'), _('Configure access to the application programming interface')) |
|
588 | ||
494 | 589 |
r += htmltext('</dl></div>') |
495 | 590 | |
496 | 591 |
if enabled('import-export'): |
497 | 592 |
r += htmltext('<div class="section">') |
498 | 593 |
r += htmltext('<h2>%s</h2>') % _('Import / Export') |
499 | 594 | |
500 | 595 |
r += htmltext('<dl>') |
501 | 596 |
r += htmltext('<dt><a href="import">%s</a></dt> <dd>%s</dd>') % ( |
wcs/parameters.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
from collections import OrderedDict |
18 | 18 |
import xml.etree.ElementTree as ET |
19 | 19 | |
20 | 20 |
from quixote import get_publisher |
21 | 21 | |
22 |
from wcs.qommon import force_str |
|
22 | 23 |
from wcs.qommon.xml_storage import XmlStorableObject |
23 | 24 | |
24 | 25 | |
25 | 26 |
class Parameters(XmlStorableObject): |
26 | 27 | |
27 | 28 |
def __init__(self): |
28 | 29 |
self.parameters = OrderedDict([('options', OrderedDict())]) |
29 | 30 |
self.id = None |
... | ... | |
46 | 47 |
d[child.tag.lower()] = etree_to_dict(child) |
47 | 48 |
if not d: |
48 | 49 |
return element.text |
49 | 50 |
return d |
50 | 51 | |
51 | 52 |
obj.parameters = etree_to_dict(tree) or OrderedDict() |
52 | 53 |
return obj |
53 | 54 | |
55 |
def export_to_xml(self, include_id=False):
    """Serialize the parameters tree as an ElementTree element.

    Nested dictionaries of self.parameters become nested elements named
    after their keys; any other value is set as the text of its element.
    Elements left with neither text nor children are dropped.

    include_id is accepted but not used by this method.

    Returns the root <parameters> element.
    """
    root = ET.Element('parameters')

    def dict_to_etree(parent, d):
        # leaf value: stored as the text of the element
        if not isinstance(d, dict):
            parent.text = d
            return
        for key, value in d.items():
            dict_to_etree(ET.SubElement(parent, key), value)

    def remove_empty_nodes(element):
        # Returns 0 when the element holds nothing (no text and no
        # remaining child), in which case the caller removes it.
        if element.text:
            return 1
        nb_children = 0
        # Iterate over a copy as children get removed along the way
        # (Element.getchildren() no longer exists since Python 3.9).
        for child in list(element):
            if remove_empty_nodes(child) == 0:
                element.remove(child)
            else:
                nb_children += 1
        return nb_children

    dict_to_etree(root, self.parameters)
    remove_empty_nodes(root)
    return root
|
80 | ||
54 | 81 |
def get(self, option, section='options'):
    """Return the value of an option, or None if its section is unknown.

    section is a dash-separated path in the parameters tree, e.g.
    "api-secrets" looks the option up in parameters['api']['secrets'].
    """
    node = self.parameters
    for name in section.split('-'):
        node = node.get(name)
        if not node:
            # missing (or empty) section, there is nothing to look into
            return None
    return node.get(option)
89 | ||
90 |
def set(self, option, value, section='options'):
    """Store the value of an option, creating missing sections on the way.

    section is a dash-separated path in the parameters tree, e.g.
    "api-secrets" stores the option in parameters['api']['secrets'].
    """
    node = self.parameters
    for name in section.split('-'):
        child = node.get(name)
        if not child:
            # missing (or empty) section: start a fresh one
            child = node[name] = OrderedDict()
        node = child
    node[option] = value
wcs/qommon/publisher.py | ||
---|---|---|
420 | 420 |
self.get_app_logger().error('failed to read site options XML file') |
421 | 421 |
return |
422 | 422 | |
423 | 423 |
def get_site_parameter(self, option, section='options'):
    """Return the value of an option from the site parameters."""
    # site parameters are only loaded when first needed
    if self.site_parameters is None:
        self.load_site_parameters()
    parameters = self.site_parameters
    return parameters.get(option, section)
427 | 427 | |
428 |
def set_site_parameter(self, option, value, section='options'):
    """Set the value of an option in the site parameters.

    The change is only made in memory by this method.
    """
    # site parameters are only loaded when first needed
    if self.site_parameters is None:
        self.load_site_parameters()
    parameters = self.site_parameters
    parameters.set(option, value, section)
|
432 | ||
433 |
def save_site_parameters(self):
    """Write the site parameters to site-parameters.xml in the app directory.

    Nothing is written when there are no site parameters, or when their
    XML serialization fails; the failure is then logged as an error.
    """
    if not self.site_parameters:
        return
    try:
        dump = b'<?xml version="1.0"?>\n' + self.site_parameters.export_to_xml_string()
    except Exception:
        # Serialization errors are reported and the existing file is left
        # untouched; KeyboardInterrupt/SystemExit are not swallowed.
        self.get_app_logger().error('failed to write site options XML file')
        return
    filename = os.path.join(self.app_dir, 'site-parameters.xml')
    storage.atomic_write(filename, dump, async_op=False)
|
443 | ||
428 | 444 |
def get_site_option(self, option, section='options'):
    """Return an option from the site parameters, else from the site options."""
    value = self.get_site_parameter(option, section)
    if value:
        return value
    # unset (or empty) in the site parameters, fall back to the site options
    return self._get_site_option(option, section)
430 | 446 | |
431 | 447 |
def set_config(self, request = None): |
432 | 448 |
self.reload_cfg() |
433 | 449 |
self.site_options = None # reset at the beginning of a request |
434 | 450 |
self.site_parameters = None |
435 | 451 |
debug_cfg = self.cfg.get('debug', {}) |
436 |
- |