Project

General

Profile

Download (4.96 KB) Statistics
| Branch: | Tag: | Revision:
import os
import pytest
import mock
import feedparser
import tempfile

from django.core.files.storage import DefaultStorage
from django.utils import timezone
from django.urls import reverse
from django.conf import settings
from django.test import override_settings

from corbo.models import Category, Announce

pytestmark = pytest.mark.django_db

CATEGORIES = ('Alerts',)

ATOM_FEED = """<?xml version="1.0" encoding="UTF-8"?>
<feed xml:lang="en-US" xmlns="http://www.w3.org/2005/Atom" xmlns:wfw="http://wellformedweb.org/CommentAPI/">
<id>tag:linuxfr.org,2005:/news</id>
<title>Sample RSS Feeds</title>
<updated>2016-09-16T10:29:46+02:00</updated>
<entry>
<id>tag:linuxfr.org,2005:News/37537</id>
<published>2016-09-16T10:29:46+02:00</published>
<updated>2016-09-16T11:27:00+02:00</updated>
<title>Feed entry sample</title>
<content type="html">
&lt;img src="http://example.com/logo.png"&gt;
Feed entry content
&lt;img src="http://site.com/error.png"&gt;
Another logo
&lt;img src="http://example.com/folder/logo.png"&gt;
</content>
<author>
<name>Foo Bar</name>
</author>
</entry>
</feed>
"""


def mocked_request_get(*args, **kwargs):

class MockResponse:

def __init__(self, content, ok=True):
self.ok = ok
self.content = content

if args[0] == 'http://example.com/atom':
return MockResponse(ATOM_FEED)
elif args[0] == 'http://example.com/logo.png':
logo_path = os.path.join(os.path.dirname(__file__), 'media', 'logo.png')
return MockResponse(open(logo_path, 'rb').read())
elif args[0] == 'http://example.com/folder/logo.png':
logo_path = os.path.join(os.path.dirname(__file__), 'media', 'another_logo.png')
return MockResponse(open(logo_path, 'rb').read())
else:
return MockResponse(None, ok=False)


@mock.patch('corbo.models.requests.get', side_effect=mocked_request_get)
def test_announces_from_feed(mocked_get):
tmpdirname = tempfile.mkdtemp(suffix='_corbo')
with override_settings(MEDIA_ROOT=tmpdirname):
storage = DefaultStorage()
feed_content = feedparser.parse(ATOM_FEED)
for category in CATEGORIES:
c = Category.objects.create(name=category, rss_feed_url='http://example.com/atom')
assert c.announce_set.count() == len(feed_content['entries'])
for announce in c.announce_set.all():
assert announce.title in [feed['title'] for feed in feed_content['entries']]
assert storage.url(os.path.join('images', str(announce.id), '01_logo.png')) in announce.text
assert os.path.exists(storage.path(os.path.join('images', str(announce.id), '01_logo.png')))
assert storage.url(os.path.join('images', str(announce.id), '02_logo.png')) in announce.text
assert os.path.exists(storage.path(os.path.join('images', str(announce.id), '02_logo.png')))
assert storage.url('error.png') not in announce.text
assert 'http://site.com/error.png' in announce.text
# cleanup uploaded images
os.unlink(storage.path(os.path.join('images', str(announce.id), '01_logo.png')))
os.unlink(storage.path(os.path.join('images', str(announce.id), '02_logo.png')))
os.rmdir(storage.path(os.path.join('images', str(announce.id))))
os.rmdir(storage.path('images'))
os.rmdir(tmpdirname)


def test_announces_publishing(app):
c = Category.objects.create(name='Test announces')
a = Announce.objects.create(category=c, title='Test 1',
text='text')
feed_content = feedparser.parse(app.get(reverse('atom')).text)
assert len(feed_content['entries']) == 0
a.publication_time = timezone.now()
a.save()
feed_content = feedparser.parse(app.get(reverse('atom')).text)
assert len(feed_content['entries']) == 1
a.publication_time = timezone.now() + timezone.timedelta(days=1)
a.save()
feed_content = feedparser.parse(app.get(reverse('atom')).text)
assert len(feed_content['entries']) == 0
a.publication_time = timezone.now()
a.expiration_time = timezone.now()
a.save()
feed_content = feedparser.parse(app.get(reverse('atom')).text)
assert len(feed_content['entries']) == 0

def test_rss_feed_items(app):
c = Category.objects.create(name='Test announces')
for i in range(10):
a = Announce.objects.create(category=c, title='Test %s' % i,
publication_time=timezone.now(),
text='text of announce %s' % i)
feed_content = feedparser.parse(app.get(reverse('atom')).text)
assert len(feed_content['entries']) <= settings.RSS_ITEMS_LIMIT
for i in range(i, 10):
a = Announce.objects.create(category=c, title='Test %s' % i,
publication_time=timezone.now(),
text='text of announce %s' % i)
assert len(feed_content['entries']) <= settings.RSS_ITEMS_LIMIT
(3-3/10)