0001-misc-fix-pep8-errors.patch
corbo/api_urls.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from django.conf.urls import patterns, include, url
|
|
17 |
from django.conf.urls import patterns, url |
|
18 | 18 | |
19 | 19 |
from .api_views import NewslettersView, SubscriptionsView |
20 | 20 |
corbo/api_views.py | ||
---|---|---|
43 | 43 | |
44 | 44 |
def get_subscriptions(self, email, uuid=None): |
45 | 45 |
subscriptions = defaultdict(dict) |
46 |
identifier = 'mailto:'+email
|
|
47 |
for s in Subscription.objects.filter(Q(identifier=identifier)|Q(uuid=uuid)):
|
|
46 |
identifier = 'mailto:' + email
|
|
47 |
for s in Subscription.objects.filter(Q(identifier=identifier) | Q(uuid=uuid)):
|
|
48 | 48 |
cat_id = s.category.pk |
49 | 49 |
subscriptions[cat_id]['id'] = str(cat_id) |
50 | 50 |
subscriptions[cat_id]['text'] = s.category.name |
corbo/models.py | ||
---|---|---|
63 | 63 |
if feed_response.ok: |
64 | 64 |
content = feedparser.parse(feed_response.content) |
65 | 65 |
for entry in content.get('entries', []): |
66 |
substitutions = [] |
|
67 | 66 |
published = datetime.fromtimestamp(mktime(entry.published_parsed)) |
68 | 67 |
html_tree = etree.HTML(entry['summary']) |
69 | 68 |
storage = DefaultStorage() |
... | ... | |
82 | 81 |
img.attrib['src'] = storage.url(new_image_name) |
83 | 82 | |
84 | 83 |
announce, created = Announce.objects.get_or_create(identifier=entry['id'], |
85 |
category=self) |
|
84 |
category=self)
|
|
86 | 85 |
announce.title = entry['title'] |
87 | 86 |
announce.text = etree.tostring(html_tree) |
88 | 87 |
announce.publication_time = published |
... | ... | |
93 | 92 | |
94 | 93 | |
95 | 94 |
class Announce(models.Model): |
96 |
category = models.ForeignKey('Category', verbose_name=_('category'))
|
|
95 |
category = models.ForeignKey('Category', verbose_name=_('category')) |
|
97 | 96 |
title = models.CharField(_('title'), max_length=256, |
98 | 97 |
help_text=_('maximum 256 characters')) |
99 | 98 |
identifier = models.CharField(max_length=256, null=True, blank=True) |
... | ... | |
106 | 105 |
mtime = models.DateTimeField(_('modification time'), auto_now=True) |
107 | 106 | |
108 | 107 |
def __unicode__(self): |
109 |
return u'{title} ({id}) at {mtime}'.format(title=self.title,
|
|
110 |
id=self.id, mtime=self.mtime)
|
|
108 |
return u'{title} ({id}) at {mtime}'.format( |
|
109 |
title=self.title, id=self.id, mtime=self.mtime)
|
|
111 | 110 | |
112 | 111 |
def is_expired(self): |
113 | 112 |
if self.expiration_time: |
... | ... | |
141 | 140 |
handler = HTML2Text() |
142 | 141 |
template = loader.get_template('corbo/announce.html') |
143 | 142 |
message = Message(subject=self.announce.title, mail_from=settings.CORBO_DEFAULT_FROM_EMAIL, |
144 |
html=template.render(Context({'content': self.announce.text, |
|
145 |
'unsubscribe_link_placeholder': UNSUBSCRIBE_LINK_PLACEHOLDER}))) |
|
143 |
html=template.render( |
|
144 |
Context({'content': self.announce.text, |
|
145 |
'unsubscribe_link_placeholder': UNSUBSCRIBE_LINK_PLACEHOLDER}))) |
|
146 | 146 |
html_tree = etree.HTML(self.announce.text) |
147 | 147 |
storage = DefaultStorage() |
148 | 148 |
for img in html_tree.xpath('//img/@src'): |
... | ... | |
160 | 160 |
continue |
161 | 161 |
unsubscribe_token = signing.dumps({'category': self.announce.category.pk, |
162 | 162 |
'identifier': s.identifier}) |
163 |
unsubscribe_link = urlparse.urljoin(settings.SITE_BASE_URL, reverse('unsubscribe',
|
|
164 |
kwargs={'unsubscription_token': unsubscribe_token}))
|
|
163 |
unsubscribe_link = urlparse.urljoin(settings.SITE_BASE_URL, reverse( |
|
164 |
'unsubscribe', kwargs={'unsubscription_token': unsubscribe_token}))
|
|
165 | 165 |
message.html = message.html.replace(UNSUBSCRIBE_LINK_PLACEHOLDER, unsubscribe_link) |
166 | 166 |
handler.body_width = 0 |
167 | 167 |
message.text = handler.handle(message.html) |
... | ... | |
185 | 185 |
category = models.ForeignKey('Category', verbose_name=_('Category')) |
186 | 186 |
uuid = models.CharField(_('User identifier'), max_length=128, blank=True) |
187 | 187 |
identifier = models.CharField(_('identifier'), max_length=128, blank=True, |
188 |
help_text=_('ex.: mailto, ...')) |
|
188 |
help_text=_('ex.: mailto, ...'))
|
|
189 | 189 | |
190 | 190 |
def get_identifier_display(self): |
191 | 191 |
try: |
corbo/views.py | ||
---|---|---|
6 | 6 |
from django.core import signing |
7 | 7 |
from django.core.urlresolvers import reverse |
8 | 8 |
from django.views.generic import CreateView, UpdateView, DeleteView, \ |
9 |
ListView, TemplateView, RedirectView, \ |
|
10 |
DetailView |
|
9 |
ListView, TemplateView, RedirectView, DetailView |
|
11 | 10 |
from django.contrib.syndication.views import Feed |
12 | 11 |
from django.shortcuts import resolve_url |
13 | 12 |
from django.utils.encoding import force_text |
14 | 13 |
from django.utils.feedgenerator import Atom1Feed as DjangoAtom1Feed |
15 |
from django.utils.http import urlencode |
|
16 | 14 |
from django.http import HttpResponseRedirect, HttpResponse, Http404 |
17 | 15 |
from django.contrib.auth import logout as auth_logout |
18 | 16 |
from django.contrib.auth import views as auth_views |
... | ... | |
26 | 24 |
except ImportError: |
27 | 25 |
get_idps = lambda: [] |
28 | 26 | |
27 | ||
29 | 28 |
def login(request, *args, **kwargs): |
30 | 29 |
if any(get_idps()): |
31 |
if not 'next' in request.GET:
|
|
30 |
if 'next' not in request.GET:
|
|
32 | 31 |
return HttpResponseRedirect(resolve_url('mellon_login')) |
33 |
return HttpResponseRedirect(resolve_url('mellon_login') + '?next=' |
|
34 |
+ urllib.quote(request.GET.get('next')))
|
|
32 |
return HttpResponseRedirect(resolve_url('mellon_login') + '?next=' +
|
|
33 |
urllib.quote(request.GET.get('next'))) |
|
35 | 34 |
return auth_views.login(request, *args, **kwargs) |
36 | 35 | |
36 | ||
37 | 37 |
def logout(request, next_page=None): |
38 | 38 |
if any(get_idps()): |
39 | 39 |
return HttpResponseRedirect(resolve_url('mellon_logout')) |
... | ... | |
48 | 48 |
class HomepageView(RedirectView): |
49 | 49 |
pattern_name = 'manage' |
50 | 50 | |
51 | ||
51 | 52 |
homepage = HomepageView.as_view() |
52 | 53 | |
53 | 54 | |
... | ... | |
69 | 70 |
context['category'] = kwargs['form'].initial['category'] |
70 | 71 |
return context |
71 | 72 | |
73 | ||
72 | 74 |
add_announce = AnnounceCreateView.as_view() |
73 | 75 | |
74 | 76 | |
... | ... | |
85 | 87 |
def get_success_url(self): |
86 | 88 |
return reverse('view_category', kwargs={'pk': self.object.category.pk}) |
87 | 89 | |
90 | ||
88 | 91 |
edit_announce = AnnounceEditView.as_view() |
89 | 92 | |
93 | ||
90 | 94 |
class AnnounceDeleteView(DeleteView): |
91 | 95 |
model = models.Announce |
92 | 96 | |
93 | 97 |
def get_success_url(self): |
94 | 98 |
return reverse('view_category', kwargs={'pk': self.object.category.pk}) |
95 | 99 | |
100 | ||
96 | 101 |
delete_announce = AnnounceDeleteView.as_view() |
97 | 102 | |
103 | ||
98 | 104 |
class CategoryCreateView(CreateView): |
99 | 105 |
form_class = CategoryForm |
100 | 106 |
template_name = 'corbo/category_form.html' |
... | ... | |
106 | 112 |
form.save() |
107 | 113 |
return super(CategoryCreateView, self).form_valid(form) |
108 | 114 | |
115 | ||
109 | 116 |
add_category = CategoryCreateView.as_view() |
110 | 117 | |
111 | 118 | |
... | ... | |
116 | 123 |
def get_success_url(self): |
117 | 124 |
return reverse('view_category', kwargs={'pk': self.object.pk}) |
118 | 125 | |
126 | ||
119 | 127 |
edit_category = CategoryEditView.as_view() |
120 | 128 | |
121 | 129 | |
... | ... | |
127 | 135 |
context['announces'] = self.object.announce_set.all() |
128 | 136 |
return context |
129 | 137 | |
138 | ||
130 | 139 |
view_category = CategoryView.as_view() |
131 | 140 | |
132 | 141 | |
... | ... | |
136 | 145 |
def get_success_url(self): |
137 | 146 |
return reverse('manage') |
138 | 147 | |
148 | ||
139 | 149 |
delete_category = CategoryDeleteView.as_view() |
140 | 150 | |
141 | 151 | |
... | ... | |
146 | 156 |
data = signing.loads(self.kwargs['unsubscription_token']) |
147 | 157 |
try: |
148 | 158 |
return models.Subscription.objects.get(category__pk=data['category'], |
149 |
identifier=data['identifier']) |
|
159 |
identifier=data['identifier'])
|
|
150 | 160 |
except models.Subscription.DoesNotExist: |
151 | 161 |
raise Http404 |
152 | 162 | |
153 | 163 |
def get_success_url(self): |
154 | 164 |
return reverse('unsubscription_done') |
155 | 165 | |
166 | ||
156 | 167 |
unsubscribe = UnsubscribeView.as_view() |
157 | 168 | |
158 | 169 | |
159 | 170 |
class UnsubscriptionDoneView(TemplateView): |
160 |
template_name='corbo/unsubscription_done.html' |
|
171 |
template_name = 'corbo/unsubscription_done.html' |
|
172 | ||
161 | 173 | |
162 | 174 |
unsubscription_done = UnsubscriptionDoneView.as_view() |
163 | 175 | |
... | ... | |
166 | 178 |
template_name = 'corbo/manage.html' |
167 | 179 |
model = models.Category |
168 | 180 | |
181 | ||
169 | 182 |
manage = ManageView.as_view() |
170 | 183 | |
171 | 184 | |
172 | 185 |
class Atom1Feed(DjangoAtom1Feed): |
186 | ||
173 | 187 |
def root_attributes(self): |
174 | 188 |
attrs = super(Atom1Feed, self).root_attributes() |
175 | 189 |
attrs.update({'xml:base': self.feed['link']}) |
... | ... | |
201 | 215 |
def item_pubdate(self, item): |
202 | 216 |
return item.publication_time or item.mtime |
203 | 217 | |
218 | ||
204 | 219 |
atom = AtomView() |
205 | 220 | |
206 | 221 | |
207 | 222 |
def menu_json(request): |
208 | 223 |
label = _('Announces') |
209 | 224 |
json_str = json.dumps([{'label': force_text(label), |
210 |
'slug': 'announces', |
|
211 |
'url': request.build_absolute_uri(reverse('manage'))
|
|
212 |
}]) |
|
225 |
'slug': 'announces',
|
|
226 |
'url': request.build_absolute_uri(reverse('manage'))}])
|
|
227 | ||
213 | 228 |
for variable in ('jsonpCallback', 'callback'): |
214 | 229 |
if variable in request.GET: |
215 | 230 |
response = HttpResponse(content_type='application/javascript') |
tests/conftest.py | ||
---|---|---|
1 | 1 |
import pytest |
2 | 2 |
import django_webtest |
3 | 3 | |
4 | ||
4 | 5 |
@pytest.fixture |
5 | 6 |
def app(request): |
6 | 7 |
wtm = django_webtest.WebTestMixin() |
tests/test_announces.py | ||
---|---|---|
33 | 33 |
</feed> |
34 | 34 |
""" |
35 | 35 | |
36 | ||
36 | 37 |
def mocked_request_get(*args, **kwargs): |
37 | 38 |
storage = DefaultStorage() |
39 | ||
38 | 40 |
class MockResponse: |
39 | 41 | |
40 | 42 |
def __init__(self, content): |
tests/test_api.py | ||
---|---|---|
1 | 1 |
import pytest |
2 |
import json |
|
3 | 2 |
from uuid import uuid4 |
4 | 3 | |
5 | 4 | |
... | ... | |
23 | 22 |
categories.append(c) |
24 | 23 |
return categories |
25 | 24 | |
25 | ||
26 | 26 |
@pytest.fixture |
27 | 27 |
def announces(): |
28 | 28 |
announces = [] |
... | ... | |
35 | 35 |
announces.append(a) |
36 | 36 |
return announces |
37 | 37 | |
38 | ||
38 | 39 |
@pytest.fixture |
39 | 40 |
def user(): |
40 | 41 |
User = get_user_model() |
41 |
user = User.objects.create(username='john.doe', |
|
42 |
first_name=u'John', last_name=u'Doe', email='john.doe@example.net')
|
|
42 |
user = User.objects.create(username='john.doe', first_name=u'John',
|
|
43 |
last_name=u'Doe', email='john.doe@example.net')
|
|
43 | 44 |
user.set_password('password') |
44 | 45 |
user.save() |
45 | 46 |
return user |
... | ... | |
67 | 68 |
for identifier, name in channel_choices[:1]: |
68 | 69 |
for category in categories: |
69 | 70 |
uri = '%s:%s' % (identifier, foo) |
70 |
subscription = Subscription.objects.create(identifier=uri, |
|
71 |
category=category) |
|
71 |
Subscription.objects.create(identifier=uri, category=category) |
|
72 | 72 |
resp = app.get(reverse('subscriptions'), {'email': foo}) |
73 | 73 |
assert 'data' in resp.json |
74 | 74 |
data = resp.json['data'] |
... | ... | |
92 | 92 |
subscriptions = [{'id': category_id, |
93 | 93 |
'text': category.name, |
94 | 94 |
'transports': transports}] |
95 |
resp = app.post_json(subscriptions_url , subscriptions)
|
|
95 |
resp = app.post_json(subscriptions_url, subscriptions) |
|
96 | 96 |
if resp.json['data']: |
97 | 97 |
resp = app.get(subscriptions_url) |
98 |
print resp.json['data'] |
|
98 | ||
99 | 99 |
for cat in resp.json['data']: |
100 | 100 |
if cat['id'] == category_id: |
101 | 101 |
sub_transports = [c['id'] for c in cat['transports']] |
tests/test_emailing.py | ||
---|---|---|
1 |
import urlparse |
|
2 | 1 |
import pytest |
3 |
import json |
|
4 | 2 |
from uuid import uuid4 |
5 | 3 |
import os |
6 | 4 |
import re |
7 | 5 |
import urllib |
8 | 6 | |
9 | 7 |
from django.core.urlresolvers import reverse |
10 |
from django.utils.http import urlencode |
|
11 | 8 |
from django.core import mail, signing |
12 | 9 |
from django.utils import timezone |
13 | 10 |
from django.core.files.storage import DefaultStorage |
14 |
from django.core.urlresolvers import reverse |
|
15 |
from django.conf import settings |
|
16 | 11 | |
17 |
from corbo.models import Category, Announce, Subscription, Broadcast |
|
18 |
from corbo.models import channel_choices, transform_image_src |
|
12 |
from corbo.models import Category, Announce, Subscription, Broadcast, transform_image_src |
|
19 | 13 | |
20 | 14 |
pytestmark = pytest.mark.django_db |
21 | 15 | |
... | ... | |
30 | 24 |
categories.append(c) |
31 | 25 |
return categories |
32 | 26 | |
27 | ||
33 | 28 |
@pytest.fixture |
34 | 29 |
def announces(): |
35 | 30 |
announces = [] |
... | ... | |
46 | 41 |
announces.append(a) |
47 | 42 |
return announces |
48 | 43 | |
44 | ||
49 | 45 |
def test_emailing_with_no_subscriptions(app, categories, announces): |
50 | 46 |
for announce in announces: |
51 | 47 |
broadcast = Broadcast.objects.get(announce=announce) |
... | ... | |
53 | 49 |
assert not broadcast.result |
54 | 50 |
assert not mail.outbox |
55 | 51 | |
52 | ||
56 | 53 |
def test_send_email(app, categories, announces): |
57 | 54 |
for category in categories: |
58 | 55 |
uuid = uuid4() |
59 |
s = Subscription.objects.create(category=category,
|
|
60 |
identifier='%s@example.net' % uuid, uuid=uuid) |
|
56 |
Subscription.objects.create(category=category, |
|
57 |
identifier='%s@example.net' % uuid, uuid=uuid)
|
|
61 | 58 |
for announce in announces: |
62 |
broadcast= Broadcast.objects.get(announce=announce) |
|
59 |
broadcast = Broadcast.objects.get(announce=announce)
|
|
63 | 60 |
broadcast.send() |
64 | 61 |
assert broadcast.result |
65 | 62 |
assert mail.outbox |
66 | 63 | |
64 | ||
67 | 65 |
def test_check_inline_css(app, categories, announces): |
68 | 66 |
for announce in announces: |
69 | 67 |
announce.text = '<style type="text/css">h2 {color: #F00}</style>' + announce.text |
70 | 68 |
announce.save() |
71 | 69 |
uuid = uuid4() |
72 |
s = Subscription.objects.create(category=announce.category,
|
|
73 |
identifier='%s@example.net' % uuid, uuid=uuid) |
|
70 |
Subscription.objects.create(category=announce.category, |
|
71 |
identifier='%s@example.net' % uuid, uuid=uuid)
|
|
74 | 72 |
broadcast = Broadcast.objects.get(announce=announce) |
75 | 73 |
broadcast.send() |
76 | 74 |
assert broadcast.result |
... | ... | |
78 | 76 |
assert 'h2 style="color:#F00"' in mail.outbox[0].html |
79 | 77 |
mail.outbox = [] |
80 | 78 | |
79 | ||
81 | 80 |
def test_check_inline_images(app, categories, announces): |
82 | 81 |
storage = DefaultStorage() |
83 | 82 |
media_path = os.path.join(os.path.dirname(__file__), 'media') |
... | ... | |
88 | 87 |
announce.text = announce.text + '<img src="%s" />' % img_src |
89 | 88 |
announce.save() |
90 | 89 |
uuid = uuid4() |
91 |
s = Subscription.objects.create(category=announce.category,
|
|
92 |
identifier='%s@example.net' % uuid, uuid=uuid) |
|
90 |
Subscription.objects.create(category=announce.category, |
|
91 |
identifier='%s@example.net' % uuid, uuid=uuid)
|
|
93 | 92 |
broadcast = Broadcast.objects.get(announce=announce) |
94 | 93 |
broadcast.send() |
95 | 94 |
assert broadcast.result |
... | ... | |
101 | 100 |
mail.outbox = [] |
102 | 101 |
storage.delete(image_name) |
103 | 102 | |
103 | ||
104 | 104 |
def test_unsubscription_link(app, categories, announces): |
105 | 105 |
unsubscription_link_sentinel = '' |
106 | 106 |
for category in categories: |
107 | 107 |
uuid = uuid4() |
108 | 108 |
scheme = 'mailto:' |
109 | 109 |
uri = scheme + '%s@example.net' % uuid |
110 |
s = Subscription.objects.create(category=category, |
|
111 |
identifier=uri, |
|
112 |
uuid=str(uuid)) |
|
110 |
Subscription.objects.create(category=category, identifier=uri, uuid=str(uuid)) |
|
113 | 111 |
for announce in announces: |
114 | 112 |
if announce.category != category: |
115 | 113 |
continue |
116 |
broadcast= Broadcast.objects.get(announce=announce) |
|
114 |
broadcast = Broadcast.objects.get(announce=announce)
|
|
117 | 115 |
broadcast.send() |
118 | 116 |
assert broadcast.result |
119 | 117 |
assert mail.outbox |
... | ... | |
121 | 119 |
signature = urllib.unquote(re.findall('/unsubscribe/(.*)"', mail.outbox[0].html)[0]) |
122 | 120 |
unsubscription_link = reverse('unsubscribe', kwargs={'unsubscription_token': signature}) |
123 | 121 |
assert signing.loads(signature) == { |
124 |
'category': announce.category.pk, 'identifier': uri}
|
|
122 |
'category': announce.category.pk, 'identifier': uri} |
|
125 | 123 |
assert mail.outbox[0].subject == announce.title |
126 | 124 |
assert unsubscription_link in mail.outbox[0].html |
127 | 125 |
assert unsubscription_link in mail.outbox[0].text |
128 |
- |