Révision aa90f191
Ajouté par Serghei Mihai (congés, retour 15/05) il y a environ 7 ans
corbo/forms.py | ||
---|---|---|
1 |
import csv |
|
2 |
|
|
1 | 3 |
from django import forms |
2 | 4 |
from django.utils.translation import ugettext_lazy as _ |
3 | 5 |
from django.utils.text import slugify |
4 | 6 |
from django.core.exceptions import ObjectDoesNotExist |
7 |
from django.core import validators |
|
8 |
from django.core.exceptions import ValidationError |
|
5 | 9 |
|
6 | 10 |
from .models import Announce, Category, Broadcast, channel_choices |
7 | 11 |
|
... | ... | |
45 | 49 |
slug = '%s-%s' % (base_slug, i) |
46 | 50 |
self.instance.slug = slug |
47 | 51 |
return super(CategoryForm, self).save(commit=commit) |
52 |
|
|
53 |
|
|
54 |
class SubscriptionsImportForm(forms.Form): |
|
55 |
subscribers = forms.FileField(_('Subscribers'), |
|
56 |
help_text=_('utf-8 encoded, comma separated file with email addresses on first column')) |
|
57 |
|
|
58 |
def clean_subscribers(self, *args, **kwargs): |
|
59 |
subscribers = [] |
|
60 |
reader = csv.reader(self.cleaned_data['subscribers']) |
|
61 |
for idx, row in enumerate(reader, 1): |
|
62 |
if not row or not row[0]: |
|
63 |
continue |
|
64 |
try: |
|
65 |
validators.validate_email(row[0]) |
|
66 |
except ValidationError: |
|
67 |
raise ValidationError(_('Invalid email address at line %d' % idx)) |
|
68 |
subscribers.append(row[0]) |
|
69 |
return subscribers |
corbo/manage_urls.py | ||
---|---|---|
2 | 2 |
|
3 | 3 |
from .views import add_announce, edit_announce, delete_announce, \ |
4 | 4 |
add_category, edit_category, view_category, delete_category, manage, \ |
5 |
menu_json |
|
5 |
subscriptions_import, menu_json
|
|
6 | 6 |
|
7 | 7 |
urlpatterns = patterns('', |
8 | 8 |
url(r'^$', manage, name='manage'), |
... | ... | |
20 | 20 |
name='edit_category'), |
21 | 21 |
url(r'^category/delete/(?P<slug>[\w-]+)$', delete_category, |
22 | 22 |
name='delete_category'), |
23 |
url(r'^category/(?P<slug>[\w-]+)/import-subscriptions/$', subscriptions_import, |
|
24 |
name='subscriptions-import'), |
|
23 | 25 |
url(r'^menu.json$', menu_json), |
24 | 26 |
) |
corbo/templates/corbo/category_detail.html | ||
---|---|---|
10 | 10 |
|
11 | 11 |
{% block appbar %} |
12 | 12 |
<h2>{{ object.name }}</h2> |
13 |
<a class="extra-actions-menu-opener">☰</a> |
|
14 |
<ul class="extra-actions-menu"> |
|
15 |
<li><a href="{% url 'subscriptions-import' slug=category.slug %}" rel="popup">{% trans 'Import subscriptions' %}</a></li> |
|
16 |
</ul> |
|
17 |
|
|
13 | 18 |
<a href="{% url 'delete_category' slug=object.slug %}" rel="popup">{% trans 'Delete' %}</a> |
14 | 19 |
<a href="{% url 'edit_category' slug=object.slug %}" rel="popup">{% trans 'Edit' %}</a> |
15 | 20 |
<a href="{% url 'add_announce' slug=object.slug %}">{% trans 'New announce' %}</a> |
corbo/templates/corbo/subscriptions_import_form.html | ||
---|---|---|
1 |
{% extends "corbo/category_detail.html" %} |
|
2 |
{% load i18n static %} |
|
3 |
|
|
4 |
{% block breadcrumb %} |
|
5 |
{{ block.super }} |
|
6 |
<a href='{% url "view_category" slug=category.slug %}'>{{ category }}</a> |
|
7 |
{% endblock %} |
|
8 |
|
|
9 |
{% block appbar %} |
|
10 |
<h2>{% trans "Import subscriptions" %}</h2> |
|
11 |
{% endblock %} |
|
12 |
|
|
13 |
|
|
14 |
{% block content %} |
|
15 |
<form method="post" enctype="multipart/form-data"> |
|
16 |
{% csrf_token %} |
|
17 |
{{ form.as_p }} |
|
18 |
<div class="buttons"> |
|
19 |
<button>{% trans "Save" %}</button> |
|
20 |
<a href="{% url "view_category" slug=category.slug %}" class="cancel">{% trans "Cancel" %}</a> |
|
21 |
</div> |
|
22 |
</form> |
|
23 |
{% endblock %} |
corbo/views.py | ||
---|---|---|
6 | 6 |
from django.utils import timezone |
7 | 7 |
from django.core.urlresolvers import reverse |
8 | 8 |
from django.views.generic import CreateView, UpdateView, DeleteView, \ |
9 |
ListView, TemplateView, RedirectView, DetailView |
|
9 |
ListView, TemplateView, RedirectView, DetailView, FormView
|
|
10 | 10 |
from django.contrib.syndication.views import Feed |
11 | 11 |
from django.shortcuts import resolve_url |
12 | 12 |
from django.utils.encoding import force_text |
... | ... | |
14 | 14 |
from django.http import HttpResponseRedirect, HttpResponse, Http404 |
15 | 15 |
from django.contrib.auth import logout as auth_logout |
16 | 16 |
from django.contrib.auth import views as auth_views |
17 |
from django.contrib import messages |
|
17 | 18 |
from django.utils.translation import ugettext_lazy as _ |
19 |
from django.utils.translation import ngettext |
|
18 | 20 |
|
19 | 21 |
import models |
20 |
from .forms import AnnounceForm, CategoryForm |
|
22 |
from .forms import AnnounceForm, CategoryForm, SubscriptionsImportForm
|
|
21 | 23 |
|
22 | 24 |
try: |
23 | 25 |
from mellon.utils import get_idps |
... | ... | |
222 | 224 |
atom = AtomView() |
223 | 225 |
|
224 | 226 |
|
227 |
class SubscriptionsImportView(FormView): |
|
228 |
form_class = SubscriptionsImportForm |
|
229 |
template_name = 'corbo/subscriptions_import_form.html' |
|
230 |
|
|
231 |
def get_context_data(self, **kwargs): |
|
232 |
context = super(SubscriptionsImportView, self).get_context_data(**kwargs) |
|
233 |
context['category'] = models.Category.objects.get(slug=self.kwargs['slug']) |
|
234 |
return context |
|
235 |
|
|
236 |
def get_success_url(self): |
|
237 |
category = models.Category.objects.get(slug=self.kwargs['slug']) |
|
238 |
return reverse('view_category', kwargs={'slug': category.slug}) |
|
239 |
|
|
240 |
def form_valid(self, form): |
|
241 |
new = 0 |
|
242 |
c = models.Category.objects.get(slug=self.kwargs['slug']) |
|
243 |
for email in form.cleaned_data['subscribers']: |
|
244 |
obj, created = models.Subscription.objects.get_or_create(category=c, identifier='mailto:%s' % email) |
|
245 |
if created: |
|
246 |
new += 1 |
|
247 |
messages.info(self.request, ngettext('%(new)d subscriber added', '%(new)d subscribers added', new) % {'new': new}) |
|
248 |
return super(SubscriptionsImportView, self).form_valid(form) |
|
249 |
|
|
250 |
subscriptions_import = SubscriptionsImportView.as_view() |
|
251 |
|
|
252 |
|
|
225 | 253 |
def menu_json(request): |
226 | 254 |
label = _('Announces') |
227 | 255 |
json_str = json.dumps([{'label': force_text(label), |
tests/test_subscribers.py | ||
---|---|---|
1 |
import pytest |
|
2 |
from webtest import TestApp, Upload |
|
3 |
|
|
4 |
from django.core.wsgi import get_wsgi_application |
|
5 |
from django.utils.text import slugify |
|
6 |
from django.core.urlresolvers import reverse |
|
7 |
from django.contrib.auth import get_user_model |
|
8 |
|
|
9 |
from corbo.models import Category, Subscription |
|
10 |
|
|
11 |
pytestmark = pytest.mark.django_db |
|
12 |
|
|
13 |
CATEGORIES = (u'Alerts',) |
|
14 |
|
|
15 |
CSV_CONTENT = """foo@example.net, Foo, |
|
16 |
john.doe@example.net, John Doe, |
|
17 |
bar@localhost, Bar, |
|
18 |
""" |
|
19 |
|
|
20 |
|
|
21 |
@pytest.fixture |
|
22 |
def categories(): |
|
23 |
categories = [] |
|
24 |
for category in CATEGORIES: |
|
25 |
c, created = Category.objects.get_or_create(name=category, slug=slugify(category)) |
|
26 |
categories.append(c) |
|
27 |
return categories |
|
28 |
|
|
29 |
@pytest.fixture |
|
30 |
def admin(): |
|
31 |
User = get_user_model() |
|
32 |
admin = User.objects.create_superuser(username='admin', |
|
33 |
password='password', email='admin@example.net') |
|
34 |
return admin |
|
35 |
|
|
36 |
def login(app, username='admin', password='password'): |
|
37 |
login_page = app.get('/login/') |
|
38 |
login_form = login_page.forms[0] |
|
39 |
login_form['username'] = username |
|
40 |
login_form['password'] = password |
|
41 |
resp = login_form.submit() |
|
42 |
assert resp.status_int == 302 |
|
43 |
return app |
|
44 |
|
|
45 |
def test_subscribe_from_csv(admin, categories): |
|
46 |
app = login(TestApp(get_wsgi_application())) |
|
47 |
for c in categories: |
|
48 |
page = app.get(reverse('subscriptions-import', kwargs={'slug': c.slug})) |
|
49 |
form = page.form |
|
50 |
form['subscribers'] = Upload('users.csv', CSV_CONTENT) |
|
51 |
res = form.submit() |
|
52 |
assert res.status_code == 302 |
|
53 |
assert Subscription.objects.filter(category=c).count() == len(CSV_CONTENT.splitlines()) |
|
54 |
|
|
55 |
def test_subscribe_from_csv_with_empty_lines(admin, categories): |
|
56 |
app = login(TestApp(get_wsgi_application())) |
|
57 |
content = CSV_CONTENT + '\n\n\n' |
|
58 |
for c in categories: |
|
59 |
page = app.get(reverse('subscriptions-import', kwargs={'slug': c.slug})) |
|
60 |
form = page.form |
|
61 |
form['subscribers'] = Upload('users.csv', content) |
|
62 |
res = form.submit() |
|
63 |
assert res.status_code == 302 |
|
64 |
assert Subscription.objects.filter(category=c).count() == len(CSV_CONTENT.splitlines()) |
|
65 |
|
|
66 |
def test_subscribe_with_invalid_email(admin, categories): |
|
67 |
app = login(TestApp(get_wsgi_application())) |
|
68 |
content = CSV_CONTENT + '\nwrong, Wrong user,' |
|
69 |
for category in categories: |
|
70 |
page = app.get(reverse('subscriptions-import', kwargs={'slug': category.slug})) |
|
71 |
form = page.form |
|
72 |
form['subscribers'] = Upload('users.csv', content) |
|
73 |
page = form.submit() |
|
74 |
assert page.status_code == 200 |
|
75 |
page.mustcontain('Invalid email address at line') |
Formats disponibles : Unified diff
import subscribers from csv (#14010)