0001-pricing-synchronise-agendas-list-detail-views-65361.patch
lingo/agendas/chrono.py | ||
---|---|---|
1 |
# lingo - payment and billing system |
|
2 |
# Copyright (C) 2022 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import json |
|
18 | ||
19 |
from django.conf import settings |
|
20 |
from requests.exceptions import RequestException |
|
21 | ||
22 |
from lingo.agendas.models import Agenda |
|
23 |
from lingo.utils import requests |
|
24 | ||
25 | ||
26 |
def is_chrono_enabled(): |
|
27 |
return hasattr(settings, 'KNOWN_SERVICES') and settings.KNOWN_SERVICES.get('chrono') |
|
28 | ||
29 | ||
30 |
def get_chrono_service(): |
|
31 |
if not is_chrono_enabled(): |
|
32 |
return {} |
|
33 |
return list(settings.KNOWN_SERVICES.get('chrono').values())[0] |
|
34 | ||
35 | ||
36 |
def get_chrono_json(path, log_errors=True): |
|
37 |
chrono_site = get_chrono_service() |
|
38 |
if chrono_site is None: |
|
39 |
return |
|
40 |
try: |
|
41 |
response = requests.get( |
|
42 |
path, |
|
43 |
remote_service=chrono_site, |
|
44 |
without_user=True, |
|
45 |
headers={'accept': 'application/json'}, |
|
46 |
log_errors=log_errors, |
|
47 |
) |
|
48 |
response.raise_for_status() |
|
49 |
except RequestException as e: |
|
50 |
if e.response is not None: |
|
51 |
try: |
|
52 |
# return json if available (on 404 responses by example) |
|
53 |
return e.response.json() |
|
54 |
except json.JSONDecodeError: |
|
55 |
pass |
|
56 |
return |
|
57 |
return response.json() |
|
58 | ||
59 | ||
60 |
def collect_agenda_data(): |
|
61 |
result = get_chrono_json('api/agenda/') |
|
62 |
if result is None: |
|
63 |
return |
|
64 |
if result.get('data') is None: |
|
65 |
return |
|
66 | ||
67 |
agenda_data = [] |
|
68 |
for agenda in result['data']: |
|
69 |
if agenda['kind'] != 'events': |
|
70 |
continue |
|
71 |
agenda_data.append( |
|
72 |
{ |
|
73 |
'slug': agenda['slug'], |
|
74 |
'label': agenda['text'], |
|
75 |
'category_slug': agenda['category'], |
|
76 |
'category_label': agenda['category_label'], |
|
77 |
} |
|
78 |
) |
|
79 |
return agenda_data |
|
80 | ||
81 | ||
82 |
def refresh_agendas(): |
|
83 |
result = collect_agenda_data() |
|
84 |
if result is None: |
|
85 |
return |
|
86 | ||
87 |
# fetch existing agendas |
|
88 |
existing_agendas = {a.slug: a for a in Agenda.objects.all()} |
|
89 |
seen_agendas = [] |
|
90 | ||
91 |
# build agendas from chrono |
|
92 |
for agenda_data in result: |
|
93 |
slug = agenda_data['slug'] |
|
94 |
agenda = existing_agendas.get(slug) or Agenda(slug=slug) |
|
95 |
for key, value in agenda_data.items(): |
|
96 |
if key == 'slug': |
|
97 |
continue |
|
98 |
setattr(agenda, key, value) |
|
99 |
agenda.save() |
|
100 |
seen_agendas.append(agenda.slug) |
|
101 | ||
102 |
# now check outdated agendas |
|
103 |
for slug, agenda in existing_agendas.items(): |
|
104 |
if slug not in seen_agendas: |
|
105 |
agenda.delete() |
lingo/agendas/migrations/0002_agenda.py | ||
---|---|---|
1 |
from django.db import migrations, models |
|
2 | ||
3 | ||
4 |
class Migration(migrations.Migration): |
|
5 | ||
6 |
dependencies = [ |
|
7 |
('agendas', '0001_initial'), |
|
8 |
] |
|
9 | ||
10 |
operations = [ |
|
11 |
migrations.AddField( |
|
12 |
model_name='agenda', |
|
13 |
name='category_label', |
|
14 |
field=models.CharField(null=True, max_length=150, verbose_name='Category label'), |
|
15 |
), |
|
16 |
migrations.AddField( |
|
17 |
model_name='agenda', |
|
18 |
name='category_slug', |
|
19 |
field=models.SlugField(null=True, max_length=160, verbose_name='Category identifier'), |
|
20 |
), |
|
21 |
] |
lingo/agendas/models.py | ||
---|---|---|
26 | 26 |
class Agenda(models.Model): |
27 | 27 |
label = models.CharField(_('Label'), max_length=150) |
28 | 28 |
slug = models.SlugField(_('Identifier'), max_length=160, unique=True) |
29 |
category_label = models.CharField(_('Category label'), max_length=150, null=True) |
|
30 |
category_slug = models.SlugField(_('Category identifier'), max_length=160, null=True) |
|
29 | 31 | |
30 | 32 |
def __str__(self): |
31 | 33 |
return self.label |
lingo/agendas/views.py | ||
---|---|---|
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
from django.shortcuts import get_object_or_404 |
18 |
from django.urls import reverse |
|
18 | 19 | |
19 | 20 |
from .models import Agenda |
20 | 21 | |
... | ... | |
34 | 35 |
context['agenda'] = self.agenda |
35 | 36 |
return context |
36 | 37 | |
38 |
def get_form_kwargs(self): |
|
39 |
kwargs = super().get_form_kwargs() |
|
40 |
if not kwargs.get('instance'): |
|
41 |
kwargs['instance'] = self.model() |
|
42 |
kwargs['instance'].agenda = self.agenda |
|
43 |
return kwargs |
|
44 | ||
37 | 45 |
def get_success_url(self): |
38 |
# XXX: return reverse('lingo-manager-agenda-settings', kwargs={'pk': self.agenda.id}) |
|
39 |
return '/' |
|
46 |
return reverse('lingo-manager-agenda-detail', kwargs={'pk': self.agenda.pk}) |
lingo/manager/static/css/style.scss | ||
---|---|---|
1 |
li span.identifier { |
|
2 |
font-size: 80%; |
|
3 |
opacity: 0.6; |
|
4 |
} |
|
5 | ||
6 |
h2 span.identifier { |
|
7 |
font-size: 1rem; |
|
8 |
opacity: 0.6; |
|
9 |
} |
|
10 | ||
1 | 11 |
div.paragraph { |
2 | 12 |
background: white; |
3 | 13 |
box-sizing: border-box; |
lingo/manager/templates/lingo/manager_agenda_settings.html | ||
---|---|---|
1 |
{% extends "lingo/manager_agenda_view.html" %} |
lingo/manager/templates/lingo/manager_agenda_view.html | ||
---|---|---|
1 |
{% extends "lingo/manager_base.html" %} |
lingo/pricing/templates/lingo/pricing/manager_agenda_detail.html | ||
---|---|---|
1 |
{% extends "lingo/pricing/manager_agenda_list.html" %} |
|
2 |
{% load i18n %} |
|
3 | ||
4 |
{% block breadcrumb %} |
|
5 |
{{ block.super }} |
|
6 |
<a href="{% url 'lingo-manager-agenda-detail' agenda.pk %}">{{ agenda }}</a> |
|
7 |
{% endblock %} |
|
8 | ||
9 |
{% block appbar %} |
|
10 |
<h2>{{ agenda }} |
|
11 |
<span class="identifier">[{% trans "identifier:" %} {{ agenda.slug }}]</span> |
|
12 |
</h2> |
|
13 |
<span class="actions"> |
|
14 |
<a rel="popup" href="{% url 'lingo-manager-agenda-pricing-add' pk=agenda.pk %}">{% trans 'New pricing' %}</a> |
|
15 |
</span> |
|
16 |
{% endblock %} |
|
17 | ||
18 |
{% block content %} |
|
19 |
<div class="section"> |
|
20 |
<h3>{% trans "Pricing" context 'pricing' %}</h3> |
|
21 |
<div> |
|
22 |
{% if agenda_pricings %} |
|
23 |
<ul class="objects-list single-links"> |
|
24 |
{% for agenda_pricing in agenda_pricings %} |
|
25 |
<li><a href="{% url 'lingo-manager-agenda-pricing-detail' agenda.pk agenda_pricing.pk %}">{{ agenda_pricing.pricing }} ({{ agenda_pricing.date_start|date:'d/m/Y' }} - {{ agenda_pricing.date_end|date:'d/m/Y' }})</a></li> |
|
26 |
{% endfor %} |
|
27 |
</ul> |
|
28 |
{% else %} |
|
29 |
<div class="big-msg-info"> |
|
30 |
{% blocktrans %} |
|
31 |
This agenda doesn't have any pricing defined yet. Click on the "New pricing" button in |
|
32 |
the top right of the page to add a first one. |
|
33 |
{% endblocktrans %} |
|
34 |
</div> |
|
35 |
{% endif %} |
|
36 |
</div> |
|
37 |
</div> |
|
38 |
{% endblock %} |
lingo/pricing/templates/lingo/pricing/manager_agenda_list.html | ||
---|---|---|
1 |
{% extends "lingo/pricing/manager_pricing_list.html" %} |
|
2 |
{% load i18n %} |
|
3 | ||
4 |
{% block breadcrumb %} |
|
5 |
{{ block.super }} |
|
6 |
<a href="{% url 'lingo-manager-agenda-list' %}">{% trans 'Agendas' %}</a> |
|
7 |
{% endblock %} |
|
8 | ||
9 |
{% block appbar %} |
|
10 |
<h2>{% trans 'Agendas' %}</h2> |
|
11 |
<span class="actions"> |
|
12 |
<a href="{% url 'lingo-manager-agenda-sync' %}">{% trans 'Refresh agendas' %}</a> |
|
13 |
</span> |
|
14 |
{% endblock %} |
|
15 | ||
16 |
{% block content %} |
|
17 |
{% if object_list %} |
|
18 |
{% regroup object_list by category_label as agenda_groups %} |
|
19 |
{% for group in agenda_groups %} |
|
20 |
<div class="section"> |
|
21 |
{% if group.grouper %}<h3>{{ group.grouper }}</h3>{% elif not forloop.first %}<h3>{% trans "Misc" %}</h3>{% endif %} |
|
22 |
<ul class="objects-list single-links"> |
|
23 |
{% for object in group.list %} |
|
24 |
<li> |
|
25 |
<a href="{% url 'lingo-manager-agenda-detail' object.pk %}">{{ object.label }} <span class="identifier">[{% trans "identifier:" %} {{ object.slug }}]</a> |
|
26 |
</li> |
|
27 |
{% endfor %} |
|
28 |
</ul> |
|
29 |
</div> |
|
30 |
{% endfor %} |
|
31 |
{% else %} |
|
32 |
<div class="big-msg-info"> |
|
33 |
{% blocktrans %} |
|
34 |
This site doesn't have any agenda yet. Click on the "Refresh agendas" button in the top |
|
35 |
right of the page to synchronize them. |
|
36 |
{% endblocktrans %} |
|
37 |
</div> |
|
38 |
{% endif %} |
|
39 |
{% endblock %} |
lingo/pricing/templates/lingo/pricing/manager_agenda_pricing_detail.html | ||
---|---|---|
1 |
{% extends "lingo/manager_agenda_settings.html" %}
|
|
1 |
{% extends "lingo/pricing/manager_agenda_detail.html" %}
|
|
2 | 2 |
{% load i18n %} |
3 | 3 | |
4 | 4 |
{% block breadcrumb %} |
lingo/pricing/templates/lingo/pricing/manager_agenda_pricing_form.html | ||
---|---|---|
1 |
{% extends "lingo/manager_agenda_settings.html" %}
|
|
1 |
{% extends "lingo/pricing/manager_agenda_detail.html" %}
|
|
2 | 2 |
{% load i18n %} |
3 | 3 | |
4 | 4 |
{% block breadcrumb %} |
... | ... | |
28 | 28 |
{% if object.pk %} |
29 | 29 |
<a class="cancel" href="{% url 'lingo-manager-agenda-pricing-detail' agenda.pk object.pk %}">{% trans 'Cancel' %}</a> |
30 | 30 |
{% else %} |
31 |
<a class="cancel" href="{% url 'lingo-manager-agenda-settings' agenda.pk %}">{% trans 'Cancel' %}</a>
|
|
31 |
<a class="cancel" href="{% url 'lingo-manager-agenda-detail' agenda.pk %}">{% trans 'Cancel' %}</a>
|
|
32 | 32 |
{% endif %} |
33 | 33 |
</div> |
34 | 34 |
</form> |
lingo/pricing/templates/lingo/pricing/manager_pricing_list.html | ||
---|---|---|
14 | 14 |
<li><a rel="popup" href="{% url 'lingo-manager-config-import' %}">{% trans 'Import' %}</a></li> |
15 | 15 |
<li><a rel="popup" href="{% url 'lingo-manager-config-export' %}" data-autoclose-dialog="true">{% trans 'Export' %}</a></li> |
16 | 16 |
</ul> |
17 |
<a href="{% url 'lingo-manager-agenda-list' %}">{% trans 'Agendas' %}</a> |
|
17 | 18 |
<a href="{% url 'lingo-manager-pricing-criteria-list' %}">{% trans 'Criterias' %}</a> |
18 | 19 |
<a rel="popup" href="{% url 'lingo-manager-pricing-add' %}">{% trans 'New pricing model' %}</a> |
19 | 20 |
</span> |
lingo/pricing/urls.py | ||
---|---|---|
123 | 123 |
staff_member_required(views.criteria_delete), |
124 | 124 |
name='lingo-manager-pricing-criteria-delete', |
125 | 125 |
), |
126 |
url( |
|
127 |
r'^agendas/sync/$', |
|
128 |
staff_member_required(views.agenda_sync), |
|
129 |
name='lingo-manager-agenda-sync', |
|
130 |
), |
|
131 |
url( |
|
132 |
r'^agendas/$', |
|
133 |
staff_member_required(views.agenda_list), |
|
134 |
name='lingo-manager-agenda-list', |
|
135 |
), |
|
136 |
url( |
|
137 |
r'^agenda/(?P<pk>\d+)/$', |
|
138 |
staff_member_required(views.agenda_detail), |
|
139 |
name='lingo-manager-agenda-detail', |
|
140 |
), |
|
126 | 141 |
url( |
127 | 142 |
r'^agenda/(?P<pk>\d+)/pricing/add/$', |
128 | 143 |
staff_member_required(views.agenda_pricing_add), |
lingo/pricing/views.py | ||
---|---|---|
28 | 28 |
from django.utils.encoding import force_text |
29 | 29 |
from django.utils.translation import ugettext_lazy as _ |
30 | 30 |
from django.utils.translation import ungettext |
31 |
from django.views.generic import CreateView, DeleteView, DetailView, FormView, ListView, UpdateView |
|
31 |
from django.views.generic import ( |
|
32 |
CreateView, |
|
33 |
DeleteView, |
|
34 |
DetailView, |
|
35 |
FormView, |
|
36 |
ListView, |
|
37 |
RedirectView, |
|
38 |
UpdateView, |
|
39 |
) |
|
32 | 40 |
from django.views.generic.detail import SingleObjectMixin |
33 | 41 | |
42 |
from lingo.agendas.chrono import refresh_agendas |
|
34 | 43 |
from lingo.agendas.models import Agenda |
35 | 44 |
from lingo.agendas.views import AgendaMixin |
36 | 45 |
from lingo.pricing.forms import ( |
... | ... | |
569 | 578 |
criteria_delete = CriteriaDeleteView.as_view() |
570 | 579 | |
571 | 580 | |
581 |
class AgendaListView(ListView): |
|
582 |
template_name = 'lingo/pricing/manager_agenda_list.html' |
|
583 |
model = Agenda |
|
584 | ||
585 |
def get_queryset(self): |
|
586 |
queryset = super().get_queryset() |
|
587 |
return queryset.order_by('category_label', 'label') |
|
588 | ||
589 | ||
590 |
agenda_list = AgendaListView.as_view() |
|
591 | ||
592 | ||
593 |
class AgendaSyncView(RedirectView): |
|
594 |
def get(self, request, *args, **kwargs): |
|
595 |
refresh_agendas() |
|
596 |
messages.info(self.request, _('Agendas refreshed.')) |
|
597 |
return super().get(request, *args, **kwargs) |
|
598 | ||
599 |
def get_redirect_url(self, *args, **kwargs): |
|
600 |
return reverse('lingo-manager-agenda-list') |
|
601 | ||
602 | ||
603 |
agenda_sync = AgendaSyncView.as_view() |
|
604 | ||
605 | ||
606 |
class AgendaDetailView(AgendaMixin, DetailView): |
|
607 |
template_name = 'lingo/pricing/manager_agenda_detail.html' |
|
608 |
model = Agenda |
|
609 | ||
610 |
def get_context_data(self, **kwargs): |
|
611 |
kwargs['agenda_pricings'] = ( |
|
612 |
AgendaPricing.objects.filter(agenda=self.agenda) |
|
613 |
.select_related('pricing') |
|
614 |
.order_by('date_start', 'date_end') |
|
615 |
) |
|
616 |
return super().get_context_data(**kwargs) |
|
617 | ||
618 | ||
619 |
agenda_detail = AgendaDetailView.as_view() |
|
620 | ||
621 | ||
572 | 622 |
class AgendaPricingAddView(AgendaMixin, CreateView): |
573 | 623 |
template_name = 'lingo/pricing/manager_agenda_pricing_form.html' |
574 | 624 |
model = AgendaPricing |
lingo/settings.py | ||
---|---|---|
168 | 168 | |
169 | 169 |
MELLON_IDENTITY_PROVIDERS = [] |
170 | 170 | |
171 |
# proxies argument passed to all python-request methods |
|
172 |
# (see http://docs.python-requests.org/en/master/user/advanced/#proxies) |
|
173 |
REQUESTS_PROXIES = None |
|
174 | ||
175 |
# timeout used in python-requests call, in seconds |
|
176 |
# we use 28s by default: timeout just before web server, which is usually 30s |
|
177 |
REQUESTS_TIMEOUT = 28 |
|
171 | 178 | |
172 | 179 |
# default site |
173 | 180 |
SITE_BASE_URL = 'http://localhost' |
lingo/utils/__init__.py | ||
---|---|---|
1 |
# lingo - payment and billing system |
|
2 |
# Copyright (C) 2022 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 | ||
18 |
from .requests_wrapper import requests |
lingo/utils/misc.py | ||
---|---|---|
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import copy |
18 |
import urllib.parse |
|
18 | 19 | |
20 |
from django.conf import settings |
|
19 | 21 |
from django.core.exceptions import FieldDoesNotExist, ValidationError |
20 | 22 |
from django.utils.translation import ugettext_lazy as _ |
21 | 23 | |
... | ... | |
69 | 71 |
except ValidationError: |
70 | 72 |
raise AgendaImportError(_('Bad slug format "%s"') % value) |
71 | 73 |
return cleaned_data |
74 | ||
75 | ||
76 |
def get_known_service_for_url(url): |
|
77 |
netloc = urllib.parse.urlparse(url).netloc |
|
78 |
for services in settings.KNOWN_SERVICES.values(): |
|
79 |
for service in services.values(): |
|
80 |
remote_url = service.get('url') |
|
81 |
if urllib.parse.urlparse(remote_url).netloc == netloc: |
|
82 |
return service |
|
83 |
return None |
lingo/utils/requests_wrapper.py | ||
---|---|---|
1 |
# lingo - payment and billing system |
|
2 |
# Copyright (C) 2022 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import hashlib |
|
18 |
import logging |
|
19 |
import urllib.parse |
|
20 |
from io import BytesIO |
|
21 | ||
22 |
from django.conf import settings |
|
23 |
from django.core.cache import cache |
|
24 |
from django.utils.encoding import smart_bytes |
|
25 |
from django.utils.http import urlencode |
|
26 |
from requests import Response |
|
27 |
from requests import Session as RequestsSession |
|
28 |
from requests.auth import AuthBase |
|
29 | ||
30 |
from .misc import get_known_service_for_url |
|
31 |
from .signature import sign_url |
|
32 | ||
33 | ||
34 |
class NothingInCacheException(Exception): |
|
35 |
pass |
|
36 | ||
37 | ||
38 |
class PublikSignature(AuthBase): |
|
39 |
def __init__(self, secret): |
|
40 |
self.secret = secret |
|
41 | ||
42 |
def __call__(self, request): |
|
43 |
request.url = sign_url(request.url, self.secret) |
|
44 |
return request |
|
45 | ||
46 | ||
47 |
class Requests(RequestsSession): |
|
48 |
def request(self, method, url, **kwargs): |
|
49 |
remote_service = kwargs.pop('remote_service', None) |
|
50 |
cache_duration = kwargs.pop('cache_duration', 15) |
|
51 |
invalidate_cache = kwargs.pop('invalidate_cache', False) |
|
52 |
user = kwargs.pop('user', None) |
|
53 |
django_request = kwargs.pop('django_request', None) |
|
54 |
without_user = kwargs.pop('without_user', False) |
|
55 |
federation_key = kwargs.pop('federation_key', 'auto') # 'auto', 'email', 'nameid' |
|
56 |
raise_if_not_cached = kwargs.pop('raise_if_not_cached', False) |
|
57 |
log_errors = kwargs.pop('log_errors', True) |
|
58 | ||
59 |
# don't use persistent cookies |
|
60 |
self.cookies.clear() |
|
61 | ||
62 |
# search in legacy urls |
|
63 |
legacy_urls_mapping = getattr(settings, 'LEGACY_URLS_MAPPING', None) |
|
64 |
if legacy_urls_mapping: |
|
65 |
splitted_url = urllib.parse.urlparse(url) |
|
66 |
hostname = splitted_url.netloc |
|
67 |
if hostname in legacy_urls_mapping: |
|
68 |
url = splitted_url._replace(netloc=legacy_urls_mapping[hostname]).geturl() |
|
69 | ||
70 |
if remote_service == 'auto': |
|
71 |
remote_service = get_known_service_for_url(url) |
|
72 |
if remote_service: |
|
73 |
# only keeps the path (URI) in url parameter, scheme and netloc are |
|
74 |
# in remote_service |
|
75 |
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url) |
|
76 |
url = urllib.parse.urlunparse(('', '', path, params, query, fragment)) |
|
77 |
else: |
|
78 |
logging.warning('service not found in settings.KNOWN_SERVICES for %s', url) |
|
79 | ||
80 |
if remote_service: |
|
81 |
if isinstance(user, dict): |
|
82 |
query_params = user.copy() |
|
83 |
elif not user or not user.is_authenticated: |
|
84 |
if without_user: |
|
85 |
query_params = {} |
|
86 |
else: |
|
87 |
query_params = {'NameID': '', 'email': ''} |
|
88 |
else: |
|
89 |
query_params = {} |
|
90 |
if federation_key == 'nameid': |
|
91 |
query_params['NameID'] = user.get_name_id() |
|
92 |
elif federation_key == 'email': |
|
93 |
query_params['email'] = user.email |
|
94 |
else: # 'auto' |
|
95 |
user_name_id = user.get_name_id() |
|
96 |
if user_name_id: |
|
97 |
query_params['NameID'] = user_name_id |
|
98 |
else: |
|
99 |
query_params['email'] = user.email |
|
100 | ||
101 |
if remote_service.get('orig'): |
|
102 |
query_params['orig'] = remote_service.get('orig') |
|
103 | ||
104 |
remote_service_base_url = remote_service.get('url') |
|
105 |
scheme, netloc, dummy, params, old_query, fragment = urllib.parse.urlparse( |
|
106 |
remote_service_base_url |
|
107 |
) |
|
108 | ||
109 |
query = urlencode(query_params) |
|
110 |
if '?' in url: |
|
111 |
path, old_query = url.split('?', 1) |
|
112 |
query += '&' + old_query |
|
113 |
else: |
|
114 |
path = url |
|
115 | ||
116 |
url = urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment)) |
|
117 | ||
118 |
if method == 'GET' and cache_duration: |
|
119 |
# handle cache |
|
120 |
params = urlencode(kwargs.get('params', {})) |
|
121 |
cache_key = hashlib.md5(smart_bytes(url + params)).hexdigest() |
|
122 |
cache_content = cache.get(cache_key) |
|
123 |
if cache_content and not invalidate_cache: |
|
124 |
response = Response() |
|
125 |
response.status_code = 200 |
|
126 |
response.raw = BytesIO(smart_bytes(cache_content)) |
|
127 |
return response |
|
128 |
elif raise_if_not_cached: |
|
129 |
raise NothingInCacheException() |
|
130 | ||
131 |
if remote_service: # sign |
|
132 |
kwargs['auth'] = PublikSignature(remote_service.get('secret')) |
|
133 | ||
134 |
kwargs['timeout'] = kwargs.get('timeout') or settings.REQUESTS_TIMEOUT |
|
135 | ||
136 |
response = super().request(method, url, **kwargs) |
|
137 |
if log_errors and (response.status_code // 100 != 2): |
|
138 |
extra = {} |
|
139 |
if django_request: |
|
140 |
extra['request'] = django_request |
|
141 |
if log_errors == 'warn': |
|
142 |
logging.warning( |
|
143 |
'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra |
|
144 |
) |
|
145 |
else: |
|
146 |
logging.error( |
|
147 |
'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra |
|
148 |
) |
|
149 |
if method == 'GET' and cache_duration and (response.status_code // 100 == 2): |
|
150 |
cache.set(cache_key, response.content, cache_duration) |
|
151 | ||
152 |
return response |
|
153 | ||
154 | ||
155 |
requests = Requests() |
lingo/utils/signature.py | ||
---|---|---|
1 |
# lingo - payment and billing system |
|
2 |
# Copyright (C) 2022 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import base64 |
|
18 |
import datetime |
|
19 |
import hashlib |
|
20 |
import hmac |
|
21 |
import random |
|
22 |
import urllib.parse |
|
23 | ||
24 |
from django.utils.encoding import smart_bytes |
|
25 |
from django.utils.http import quote, urlencode |
|
26 | ||
27 | ||
28 |
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None): |
|
29 |
parsed = urllib.parse.urlparse(url) |
|
30 |
new_query = sign_query(parsed.query, key, algo, timestamp, nonce) |
|
31 |
return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:]) |
|
32 | ||
33 | ||
34 |
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None): |
|
35 |
if timestamp is None: |
|
36 |
timestamp = datetime.datetime.utcnow() |
|
37 |
timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') |
|
38 |
if nonce is None: |
|
39 |
nonce = hex(random.getrandbits(128))[2:] |
|
40 |
new_query = query |
|
41 |
if new_query: |
|
42 |
new_query += '&' |
|
43 |
new_query += urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce))) |
|
44 |
signature = base64.b64encode(sign_string(new_query, key, algo=algo)) |
|
45 |
new_query += '&signature=' + quote(signature) |
|
46 |
return new_query |
|
47 | ||
48 | ||
49 |
def sign_string(s, key, algo='sha256', timedelta=30): |
|
50 |
digestmod = getattr(hashlib, algo) |
|
51 |
hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s)) |
|
52 |
return hash.digest() |
tests/agendas/test_chrono.py | ||
---|---|---|
1 |
import json |
|
2 |
from unittest import mock |
|
3 | ||
4 |
import pytest |
|
5 |
from requests.exceptions import ConnectionError |
|
6 |
from requests.models import Response |
|
7 | ||
8 |
from lingo.agendas.chrono import collect_agenda_data, refresh_agendas |
|
9 |
from lingo.agendas.models import Agenda |
|
10 | ||
11 |
pytestmark = pytest.mark.django_db |
|
12 | ||
13 |
AGENDA_DATA = [ |
|
14 |
{ |
|
15 |
"slug": "events-a", |
|
16 |
"kind": "events", |
|
17 |
"text": "Events A", |
|
18 |
"category": None, |
|
19 |
"category_label": None, |
|
20 |
}, |
|
21 |
{ |
|
22 |
"slug": "events-b", |
|
23 |
"kind": "events", |
|
24 |
"text": "Events B", |
|
25 |
"category": "foo", |
|
26 |
"category_label": "Foo", |
|
27 |
}, |
|
28 |
{ |
|
29 |
"slug": "meetings-a", |
|
30 |
"kind": "meetings", |
|
31 |
"text": "Meetings A", |
|
32 |
"category": None, |
|
33 |
"category_label": None, |
|
34 |
}, |
|
35 |
{ |
|
36 |
"slug": "virtual-b", |
|
37 |
"kind": "virtual", |
|
38 |
"text": "Virtual B", |
|
39 |
"category": "foo", |
|
40 |
"category_label": "Foo", |
|
41 |
}, |
|
42 |
] |
|
43 | ||
44 | ||
45 |
class MockedRequestResponse(mock.Mock): |
|
46 |
status_code = 200 |
|
47 | ||
48 |
def json(self): |
|
49 |
return json.loads(self.content) |
|
50 | ||
51 | ||
52 |
def test_collect_agenda_data_no_service(settings): |
|
53 |
settings.KNOWN_SERVICES = {} |
|
54 |
assert collect_agenda_data() is None |
|
55 | ||
56 |
settings.KNOWN_SERVICES = {'other': []} |
|
57 |
assert collect_agenda_data() is None |
|
58 | ||
59 | ||
60 |
def test_collect_agenda_data(): |
|
61 |
with mock.patch('requests.Session.get') as requests_get: |
|
62 |
requests_get.side_effect = ConnectionError() |
|
63 |
assert collect_agenda_data() is None |
|
64 | ||
65 |
with mock.patch('requests.Session.get') as requests_get: |
|
66 |
mock_resp = Response() |
|
67 |
mock_resp.status_code = 500 |
|
68 |
requests_get.return_value = mock_resp |
|
69 |
assert collect_agenda_data() is None |
|
70 | ||
71 |
with mock.patch('requests.Session.get') as requests_get: |
|
72 |
mock_resp = Response() |
|
73 |
mock_resp.status_code = 404 |
|
74 |
requests_get.return_value = mock_resp |
|
75 |
assert collect_agenda_data() is None |
|
76 | ||
77 |
with mock.patch('requests.Session.get') as requests_get: |
|
78 |
requests_get.return_value = MockedRequestResponse(content=json.dumps({'foo': 'bar'})) |
|
79 |
assert collect_agenda_data() is None |
|
80 | ||
81 |
data = {'data': []} |
|
82 |
with mock.patch('requests.Session.get') as requests_get: |
|
83 |
requests_get.return_value = MockedRequestResponse(content=json.dumps(data)) |
|
84 |
assert collect_agenda_data() == [] |
|
85 |
assert requests_get.call_args_list[0][0] == ('api/agenda/',) |
|
86 |
assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org' |
|
87 | ||
88 |
data = {'data': AGENDA_DATA} |
|
89 |
with mock.patch('requests.Session.get') as requests_get: |
|
90 |
requests_get.return_value = MockedRequestResponse(content=json.dumps(data)) |
|
91 |
assert collect_agenda_data() == [ |
|
92 |
{'category_label': None, 'category_slug': None, 'label': 'Events A', 'slug': 'events-a'}, |
|
93 |
{'category_label': 'Foo', 'category_slug': 'foo', 'label': 'Events B', 'slug': 'events-b'}, |
|
94 |
] |
|
95 | ||
96 | ||
97 |
@mock.patch('lingo.agendas.chrono.collect_agenda_data') |
|
98 |
def test_refresh_agendas(mock_collect): |
|
99 |
Agenda.objects.create(label='foo') |
|
100 | ||
101 |
# error during collect |
|
102 |
mock_collect.return_value = None |
|
103 |
refresh_agendas() |
|
104 |
assert Agenda.objects.count() == 1 # no changes |
|
105 | ||
106 |
# 2 agendas found |
|
107 |
mock_collect.return_value = [ |
|
108 |
{'category_label': None, 'category_slug': None, 'label': 'Events A', 'slug': 'events-a'}, |
|
109 |
{'category_label': 'Foo', 'category_slug': 'foo', 'label': 'Events B', 'slug': 'events-b'}, |
|
110 |
] |
|
111 | ||
112 |
# agendas don't exist, create them |
|
113 |
refresh_agendas() |
|
114 |
assert Agenda.objects.count() == 2 |
|
115 |
agenda1 = Agenda.objects.all().order_by('pk')[0] |
|
116 |
agenda2 = Agenda.objects.all().order_by('pk')[1] |
|
117 |
assert agenda1.label == "Events A" |
|
118 |
assert agenda1.slug == "events-a" |
|
119 |
assert agenda1.category_label is None |
|
120 |
assert agenda1.category_slug is None |
|
121 |
assert agenda2.label == "Events B" |
|
122 |
assert agenda2.slug == "events-b" |
|
123 |
assert agenda2.category_label == 'Foo' |
|
124 |
assert agenda2.category_slug == 'foo' |
|
125 | ||
126 |
# again, but some attributes are wrong |
|
127 |
agenda1.label = "Wrong" |
|
128 |
agenda1.category_label = 'Foo' |
|
129 |
agenda1.category_slug = 'foo' |
|
130 |
agenda1.save() |
|
131 |
agenda2.label = "Wrong" |
|
132 |
agenda2.category_label is None |
|
133 |
agenda2.category_slug is None |
|
134 |
agenda2.save() |
|
135 |
refresh_agendas() |
|
136 |
assert Agenda.objects.count() == 2 |
|
137 |
new_agenda1 = Agenda.objects.all().order_by('pk')[0] |
|
138 |
new_agenda2 = Agenda.objects.all().order_by('pk')[1] |
|
139 |
assert new_agenda1.pk == agenda1.pk |
|
140 |
assert new_agenda1.label == "Events A" |
|
141 |
assert new_agenda1.slug == "events-a" |
|
142 |
assert new_agenda1.category_label is None |
|
143 |
assert new_agenda1.category_slug is None |
|
144 |
assert new_agenda2.pk == agenda2.pk |
|
145 |
assert new_agenda2.label == "Events B" |
|
146 |
assert new_agenda2.slug == "events-b" |
|
147 |
assert new_agenda2.category_label == 'Foo' |
|
148 |
assert new_agenda2.category_slug == 'foo' |
|
149 | ||
150 |
# no agenda in chrono |
|
151 |
mock_collect.return_value = [] |
|
152 |
refresh_agendas() |
|
153 |
assert Agenda.objects.count() == 0 |
tests/pricing/test_import_export.py | ||
---|---|---|
51 | 51 | |
52 | 52 |
old_stdin = sys.stdin |
53 | 53 |
sys.stdin = StringIO(json.dumps({})) |
54 |
agenda = Agenda.objects.create(label='Foo Bar') |
|
55 | 54 |
pricing = Pricing.objects.create(label='Foo') |
56 | 55 |
AgendaPricing.objects.create( |
57 | 56 |
agenda=agenda, |
tests/pricing/test_manager.py | ||
---|---|---|
1 | 1 |
import copy |
2 | 2 |
import datetime |
3 | 3 |
import json |
4 |
from unittest import mock |
|
4 | 5 | |
5 | 6 |
import pytest |
6 | 7 |
from webtest import Upload |
... | ... | |
12 | 13 |
pytestmark = pytest.mark.django_db |
13 | 14 | |
14 | 15 | |
15 |
def test_export_site(settings, freezer, app, admin_user):
|
|
16 |
def test_export_site(freezer, app, admin_user): |
|
16 | 17 |
freezer.move_to('2020-06-15') |
17 | 18 |
login(app) |
18 | 19 |
resp = app.get('/manage/pricing/') |
... | ... | |
48 | 49 |
assert 'pricing_models' not in site_json |
49 | 50 | |
50 | 51 | |
51 |
def test_add_pricing(settings, app, admin_user):
|
|
52 |
def test_add_pricing(app, admin_user): |
|
52 | 53 |
app = login(app) |
53 | 54 |
resp = app.get('/manage/') |
54 | 55 |
resp = resp.click('Pricing') |
... | ... | |
425 | 426 |
] |
426 | 427 | |
427 | 428 | |
428 |
def test_add_category(settings, app, admin_user):
|
|
429 |
def test_add_category(app, admin_user): |
|
429 | 430 |
app = login(app) |
430 | 431 |
resp = app.get('/manage/') |
431 | 432 |
resp = resp.click('Pricing') |
... | ... | |
685 | 686 |
assert Criteria.objects.count() == 6 |
686 | 687 | |
687 | 688 | |
688 |
@pytest.mark.xfail(reason='agenda views not yet implemented') |
|
689 |
def test_add_agenda_pricing(settings, app, admin_user): |
|
689 |
@mock.patch('lingo.pricing.views.refresh_agendas') |
|
690 |
def test_refresh_agendas(mock_refresh, app, admin_user): |
|
691 |
app = login(app) |
|
692 |
resp = app.get('/manage/pricing/agendas/') |
|
693 |
resp = resp.click('Refresh agendas') |
|
694 |
assert resp.location.endswith('/manage/pricing/agendas/') |
|
695 |
resp = resp.follow() |
|
696 |
assert "Agendas refreshed." in resp |
|
697 |
assert mock_refresh.call_args_list == [mock.call()] |
|
698 | ||
699 | ||
700 |
def test_add_agenda_pricing(app, admin_user): |
|
690 | 701 |
agenda = Agenda.objects.create(label='Foo Bar') |
691 | 702 |
pricing = Pricing.objects.create(label='Model') |
692 | 703 | |
693 | 704 |
app = login(app) |
694 |
resp = app.get('/manage/agendas/%s/settings' % agenda.pk) |
|
705 |
resp = app.get('/manage/pricing/agendas/') |
|
706 |
resp = resp.click(href='/manage/pricing/agenda/%s/' % agenda.pk) |
|
695 | 707 |
resp = resp.click('New pricing') |
696 | 708 |
resp.form['pricing'] = pricing.pk |
697 | 709 |
resp.form['date_start'] = '2021-09-01' |
... | ... | |
799 | 811 |
resp = app.get('/manage/pricing/agenda/%s/pricing/%s/' % (agenda.pk, agenda_pricing.pk)) |
800 | 812 |
resp = resp.click(href='/manage/pricing/agenda/%s/pricing/%s/delete/' % (agenda.pk, agenda_pricing.pk)) |
801 | 813 |
resp = resp.form.submit() |
802 |
# XXX: assert resp.location.endswith('/manage/agendas/%s/settings' % agenda.pk)
|
|
814 |
assert resp.location.endswith('/manage/pricing/agenda/%s/' % agenda.pk)
|
|
803 | 815 |
assert AgendaPricing.objects.filter(pk=agenda_pricing.pk).exists() is False |
804 | 816 | |
805 | 817 | |
806 |
@pytest.mark.xfail(reason='agenda settings view not yet implemented') |
|
807 | 818 |
def test_detail_agenda_pricing(app, admin_user): |
808 | 819 |
agenda = Agenda.objects.create(label='Foo Bar') |
809 | 820 |
pricing = Pricing.objects.create(label='Model') |
... | ... | |
822 | 833 |
) |
823 | 834 | |
824 | 835 |
app = login(app) |
825 |
resp = app.get('/manage/agendas/%s/settings' % agenda.pk)
|
|
836 |
resp = app.get('/manage/pricing/agenda/%s/' % agenda.pk)
|
|
826 | 837 |
resp = resp.click(href='/manage/pricing/agenda/%s/pricing/%s/' % (agenda.pk, agenda_pricing.pk)) |
827 | 838 | |
828 | 839 |
app.get('/manage/pricing/agenda/%s/pricing/%s/' % (agenda.pk, agenda_pricing2.pk), status=404) |
... | ... | |
1377 | 1388 |
) |
1378 | 1389 | |
1379 | 1390 | |
1380 |
@pytest.mark.xfail(reason='agenda views not yet implemented') |
|
1381 | 1391 |
def test_edit_agenda_pricing_matrix_empty(app, admin_user): |
1382 | 1392 |
agenda = Agenda.objects.create(label='Foo bar') |
1383 | 1393 |
pricing = Pricing.objects.create(label='Foo bar') |
tests/settings.py | ||
---|---|---|
18 | 18 |
}, |
19 | 19 |
} |
20 | 20 |
} |
21 | ||
22 |
KNOWN_SERVICES = { |
|
23 |
'chrono': { |
|
24 |
'default': { |
|
25 |
'title': 'test', |
|
26 |
'url': 'http://chrono.example.org', |
|
27 |
'secret': 'lingo', |
|
28 |
'orig': 'lingo', |
|
29 |
'backoffice-menu-url': 'http://chrono.example.org/manage/', |
|
30 |
'secondary': False, |
|
31 |
}, |
|
32 |
'other': { |
|
33 |
'title': 'other', |
|
34 |
'url': 'http://other.chrono.example.org', |
|
35 |
'secret': 'lingo', |
|
36 |
'orig': 'lingo', |
|
37 |
'backoffice-menu-url': 'http://other.chrono.example.org/manage/', |
|
38 |
'secondary': True, |
|
39 |
}, |
|
40 |
}, |
|
41 |
} |
|
21 |
- |