Projet

Général

Profil

0001-pricing-synchronise-agendas-list-detail-views-65361.patch

Lauréline Guérin, 23 mai 2022 17:00

Télécharger (38,5 ko)

Voir les différences:

Subject: [PATCH 1/3] pricing: synchronise agendas, list & detail views
 (#65361)

 lingo/agendas/chrono.py                       | 105 ++++++++++++
 lingo/agendas/migrations/0002_agenda.py       |  21 +++
 lingo/agendas/models.py                       |   2 +
 lingo/agendas/views.py                        |  11 +-
 lingo/manager/static/css/style.scss           |  10 ++
 .../lingo/manager_agenda_settings.html        |   1 -
 .../templates/lingo/manager_agenda_view.html  |   1 -
 .../lingo/pricing/manager_agenda_detail.html  |  38 +++++
 .../lingo/pricing/manager_agenda_list.html    |  39 +++++
 .../manager_agenda_pricing_detail.html        |   2 +-
 .../pricing/manager_agenda_pricing_form.html  |   4 +-
 .../lingo/pricing/manager_pricing_list.html   |   1 +
 lingo/pricing/urls.py                         |  15 ++
 lingo/pricing/views.py                        |  52 +++++-
 lingo/settings.py                             |   7 +
 lingo/utils/__init__.py                       |  18 ++
 lingo/utils/misc.py                           |  12 ++
 lingo/utils/requests_wrapper.py               | 155 ++++++++++++++++++
 lingo/utils/signature.py                      |  52 ++++++
 tests/agendas/__init__.py                     |   0
 tests/agendas/test_chrono.py                  | 153 +++++++++++++++++
 tests/pricing/test_import_export.py           |   1 -
 tests/pricing/test_manager.py                 |  30 ++--
 tests/settings.py                             |  21 +++
 24 files changed, 732 insertions(+), 19 deletions(-)
 create mode 100644 lingo/agendas/chrono.py
 create mode 100644 lingo/agendas/migrations/0002_agenda.py
 delete mode 100644 lingo/manager/templates/lingo/manager_agenda_settings.html
 delete mode 100644 lingo/manager/templates/lingo/manager_agenda_view.html
 create mode 100644 lingo/pricing/templates/lingo/pricing/manager_agenda_detail.html
 create mode 100644 lingo/pricing/templates/lingo/pricing/manager_agenda_list.html
 create mode 100644 lingo/utils/requests_wrapper.py
 create mode 100644 lingo/utils/signature.py
 create mode 100644 tests/agendas/__init__.py
 create mode 100644 tests/agendas/test_chrono.py
lingo/agendas/chrono.py
1
# lingo - payment and billing system
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import json
18

  
19
from django.conf import settings
20
from requests.exceptions import RequestException
21

  
22
from lingo.agendas.models import Agenda
23
from lingo.utils import requests
24

  
25

  
26
def is_chrono_enabled():
27
    return hasattr(settings, 'KNOWN_SERVICES') and settings.KNOWN_SERVICES.get('chrono')
28

  
29

  
30
def get_chrono_service():
31
    if not is_chrono_enabled():
32
        return {}
33
    return list(settings.KNOWN_SERVICES.get('chrono').values())[0]
34

  
35

  
36
def get_chrono_json(path, log_errors=True):
37
    chrono_site = get_chrono_service()
38
    if chrono_site is None:
39
        return
40
    try:
41
        response = requests.get(
42
            path,
43
            remote_service=chrono_site,
44
            without_user=True,
45
            headers={'accept': 'application/json'},
46
            log_errors=log_errors,
47
        )
48
        response.raise_for_status()
49
    except RequestException as e:
50
        if e.response is not None:
51
            try:
52
                # return json if available (on 404 responses by example)
53
                return e.response.json()
54
            except json.JSONDecodeError:
55
                pass
56
        return
57
    return response.json()
58

  
59

  
60
def collect_agenda_data():
61
    result = get_chrono_json('api/agenda/')
62
    if result is None:
63
        return
64
    if result.get('data') is None:
65
        return
66

  
67
    agenda_data = []
68
    for agenda in result['data']:
69
        if agenda['kind'] != 'events':
70
            continue
71
        agenda_data.append(
72
            {
73
                'slug': agenda['slug'],
74
                'label': agenda['text'],
75
                'category_slug': agenda['category'],
76
                'category_label': agenda['category_label'],
77
            }
78
        )
79
    return agenda_data
80

  
81

  
82
def refresh_agendas():
83
    result = collect_agenda_data()
84
    if result is None:
85
        return
86

  
87
    # fetch existing agendas
88
    existing_agendas = {a.slug: a for a in Agenda.objects.all()}
89
    seen_agendas = []
90

  
91
    # build agendas from chrono
92
    for agenda_data in result:
93
        slug = agenda_data['slug']
94
        agenda = existing_agendas.get(slug) or Agenda(slug=slug)
95
        for key, value in agenda_data.items():
96
            if key == 'slug':
97
                continue
98
            setattr(agenda, key, value)
99
        agenda.save()
100
        seen_agendas.append(agenda.slug)
101

  
102
    # now check outdated agendas
103
    for slug, agenda in existing_agendas.items():
104
        if slug not in seen_agendas:
105
            agenda.delete()
lingo/agendas/migrations/0002_agenda.py
1
from django.db import migrations, models
2

  
3

  
4
class Migration(migrations.Migration):
5

  
6
    dependencies = [
7
        ('agendas', '0001_initial'),
8
    ]
9

  
10
    operations = [
11
        migrations.AddField(
12
            model_name='agenda',
13
            name='category_label',
14
            field=models.CharField(null=True, max_length=150, verbose_name='Category label'),
15
        ),
16
        migrations.AddField(
17
            model_name='agenda',
18
            name='category_slug',
19
            field=models.SlugField(null=True, max_length=160, verbose_name='Category identifier'),
20
        ),
21
    ]
lingo/agendas/models.py
26 26
class Agenda(models.Model):
27 27
    label = models.CharField(_('Label'), max_length=150)
28 28
    slug = models.SlugField(_('Identifier'), max_length=160, unique=True)
29
    category_label = models.CharField(_('Category label'), max_length=150, null=True)
30
    category_slug = models.SlugField(_('Category identifier'), max_length=160, null=True)
29 31

  
30 32
    def __str__(self):
31 33
        return self.label
lingo/agendas/views.py
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17 17
from django.shortcuts import get_object_or_404
18
from django.urls import reverse
18 19

  
19 20
from .models import Agenda
20 21

  
......
34 35
        context['agenda'] = self.agenda
35 36
        return context
36 37

  
38
    def get_form_kwargs(self):
39
        kwargs = super().get_form_kwargs()
40
        if not kwargs.get('instance'):
41
            kwargs['instance'] = self.model()
42
        kwargs['instance'].agenda = self.agenda
43
        return kwargs
44

  
37 45
    def get_success_url(self):
38
        # XXX:  return reverse('lingo-manager-agenda-settings', kwargs={'pk': self.agenda.id})
39
        return '/'
46
        return reverse('lingo-manager-agenda-detail', kwargs={'pk': self.agenda.pk})
lingo/manager/static/css/style.scss
1
li span.identifier {
2
	font-size: 80%;
3
	opacity: 0.6;
4
}
5

  
6
h2 span.identifier {
7
	font-size: 1rem;
8
	opacity: 0.6;
9
}
10

  
1 11
div.paragraph {
2 12
	background: white;
3 13
	box-sizing: border-box;
lingo/manager/templates/lingo/manager_agenda_settings.html
1
{% extends "lingo/manager_agenda_view.html" %}
lingo/manager/templates/lingo/manager_agenda_view.html
1
{% extends "lingo/manager_base.html" %}
lingo/pricing/templates/lingo/pricing/manager_agenda_detail.html
1
{% extends "lingo/pricing/manager_agenda_list.html" %}
2
{% load i18n %}
3

  
4
{% block breadcrumb %}
5
{{ block.super }}
6
<a href="{% url 'lingo-manager-agenda-detail' agenda.pk %}">{{ agenda }}</a>
7
{% endblock %}
8

  
9
{% block appbar %}
10
<h2>{{ agenda }}
11
    <span class="identifier">[{% trans "identifier:" %} {{ agenda.slug }}]</span>
12
</h2>
13
<span class="actions">
14
  <a rel="popup" href="{% url 'lingo-manager-agenda-pricing-add' pk=agenda.pk %}">{% trans 'New pricing' %}</a>
15
</span>
16
{% endblock %}
17

  
18
{% block content %}
19
<div class="section">
20
<h3>{% trans "Pricing" context 'pricing' %}</h3>
21
<div>
22
  {% if agenda_pricings %}
23
  <ul class="objects-list single-links">
24
  {% for agenda_pricing in agenda_pricings %}
25
    <li><a href="{% url 'lingo-manager-agenda-pricing-detail' agenda.pk agenda_pricing.pk %}">{{ agenda_pricing.pricing }} ({{ agenda_pricing.date_start|date:'d/m/Y' }} - {{ agenda_pricing.date_end|date:'d/m/Y' }})</a></li>
26
  {% endfor %}
27
  </ul>
28
  {% else %}
29
  <div class="big-msg-info">
30
    {% blocktrans %}
31
    This agenda doesn't have any pricing defined yet. Click on the "New pricing" button in
32
    the top right of the page to add a first one.
33
    {% endblocktrans %}
34
  </div>
35
  {% endif %}
36
</div>
37
</div>
38
{% endblock %}
lingo/pricing/templates/lingo/pricing/manager_agenda_list.html
1
{% extends "lingo/pricing/manager_pricing_list.html" %}
2
{% load i18n %}
3

  
4
{% block breadcrumb %}
5
{{ block.super }}
6
<a href="{% url 'lingo-manager-agenda-list' %}">{% trans 'Agendas' %}</a>
7
{% endblock %}
8

  
9
{% block appbar %}
10
<h2>{% trans 'Agendas' %}</h2>
11
<span class="actions">
12
  <a href="{% url 'lingo-manager-agenda-sync' %}">{% trans 'Refresh agendas' %}</a>
13
</span>
14
{% endblock %}
15

  
16
{% block content %}
17
{% if object_list %}
18
{% regroup object_list by category_label as agenda_groups %}
19
{% for group in agenda_groups %}
20
<div class="section">
21
  {% if group.grouper %}<h3>{{ group.grouper }}</h3>{% elif not forloop.first %}<h3>{% trans "Misc" %}</h3>{% endif %}
22
  <ul class="objects-list single-links">
23
    {% for object in group.list %}
24
    <li>
25
      <a href="{% url 'lingo-manager-agenda-detail' object.pk %}">{{ object.label }} <span class="identifier">[{% trans "identifier:" %} {{ object.slug }}]</a>
26
    </li>
27
    {% endfor %}
28
  </ul>
29
</div>
30
{% endfor %}
31
{% else %}
32
<div class="big-msg-info">
33
  {% blocktrans %}
34
  This site doesn't have any agenda yet. Click on the "Refresh agendas" button in the top
35
  right of the page to synchronize them.
36
  {% endblocktrans %}
37
</div>
38
{% endif %}
39
{% endblock %}
lingo/pricing/templates/lingo/pricing/manager_agenda_pricing_detail.html
1
{% extends "lingo/manager_agenda_settings.html" %}
1
{% extends "lingo/pricing/manager_agenda_detail.html" %}
2 2
{% load i18n %}
3 3

  
4 4
{% block breadcrumb %}
lingo/pricing/templates/lingo/pricing/manager_agenda_pricing_form.html
1
{% extends "lingo/manager_agenda_settings.html" %}
1
{% extends "lingo/pricing/manager_agenda_detail.html" %}
2 2
{% load i18n %}
3 3

  
4 4
{% block breadcrumb %}
......
28 28
    {% if object.pk %}
29 29
    <a class="cancel" href="{% url 'lingo-manager-agenda-pricing-detail' agenda.pk object.pk %}">{% trans 'Cancel' %}</a>
30 30
    {% else %}
31
    <a class="cancel" href="{% url 'lingo-manager-agenda-settings' agenda.pk %}">{% trans 'Cancel' %}</a>
31
    <a class="cancel" href="{% url 'lingo-manager-agenda-detail' agenda.pk %}">{% trans 'Cancel' %}</a>
32 32
    {% endif %}
33 33
  </div>
34 34
</form>
lingo/pricing/templates/lingo/pricing/manager_pricing_list.html
14 14
    <li><a rel="popup" href="{% url 'lingo-manager-config-import' %}">{% trans 'Import' %}</a></li>
15 15
    <li><a rel="popup" href="{% url 'lingo-manager-config-export' %}" data-autoclose-dialog="true">{% trans 'Export' %}</a></li>
16 16
  </ul>
17
  <a href="{% url 'lingo-manager-agenda-list' %}">{% trans 'Agendas' %}</a>
17 18
  <a href="{% url 'lingo-manager-pricing-criteria-list' %}">{% trans 'Criterias' %}</a>
18 19
  <a rel="popup" href="{% url 'lingo-manager-pricing-add' %}">{% trans 'New pricing model' %}</a>
19 20
</span>
lingo/pricing/urls.py
123 123
        staff_member_required(views.criteria_delete),
124 124
        name='lingo-manager-pricing-criteria-delete',
125 125
    ),
126
    url(
127
        r'^agendas/sync/$',
128
        staff_member_required(views.agenda_sync),
129
        name='lingo-manager-agenda-sync',
130
    ),
131
    url(
132
        r'^agendas/$',
133
        staff_member_required(views.agenda_list),
134
        name='lingo-manager-agenda-list',
135
    ),
136
    url(
137
        r'^agenda/(?P<pk>\d+)/$',
138
        staff_member_required(views.agenda_detail),
139
        name='lingo-manager-agenda-detail',
140
    ),
126 141
    url(
127 142
        r'^agenda/(?P<pk>\d+)/pricing/add/$',
128 143
        staff_member_required(views.agenda_pricing_add),
lingo/pricing/views.py
28 28
from django.utils.encoding import force_text
29 29
from django.utils.translation import ugettext_lazy as _
30 30
from django.utils.translation import ungettext
31
from django.views.generic import CreateView, DeleteView, DetailView, FormView, ListView, UpdateView
31
from django.views.generic import (
32
    CreateView,
33
    DeleteView,
34
    DetailView,
35
    FormView,
36
    ListView,
37
    RedirectView,
38
    UpdateView,
39
)
32 40
from django.views.generic.detail import SingleObjectMixin
33 41

  
42
from lingo.agendas.chrono import refresh_agendas
34 43
from lingo.agendas.models import Agenda
35 44
from lingo.agendas.views import AgendaMixin
36 45
from lingo.pricing.forms import (
......
569 578
criteria_delete = CriteriaDeleteView.as_view()
570 579

  
571 580

  
581
class AgendaListView(ListView):
582
    template_name = 'lingo/pricing/manager_agenda_list.html'
583
    model = Agenda
584

  
585
    def get_queryset(self):
586
        queryset = super().get_queryset()
587
        return queryset.order_by('category_label', 'label')
588

  
589

  
590
agenda_list = AgendaListView.as_view()
591

  
592

  
593
class AgendaSyncView(RedirectView):
594
    def get(self, request, *args, **kwargs):
595
        refresh_agendas()
596
        messages.info(self.request, _('Agendas refreshed.'))
597
        return super().get(request, *args, **kwargs)
598

  
599
    def get_redirect_url(self, *args, **kwargs):
600
        return reverse('lingo-manager-agenda-list')
601

  
602

  
603
agenda_sync = AgendaSyncView.as_view()
604

  
605

  
606
class AgendaDetailView(AgendaMixin, DetailView):
607
    template_name = 'lingo/pricing/manager_agenda_detail.html'
608
    model = Agenda
609

  
610
    def get_context_data(self, **kwargs):
611
        kwargs['agenda_pricings'] = (
612
            AgendaPricing.objects.filter(agenda=self.agenda)
613
            .select_related('pricing')
614
            .order_by('date_start', 'date_end')
615
        )
616
        return super().get_context_data(**kwargs)
617

  
618

  
619
agenda_detail = AgendaDetailView.as_view()
620

  
621

  
572 622
class AgendaPricingAddView(AgendaMixin, CreateView):
573 623
    template_name = 'lingo/pricing/manager_agenda_pricing_form.html'
574 624
    model = AgendaPricing
lingo/settings.py
168 168

  
169 169
MELLON_IDENTITY_PROVIDERS = []
170 170

  
171
# proxies argument passed to all python-request methods
172
# (see http://docs.python-requests.org/en/master/user/advanced/#proxies)
173
REQUESTS_PROXIES = None
174

  
175
# timeout used in python-requests call, in seconds
176
# we use 28s by default: timeout just before web server, which is usually 30s
177
REQUESTS_TIMEOUT = 28
171 178

  
172 179
# default site
173 180
SITE_BASE_URL = 'http://localhost'
lingo/utils/__init__.py
1
# lingo - payment and billing system
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17

  
18
from .requests_wrapper import requests
lingo/utils/misc.py
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17 17
import copy
18
import urllib.parse
18 19

  
20
from django.conf import settings
19 21
from django.core.exceptions import FieldDoesNotExist, ValidationError
20 22
from django.utils.translation import ugettext_lazy as _
21 23

  
......
69 71
            except ValidationError:
70 72
                raise AgendaImportError(_('Bad slug format "%s"') % value)
71 73
    return cleaned_data
74

  
75

  
76
def get_known_service_for_url(url):
77
    netloc = urllib.parse.urlparse(url).netloc
78
    for services in settings.KNOWN_SERVICES.values():
79
        for service in services.values():
80
            remote_url = service.get('url')
81
            if urllib.parse.urlparse(remote_url).netloc == netloc:
82
                return service
83
    return None
lingo/utils/requests_wrapper.py
1
# lingo - payment and billing system
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import hashlib
18
import logging
19
import urllib.parse
20
from io import BytesIO
21

  
22
from django.conf import settings
23
from django.core.cache import cache
24
from django.utils.encoding import smart_bytes
25
from django.utils.http import urlencode
26
from requests import Response
27
from requests import Session as RequestsSession
28
from requests.auth import AuthBase
29

  
30
from .misc import get_known_service_for_url
31
from .signature import sign_url
32

  
33

  
34
class NothingInCacheException(Exception):
35
    pass
36

  
37

  
38
class PublikSignature(AuthBase):
39
    def __init__(self, secret):
40
        self.secret = secret
41

  
42
    def __call__(self, request):
43
        request.url = sign_url(request.url, self.secret)
44
        return request
45

  
46

  
47
class Requests(RequestsSession):
48
    def request(self, method, url, **kwargs):
49
        remote_service = kwargs.pop('remote_service', None)
50
        cache_duration = kwargs.pop('cache_duration', 15)
51
        invalidate_cache = kwargs.pop('invalidate_cache', False)
52
        user = kwargs.pop('user', None)
53
        django_request = kwargs.pop('django_request', None)
54
        without_user = kwargs.pop('without_user', False)
55
        federation_key = kwargs.pop('federation_key', 'auto')  # 'auto', 'email', 'nameid'
56
        raise_if_not_cached = kwargs.pop('raise_if_not_cached', False)
57
        log_errors = kwargs.pop('log_errors', True)
58

  
59
        # don't use persistent cookies
60
        self.cookies.clear()
61

  
62
        # search in legacy urls
63
        legacy_urls_mapping = getattr(settings, 'LEGACY_URLS_MAPPING', None)
64
        if legacy_urls_mapping:
65
            splitted_url = urllib.parse.urlparse(url)
66
            hostname = splitted_url.netloc
67
            if hostname in legacy_urls_mapping:
68
                url = splitted_url._replace(netloc=legacy_urls_mapping[hostname]).geturl()
69

  
70
        if remote_service == 'auto':
71
            remote_service = get_known_service_for_url(url)
72
            if remote_service:
73
                # only keeps the path (URI) in url parameter, scheme and netloc are
74
                # in remote_service
75
                scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url)
76
                url = urllib.parse.urlunparse(('', '', path, params, query, fragment))
77
            else:
78
                logging.warning('service not found in settings.KNOWN_SERVICES for %s', url)
79

  
80
        if remote_service:
81
            if isinstance(user, dict):
82
                query_params = user.copy()
83
            elif not user or not user.is_authenticated:
84
                if without_user:
85
                    query_params = {}
86
                else:
87
                    query_params = {'NameID': '', 'email': ''}
88
            else:
89
                query_params = {}
90
                if federation_key == 'nameid':
91
                    query_params['NameID'] = user.get_name_id()
92
                elif federation_key == 'email':
93
                    query_params['email'] = user.email
94
                else:  # 'auto'
95
                    user_name_id = user.get_name_id()
96
                    if user_name_id:
97
                        query_params['NameID'] = user_name_id
98
                    else:
99
                        query_params['email'] = user.email
100

  
101
            if remote_service.get('orig'):
102
                query_params['orig'] = remote_service.get('orig')
103

  
104
            remote_service_base_url = remote_service.get('url')
105
            scheme, netloc, dummy, params, old_query, fragment = urllib.parse.urlparse(
106
                remote_service_base_url
107
            )
108

  
109
            query = urlencode(query_params)
110
            if '?' in url:
111
                path, old_query = url.split('?', 1)
112
                query += '&' + old_query
113
            else:
114
                path = url
115

  
116
            url = urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
117

  
118
        if method == 'GET' and cache_duration:
119
            # handle cache
120
            params = urlencode(kwargs.get('params', {}))
121
            cache_key = hashlib.md5(smart_bytes(url + params)).hexdigest()
122
            cache_content = cache.get(cache_key)
123
            if cache_content and not invalidate_cache:
124
                response = Response()
125
                response.status_code = 200
126
                response.raw = BytesIO(smart_bytes(cache_content))
127
                return response
128
            elif raise_if_not_cached:
129
                raise NothingInCacheException()
130

  
131
        if remote_service:  # sign
132
            kwargs['auth'] = PublikSignature(remote_service.get('secret'))
133

  
134
        kwargs['timeout'] = kwargs.get('timeout') or settings.REQUESTS_TIMEOUT
135

  
136
        response = super().request(method, url, **kwargs)
137
        if log_errors and (response.status_code // 100 != 2):
138
            extra = {}
139
            if django_request:
140
                extra['request'] = django_request
141
            if log_errors == 'warn':
142
                logging.warning(
143
                    'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
144
                )
145
            else:
146
                logging.error(
147
                    'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
148
                )
149
        if method == 'GET' and cache_duration and (response.status_code // 100 == 2):
150
            cache.set(cache_key, response.content, cache_duration)
151

  
152
        return response
153

  
154

  
155
requests = Requests()
lingo/utils/signature.py
1
# lingo - payment and billing system
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
import datetime
19
import hashlib
20
import hmac
21
import random
22
import urllib.parse
23

  
24
from django.utils.encoding import smart_bytes
25
from django.utils.http import quote, urlencode
26

  
27

  
28
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
29
    parsed = urllib.parse.urlparse(url)
30
    new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
31
    return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
32

  
33

  
34
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
35
    if timestamp is None:
36
        timestamp = datetime.datetime.utcnow()
37
    timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
38
    if nonce is None:
39
        nonce = hex(random.getrandbits(128))[2:]
40
    new_query = query
41
    if new_query:
42
        new_query += '&'
43
    new_query += urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
44
    signature = base64.b64encode(sign_string(new_query, key, algo=algo))
45
    new_query += '&signature=' + quote(signature)
46
    return new_query
47

  
48

  
49
def sign_string(s, key, algo='sha256', timedelta=30):
50
    digestmod = getattr(hashlib, algo)
51
    hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s))
52
    return hash.digest()
tests/agendas/test_chrono.py
1
import json
2
from unittest import mock
3

  
4
import pytest
5
from requests.exceptions import ConnectionError
6
from requests.models import Response
7

  
8
from lingo.agendas.chrono import collect_agenda_data, refresh_agendas
9
from lingo.agendas.models import Agenda
10

  
11
pytestmark = pytest.mark.django_db
12

  
13
AGENDA_DATA = [
14
    {
15
        "slug": "events-a",
16
        "kind": "events",
17
        "text": "Events A",
18
        "category": None,
19
        "category_label": None,
20
    },
21
    {
22
        "slug": "events-b",
23
        "kind": "events",
24
        "text": "Events B",
25
        "category": "foo",
26
        "category_label": "Foo",
27
    },
28
    {
29
        "slug": "meetings-a",
30
        "kind": "meetings",
31
        "text": "Meetings A",
32
        "category": None,
33
        "category_label": None,
34
    },
35
    {
36
        "slug": "virtual-b",
37
        "kind": "virtual",
38
        "text": "Virtual B",
39
        "category": "foo",
40
        "category_label": "Foo",
41
    },
42
]
43

  
44

  
45
class MockedRequestResponse(mock.Mock):
46
    status_code = 200
47

  
48
    def json(self):
49
        return json.loads(self.content)
50

  
51

  
52
def test_collect_agenda_data_no_service(settings):
53
    settings.KNOWN_SERVICES = {}
54
    assert collect_agenda_data() is None
55

  
56
    settings.KNOWN_SERVICES = {'other': []}
57
    assert collect_agenda_data() is None
58

  
59

  
60
def test_collect_agenda_data():
61
    with mock.patch('requests.Session.get') as requests_get:
62
        requests_get.side_effect = ConnectionError()
63
        assert collect_agenda_data() is None
64

  
65
    with mock.patch('requests.Session.get') as requests_get:
66
        mock_resp = Response()
67
        mock_resp.status_code = 500
68
        requests_get.return_value = mock_resp
69
        assert collect_agenda_data() is None
70

  
71
    with mock.patch('requests.Session.get') as requests_get:
72
        mock_resp = Response()
73
        mock_resp.status_code = 404
74
        requests_get.return_value = mock_resp
75
        assert collect_agenda_data() is None
76

  
77
    with mock.patch('requests.Session.get') as requests_get:
78
        requests_get.return_value = MockedRequestResponse(content=json.dumps({'foo': 'bar'}))
79
        assert collect_agenda_data() is None
80

  
81
    data = {'data': []}
82
    with mock.patch('requests.Session.get') as requests_get:
83
        requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
84
        assert collect_agenda_data() == []
85
        assert requests_get.call_args_list[0][0] == ('api/agenda/',)
86
        assert requests_get.call_args_list[0][1]['remote_service']['url'] == 'http://chrono.example.org'
87

  
88
    data = {'data': AGENDA_DATA}
89
    with mock.patch('requests.Session.get') as requests_get:
90
        requests_get.return_value = MockedRequestResponse(content=json.dumps(data))
91
        assert collect_agenda_data() == [
92
            {'category_label': None, 'category_slug': None, 'label': 'Events A', 'slug': 'events-a'},
93
            {'category_label': 'Foo', 'category_slug': 'foo', 'label': 'Events B', 'slug': 'events-b'},
94
        ]
95

  
96

  
97
@mock.patch('lingo.agendas.chrono.collect_agenda_data')
98
def test_refresh_agendas(mock_collect):
99
    Agenda.objects.create(label='foo')
100

  
101
    # error during collect
102
    mock_collect.return_value = None
103
    refresh_agendas()
104
    assert Agenda.objects.count() == 1  # no changes
105

  
106
    # 2 agendas found
107
    mock_collect.return_value = [
108
        {'category_label': None, 'category_slug': None, 'label': 'Events A', 'slug': 'events-a'},
109
        {'category_label': 'Foo', 'category_slug': 'foo', 'label': 'Events B', 'slug': 'events-b'},
110
    ]
111

  
112
    # agendas don't exist, create them
113
    refresh_agendas()
114
    assert Agenda.objects.count() == 2
115
    agenda1 = Agenda.objects.all().order_by('pk')[0]
116
    agenda2 = Agenda.objects.all().order_by('pk')[1]
117
    assert agenda1.label == "Events A"
118
    assert agenda1.slug == "events-a"
119
    assert agenda1.category_label is None
120
    assert agenda1.category_slug is None
121
    assert agenda2.label == "Events B"
122
    assert agenda2.slug == "events-b"
123
    assert agenda2.category_label == 'Foo'
124
    assert agenda2.category_slug == 'foo'
125

  
126
    # again, but some attributes are wrong
127
    agenda1.label = "Wrong"
128
    agenda1.category_label = 'Foo'
129
    agenda1.category_slug = 'foo'
130
    agenda1.save()
131
    agenda2.label = "Wrong"
132
    agenda2.category_label is None
133
    agenda2.category_slug is None
134
    agenda2.save()
135
    refresh_agendas()
136
    assert Agenda.objects.count() == 2
137
    new_agenda1 = Agenda.objects.all().order_by('pk')[0]
138
    new_agenda2 = Agenda.objects.all().order_by('pk')[1]
139
    assert new_agenda1.pk == agenda1.pk
140
    assert new_agenda1.label == "Events A"
141
    assert new_agenda1.slug == "events-a"
142
    assert new_agenda1.category_label is None
143
    assert new_agenda1.category_slug is None
144
    assert new_agenda2.pk == agenda2.pk
145
    assert new_agenda2.label == "Events B"
146
    assert new_agenda2.slug == "events-b"
147
    assert new_agenda2.category_label == 'Foo'
148
    assert new_agenda2.category_slug == 'foo'
149

  
150
    # no agenda in chrono
151
    mock_collect.return_value = []
152
    refresh_agendas()
153
    assert Agenda.objects.count() == 0
tests/pricing/test_import_export.py
51 51

  
52 52
    old_stdin = sys.stdin
53 53
    sys.stdin = StringIO(json.dumps({}))
54
    agenda = Agenda.objects.create(label='Foo Bar')
55 54
    pricing = Pricing.objects.create(label='Foo')
56 55
    AgendaPricing.objects.create(
57 56
        agenda=agenda,
tests/pricing/test_manager.py
1 1
import copy
2 2
import datetime
3 3
import json
4
from unittest import mock
4 5

  
5 6
import pytest
6 7
from webtest import Upload
......
12 13
pytestmark = pytest.mark.django_db
13 14

  
14 15

  
15
def test_export_site(settings, freezer, app, admin_user):
16
def test_export_site(freezer, app, admin_user):
16 17
    freezer.move_to('2020-06-15')
17 18
    login(app)
18 19
    resp = app.get('/manage/pricing/')
......
48 49
    assert 'pricing_models' not in site_json
49 50

  
50 51

  
51
def test_add_pricing(settings, app, admin_user):
52
def test_add_pricing(app, admin_user):
52 53
    app = login(app)
53 54
    resp = app.get('/manage/')
54 55
    resp = resp.click('Pricing')
......
425 426
    ]
426 427

  
427 428

  
428
def test_add_category(settings, app, admin_user):
429
def test_add_category(app, admin_user):
429 430
    app = login(app)
430 431
    resp = app.get('/manage/')
431 432
    resp = resp.click('Pricing')
......
685 686
    assert Criteria.objects.count() == 6
686 687

  
687 688

  
688
@pytest.mark.xfail(reason='agenda views not yet implemented')
689
def test_add_agenda_pricing(settings, app, admin_user):
689
@mock.patch('lingo.pricing.views.refresh_agendas')
690
def test_refresh_agendas(mock_refresh, app, admin_user):
691
    app = login(app)
692
    resp = app.get('/manage/pricing/agendas/')
693
    resp = resp.click('Refresh agendas')
694
    assert resp.location.endswith('/manage/pricing/agendas/')
695
    resp = resp.follow()
696
    assert "Agendas refreshed." in resp
697
    assert mock_refresh.call_args_list == [mock.call()]
698

  
699

  
700
def test_add_agenda_pricing(app, admin_user):
690 701
    agenda = Agenda.objects.create(label='Foo Bar')
691 702
    pricing = Pricing.objects.create(label='Model')
692 703

  
693 704
    app = login(app)
694
    resp = app.get('/manage/agendas/%s/settings' % agenda.pk)
705
    resp = app.get('/manage/pricing/agendas/')
706
    resp = resp.click(href='/manage/pricing/agenda/%s/' % agenda.pk)
695 707
    resp = resp.click('New pricing')
696 708
    resp.form['pricing'] = pricing.pk
697 709
    resp.form['date_start'] = '2021-09-01'
......
799 811
    resp = app.get('/manage/pricing/agenda/%s/pricing/%s/' % (agenda.pk, agenda_pricing.pk))
800 812
    resp = resp.click(href='/manage/pricing/agenda/%s/pricing/%s/delete/' % (agenda.pk, agenda_pricing.pk))
801 813
    resp = resp.form.submit()
802
    # XXX: assert resp.location.endswith('/manage/agendas/%s/settings' % agenda.pk)
814
    assert resp.location.endswith('/manage/pricing/agenda/%s/' % agenda.pk)
803 815
    assert AgendaPricing.objects.filter(pk=agenda_pricing.pk).exists() is False
804 816

  
805 817

  
806
@pytest.mark.xfail(reason='agenda settings view not yet implemented')
807 818
def test_detail_agenda_pricing(app, admin_user):
808 819
    agenda = Agenda.objects.create(label='Foo Bar')
809 820
    pricing = Pricing.objects.create(label='Model')
......
822 833
    )
823 834

  
824 835
    app = login(app)
825
    resp = app.get('/manage/agendas/%s/settings' % agenda.pk)
836
    resp = app.get('/manage/pricing/agenda/%s/' % agenda.pk)
826 837
    resp = resp.click(href='/manage/pricing/agenda/%s/pricing/%s/' % (agenda.pk, agenda_pricing.pk))
827 838

  
828 839
    app.get('/manage/pricing/agenda/%s/pricing/%s/' % (agenda.pk, agenda_pricing2.pk), status=404)
......
1377 1388
        )
1378 1389

  
1379 1390

  
1380
@pytest.mark.xfail(reason='agenda views not yet implemented')
1381 1391
def test_edit_agenda_pricing_matrix_empty(app, admin_user):
1382 1392
    agenda = Agenda.objects.create(label='Foo bar')
1383 1393
    pricing = Pricing.objects.create(label='Foo bar')
tests/settings.py
18 18
        },
19 19
    }
20 20
}
21

  
22
KNOWN_SERVICES = {
23
    'chrono': {
24
        'default': {
25
            'title': 'test',
26
            'url': 'http://chrono.example.org',
27
            'secret': 'lingo',
28
            'orig': 'lingo',
29
            'backoffice-menu-url': 'http://chrono.example.org/manage/',
30
            'secondary': False,
31
        },
32
        'other': {
33
            'title': 'other',
34
            'url': 'http://other.chrono.example.org',
35
            'secret': 'lingo',
36
            'orig': 'lingo',
37
            'backoffice-menu-url': 'http://other.chrono.example.org/manage/',
38
            'secondary': True,
39
        },
40
    },
41
}
21
-