Project

General

Profile

Download (4.96 KB) Statistics
| Branch: | Tag: | Revision:

root / tests / test_announces.py @ e01c978b

1
import os
2
import pytest
3
import mock
4
import feedparser
5
import tempfile
6

    
7
from django.core.files.storage import DefaultStorage
8
from django.utils import timezone
9
from django.urls import reverse
10
from django.conf import settings
11
from django.test import override_settings
12

    
13
from corbo.models import Category, Announce
14

    
15
pytestmark = pytest.mark.django_db
16

    
17
CATEGORIES = ('Alerts',)
18

    
19
ATOM_FEED = """<?xml version="1.0" encoding="UTF-8"?>
20
<feed xml:lang="en-US" xmlns="http://www.w3.org/2005/Atom" xmlns:wfw="http://wellformedweb.org/CommentAPI/">
21
  <id>tag:linuxfr.org,2005:/news</id>
22
  <title>Sample RSS Feeds</title>
23
  <updated>2016-09-16T10:29:46+02:00</updated>
24
  <entry>
25
    <id>tag:linuxfr.org,2005:News/37537</id>
26
    <published>2016-09-16T10:29:46+02:00</published>
27
    <updated>2016-09-16T11:27:00+02:00</updated>
28
    <title>Feed entry sample</title>
29
    <content type="html">
30
      &lt;img src="http://example.com/logo.png"&gt;
31
      Feed entry content
32
      &lt;img src="http://site.com/error.png"&gt;
33
      Another logo
34
      &lt;img src="http://example.com/folder/logo.png"&gt;
35
    </content>
36
    <author>
37
      <name>Foo Bar</name>
38
    </author>
39
  </entry>
40
</feed>
41
"""
42

    
43

    
44
def mocked_request_get(*args, **kwargs):
45

    
46
    class MockResponse:
47

    
48
        def __init__(self, content, ok=True):
49
            self.ok = ok
50
            self.content = content
51

    
52
    if args[0] == 'http://example.com/atom':
53
        return MockResponse(ATOM_FEED)
54
    elif args[0] == 'http://example.com/logo.png':
55
        logo_path = os.path.join(os.path.dirname(__file__), 'media', 'logo.png')
56
        return MockResponse(open(logo_path, 'rb').read())
57
    elif args[0] == 'http://example.com/folder/logo.png':
58
        logo_path = os.path.join(os.path.dirname(__file__), 'media', 'another_logo.png')
59
        return MockResponse(open(logo_path, 'rb').read())
60
    else:
61
        return MockResponse(None, ok=False)
62

    
63

    
64
@mock.patch('corbo.models.requests.get', side_effect=mocked_request_get)
65
def test_announces_from_feed(mocked_get):
66
    tmpdirname = tempfile.mkdtemp(suffix='_corbo')
67
    with override_settings(MEDIA_ROOT=tmpdirname):
68
        storage = DefaultStorage()
69
        feed_content = feedparser.parse(ATOM_FEED)
70
        for category in CATEGORIES:
71
            c = Category.objects.create(name=category, rss_feed_url='http://example.com/atom')
72
            assert c.announce_set.count() == len(feed_content['entries'])
73
            for announce in c.announce_set.all():
74
                assert announce.title in [feed['title'] for feed in feed_content['entries']]
75
                assert storage.url(os.path.join('images', str(announce.id), '01_logo.png')) in announce.text
76
                assert os.path.exists(storage.path(os.path.join('images', str(announce.id), '01_logo.png')))
77
                assert storage.url(os.path.join('images', str(announce.id), '02_logo.png')) in announce.text
78
                assert os.path.exists(storage.path(os.path.join('images', str(announce.id), '02_logo.png')))
79
                assert storage.url('error.png') not in announce.text
80
                assert 'http://site.com/error.png' in announce.text
81
        # cleanup uploaded images
82
        os.unlink(storage.path(os.path.join('images', str(announce.id), '01_logo.png')))
83
        os.unlink(storage.path(os.path.join('images', str(announce.id), '02_logo.png')))
84
        os.rmdir(storage.path(os.path.join('images', str(announce.id))))
85
        os.rmdir(storage.path('images'))
86
    os.rmdir(tmpdirname)
87

    
88

    
89
def test_announces_publishing(app):
90
    c = Category.objects.create(name='Test announces')
91
    a = Announce.objects.create(category=c, title='Test 1',
92
                                text='text')
93
    feed_content = feedparser.parse(app.get(reverse('atom')).text)
94
    assert len(feed_content['entries']) == 0
95
    a.publication_time = timezone.now()
96
    a.save()
97
    feed_content = feedparser.parse(app.get(reverse('atom')).text)
98
    assert len(feed_content['entries']) == 1
99
    a.publication_time = timezone.now() + timezone.timedelta(days=1)
100
    a.save()
101
    feed_content = feedparser.parse(app.get(reverse('atom')).text)
102
    assert len(feed_content['entries']) == 0
103
    a.publication_time = timezone.now()
104
    a.expiration_time = timezone.now()
105
    a.save()
106
    feed_content = feedparser.parse(app.get(reverse('atom')).text)
107
    assert len(feed_content['entries']) == 0
108

    
109
def test_rss_feed_items(app):
110
    c = Category.objects.create(name='Test announces')
111
    for i in range(10):
112
        a = Announce.objects.create(category=c, title='Test %s' % i,
113
                                    publication_time=timezone.now(),
114
                                    text='text of announce %s' % i)
115
    feed_content = feedparser.parse(app.get(reverse('atom')).text)
116
    assert len(feed_content['entries']) <= settings.RSS_ITEMS_LIMIT
117
    for i in range(i, 10):
118
        a = Announce.objects.create(category=c, title='Test %s' % i,
119
                                    publication_time=timezone.now(),
120
                                    text='text of announce %s' % i)
121
    assert len(feed_content['entries']) <= settings.RSS_ITEMS_LIMIT
(3-3/10)