0001-create-announces-from-external-RSS-feed-12919.patch
corbo/forms.py | ||
---|---|---|
8 | 8 | |
9 | 9 |
class Meta: |
10 | 10 |
model = Announce |
11 |
fields = '__all__'
|
|
11 |
exclude = ('identifier',)
|
|
12 | 12 |
widgets = { |
13 | 13 |
'publication_time': forms.TextInput(attrs={'class': 'datetimepicker', |
14 | 14 |
'readonly': True}), |
... | ... | |
26 | 26 | |
27 | 27 |
class CategoryForm(forms.ModelForm): |
28 | 28 |
class Meta: |
29 |
fields = ('name', ) |
|
29 |
fields = ('name', 'rss_feed_url')
|
|
30 | 30 |
model = Category |
corbo/migrations/0007_auto_20160928_1454.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
from __future__ import unicode_literals |
|
3 | ||
4 |
from django.db import migrations, models |
|
5 | ||
6 | ||
7 |
class Migration(migrations.Migration): |
|
8 | ||
9 |
dependencies = [ |
|
10 |
('corbo', '0006_auto_20160928_0833'), |
|
11 |
] |
|
12 | ||
13 |
operations = [ |
|
14 |
migrations.AddField( |
|
15 |
model_name='announce', |
|
16 |
name='identifier', |
|
17 |
field=models.CharField(max_length=256, null=True, blank=True), |
|
18 |
), |
|
19 |
migrations.AddField( |
|
20 |
model_name='category', |
|
21 |
name='rss_feed_url', |
|
22 |
field=models.URLField(help_text='if defined, announces will be automatically created from rss items', null=True, verbose_name='Feed URL', blank=True), |
|
23 |
), |
|
24 |
] |
corbo/models.py | ||
---|---|---|
1 | 1 |
import os |
2 |
import io |
|
2 | 3 |
import hashlib |
4 |
from time import mktime |
|
3 | 5 |
from datetime import datetime |
4 | 6 |
import logging |
5 | 7 |
import urlparse |
6 | 8 |
from html2text import HTML2Text |
7 | 9 |
from emails.django import Message |
8 | 10 |
from lxml.etree import HTML as HTMLTree |
11 |
import requests |
|
12 |
import feedparser |
|
9 | 13 | |
10 | 14 |
from django.utils import timezone |
11 | 15 |
from django.conf import settings |
... | ... | |
39 | 43 | |
40 | 44 |
class Category(models.Model): |
41 | 45 |
name = models.CharField(_('Name'), max_length=64, blank=False, null=False) |
46 |
rss_feed_url = models.URLField(_('Feed URL'), blank=True, null=True, |
|
47 |
help_text=_('if defined, announces will be automatically created from rss items')) |
|
42 | 48 |
ctime = models.DateTimeField(auto_now_add=True) |
43 | 49 | |
44 | 50 |
def __unicode__(self): |
... | ... | |
50 | 56 |
def get_subscriptions_count(self): |
51 | 57 |
return self.subscription_set.all().count() |
52 | 58 | |
59 |
def save(self, *args, **kwargs): |
|
60 |
super(Category, self).save(*args, **kwargs) |
|
61 |
if not self.rss_feed_url: |
|
62 |
return |
|
63 |
feed_response = requests.get(self.rss_feed_url) |
|
64 |
if feed_response.ok: |
|
65 |
content = feedparser.parse(feed_response.content) |
|
66 |
for entry in content.get('entries', []): |
|
67 |
substitutions = [] |
|
68 |
published = datetime.fromtimestamp(mktime(entry.published_parsed)) |
|
69 |
html_tree = HTMLTree(entry['summary']) |
|
70 |
storage = DefaultStorage() |
|
71 |
for img in html_tree.xpath('//img/@src'): |
|
72 |
image_name = os.path.basename(img) |
|
73 |
r = requests.get(img) |
|
74 |
new_content = r.content |
|
75 |
if storage.exists(image_name): |
|
76 |
old_content = storage.open(image_name).read() |
|
77 |
old_hash = hashlib.md5(old_content).hexdigest() |
|
78 |
new_hash = hashlib.md5(new_content).hexdigest() |
|
79 |
substitutions.append((img, storage.url(image_name))) |
|
80 |
if new_hash == old_hash: |
|
81 |
continue |
|
82 |
new_image_name = storage.save(image_name, io.BytesIO(new_content)) |
|
83 |
substitutions.append((img, storage.url(new_image_name))) |
|
84 | ||
85 |
text = entry['summary'] |
|
86 |
for old, new in substitutions: |
|
87 |
text = text.replace(old, new) |
|
88 | ||
89 |
announce, created = Announce.objects.get_or_create(identifier=entry['id'], |
|
90 |
category=self) |
|
91 |
announce.title = entry['title'] |
|
92 |
announce.text = text |
|
93 |
announce.publication_time = published |
|
94 |
announce.save() |
|
95 | ||
96 |
if created: |
|
97 |
Broadcast.objects.get_or_create(announce=announce) |
|
98 | ||
53 | 99 | |
54 | 100 |
class Announce(models.Model): |
55 | 101 |
category = models.ForeignKey('Category', verbose_name=_('category')) |
56 | 102 |
title = models.CharField(_('title'), max_length=256, |
57 | 103 |
help_text=_('maximum 256 characters')) |
104 |
identifier = models.CharField(max_length=256, null=True, blank=True) |
|
58 | 105 |
text = RichTextField(_('Content')) |
59 | 106 |
publication_time = models.DateTimeField(_('publication time'), blank=True, |
60 | 107 |
null=True) |
corbo/templates/corbo/category_detail.html | ||
---|---|---|
11 | 11 |
{% block appbar %} |
12 | 12 |
<h2>{{ object.name }}</h2> |
13 | 13 |
<a href="{% url 'delete_category' object.id %}" rel="popup">{% trans 'Delete' %}</a> |
14 |
<a href="{% url 'edit_category' object.id %}" rel="popup">{% trans 'Rename' %}</a>
|
|
14 |
<a href="{% url 'edit_category' object.id %}" rel="popup">{% trans 'Edit' %}</a>
|
|
15 | 15 |
<a href="{% url 'add_announce' pk=object.pk %}">{% trans 'New announce' %}</a> |
16 | 16 |
{% endblock %} |
17 | 17 |
corbo/templates/corbo/category_form.html | ||
---|---|---|
3 | 3 | |
4 | 4 |
{% block appbar %} |
5 | 5 |
{% if object %} |
6 |
<h2>{% trans "Modify Category" %}</h2>
|
|
6 |
<h2>{% trans "Edit Category" %}</h2>
|
|
7 | 7 |
{% else %} |
8 | 8 |
<h2>{% trans "New Category" %}</h2> |
9 | 9 |
{% endif %} |
debian/control | ||
---|---|---|
12 | 12 |
python-django (>= 1.7), |
13 | 13 |
python-django-ckeditor, |
14 | 14 |
python-gadjo, |
15 |
python-emails |
|
15 |
python-emails, |
|
16 |
python-requests, |
|
17 |
python-feedparser |
|
16 | 18 |
Description: Announces Manager |
17 | 19 | |
18 | 20 |
Package: corbo |
debian/corbo.cron.d | ||
---|---|---|
1 | 1 |
PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin |
2 | 2 | |
3 | 3 |
*/5 * * * * corbo corbo-manage tenant_command send_announces --all-tenants |
4 |
0 * * * * corbo corbo-manage tenant_command sync_external_feeds --all-tenants |
requirements.txt | ||
---|---|---|
3 | 3 |
djangorestframework>=3.3,<3.4 |
4 | 4 |
html2text |
5 | 5 |
emails |
6 |
feedparser |
|
7 |
requests |
|
6 | 8 |
-e git+http://repos.entrouvert.org/gadjo.git/#egg=gadjo |
setup.py | ||
---|---|---|
100 | 100 |
'gadjo', |
101 | 101 |
'emails', |
102 | 102 |
'lxml', |
103 |
'feedparser', |
|
104 |
'requests' |
|
103 | 105 |
], |
104 | 106 |
zip_safe=False, |
105 | 107 |
cmdclass={ |
tests/test_announces.py | ||
---|---|---|
1 |
import os |
|
2 |
import pytest |
|
3 |
import mock |
|
4 |
import feedparser |
|
5 | ||
6 |
from django.core.files.storage import DefaultStorage |
|
7 | ||
8 |
from corbo.models import Category |
|
9 | ||
10 |
pytestmark = pytest.mark.django_db |
|
11 | ||
12 |
CATEGORIES = ('Alerts',) |
|
13 | ||
14 |
ATOM_FEED = """ |
|
15 |
<?xml version="1.0" encoding="UTF-8"?> |
|
16 |
<feed xml:lang="en-US" xmlns="http://www.w3.org/2005/Atom" xmlns:wfw="http://wellformedweb.org/CommentAPI/"> |
|
17 |
<id>tag:linuxfr.org,2005:/news</id> |
|
18 |
<title>Sample RSS Feeds</title> |
|
19 |
<updated>2016-09-16T10:29:46+02:00</updated> |
|
20 |
<entry> |
|
21 |
<id>tag:linuxfr.org,2005:News/37537</id> |
|
22 |
<published>2016-09-16T10:29:46+02:00</published> |
|
23 |
<updated>2016-09-16T11:27:00+02:00</updated> |
|
24 |
<title>Feed entry sample</title> |
|
25 |
<content type="html"> |
|
26 |
<img src="http://example.com/logo.png"> |
|
27 |
Feed entry content |
|
28 |
</content> |
|
29 |
<author> |
|
30 |
<name>Foo Bar</name> |
|
31 |
</author> |
|
32 |
</entry> |
|
33 |
</feed> |
|
34 |
""" |
|
35 | ||
36 |
def mocked_request_get(*args, **kwargs): |
|
37 |
storage = DefaultStorage() |
|
38 |
class MockResponse: |
|
39 | ||
40 |
def __init__(self, content): |
|
41 |
self.ok = True |
|
42 |
self.content = content |
|
43 | ||
44 |
if args[0] == 'http://example.com/atom': |
|
45 |
return MockResponse(ATOM_FEED) |
|
46 |
else: |
|
47 |
logo_path = os.path.join(os.path.dirname(__file__), 'media', 'logo.png') |
|
48 |
print "RETURNING: %s" % logo_path |
|
49 |
return MockResponse(file(logo_path).read()) |
|
50 | ||
51 | ||
52 |
@mock.patch('corbo.models.requests.get', side_effect=mocked_request_get) |
|
53 |
def test_announces_from_feed(mocked_get): |
|
54 |
storage = DefaultStorage() |
|
55 |
feed_content = feedparser.parse(ATOM_FEED) |
|
56 |
for category in CATEGORIES: |
|
57 |
c = Category.objects.create(name=category, rss_feed_url='http://example.com/atom') |
|
58 |
assert c.announce_set.count() == len(feed_content['entries']) |
|
59 |
for announce in c.announce_set.all(): |
|
60 |
assert announce.title in [feed['title'] for feed in feed_content['entries']] |
|
61 |
assert storage.url('logo.png') in announce.text |
|
62 |
# cleanup uploaded images |
|
63 |
os.unlink(storage.path('logo.png')) |
tox.ini | ||
---|---|---|
19 | 19 |
djangorestframework>=3.3,<3.4 |
20 | 20 |
pylint==1.4.0 |
21 | 21 |
astroid==1.3.2 |
22 |
mock |
|
22 | 23 |
commands = |
23 | 24 |
py.test {env:COVERAGE:} {posargs:tests/} |
24 |
- |