Load feeds and gather info (#103)

Marian Steinbach 2018-12-07 16:32:42 +01:00 committed by GitHub
parent 3063a4488d
commit 3b9ead330d
4 changed files with 181 additions and 1 deletion

Dockerfile

@@ -6,7 +6,7 @@ RUN echo "http://dl-4.alpinelinux.org/alpine/v3.7/main" >> /etc/apk/repositories
     apk update && \
     apk --no-cache add chromium chromium-chromedriver python3-dev build-base git py3-lxml libxml2 libxml2-dev libxslt libxslt-dev libffi-dev openssl-dev && \
     pip3 install --upgrade pip && \
-    pip3 install selenium==3.8.0 GitPython PyYAML beautifulsoup4==4.6.0 html-similarity==0.3.2 httpretty==0.9.4 pyopenssl==18.0.0 requests==2.18.4 responses==0.9.0 smmap2==2.0.3 urllib3==1.22 google-cloud-datastore==1.7.0 tenacity==5.0.2 && \
+    pip3 install selenium==3.8.0 GitPython PyYAML beautifulsoup4==4.6.0 html-similarity==0.3.2 httpretty==0.9.4 feedparser==5.2.1 pyopenssl==18.0.0 requests==2.18.4 responses==0.9.0 smmap2==2.0.3 urllib3==1.22 google-cloud-datastore==1.7.0 tenacity==5.0.2 && \
     apk del python3-dev build-base
 ADD cli.py /
ADD cli.py /

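The only dependency change is the new feedparser==5.2.1 pin, which handles feed fetching and parsing for the check introduced below. This is roughly the surface of that library the check relies on (a minimal sketch; the feed URL is a placeholder):

import feedparser

# feedparser fetches and parses an RSS/Atom feed in a single call.
data = feedparser.parse('http://example.com/feed.xml')

print(data['feed'].get('title'))    # feed-level metadata
print(len(data['entries']))         # entries is a list of entry dicts
if data['entries']:
    # published_parsed is a time.struct_time (UTC), or None/absent
    print(data['entries'][0].get('published_parsed'))
print(data.get('bozo_exception'))   # present only if the feed was malformed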

@@ -16,6 +16,7 @@ from checks import html_head
 from checks import http_and_https
 from checks import hyperlinks
 from checks import page_content
+from checks import load_feeds
 from checks import load_in_browser
 from checks import url_reachability
 from checks import url_canonicalization
@@ -45,6 +46,7 @@ def perform_checks(input_url):
         ('frameset', frameset),
         ('hyperlinks', hyperlinks),
         ('generator', generator),
+        ('load_feeds', load_feeds),
         ('load_in_browser', load_in_browser),
     ]
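The new check is registered after 'generator' and before 'load_in_browser'. Ordering matters: load_feeds declares a dependency on html_head via depends_on_results() (see the new file below), so it must run after that check has produced its results. A sketch of the dispatch this list implies, assuming perform_checks iterates the tuples in order and hands each checker the results gathered so far (loop and variable names here are illustrative, not taken from the source):

results = {}
for check_name, check_module in check_modules:
    # Every module exposes a Checker class that receives the run config
    # and the results of all previously executed checks.
    checker = check_module.Checker(config=config, previous_results=results)
    results[check_name] = checker.run()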

checks/load_feeds.py (new file, 104 lines)

@@ -0,0 +1,104 @@
"""
Loads feeds linked from pages and collects information on the contained content.
"""

import logging
from time import mktime
from datetime import datetime

import feedparser

from checks.abstract_checker import AbstractChecker


class Checker(AbstractChecker):

    def __init__(self, config, previous_results=None):
        super().__init__(config, previous_results)
        self.feeds = {}

    def depends_on_results(self):
        # The html_head check must run first, since it provides the feed URLs.
        return ['html_head']

    def run(self):
        assert 'html_head' in self.previous_results

        for url in self.config.urls:
            self.collect_feeds(url)
        for feed_url in self.feeds:
            self.feeds[feed_url] = self.analyse_feed(feed_url)

        return self.feeds

    def collect_feeds(self, url):
        """
        Collects feed URLs from the html_head result for the given URL.
        The assumption is that in most cases the URLs will reference the
        same feeds.
        """
        head = self.previous_results['html_head'][url]
        assert 'link_rss_atom' in head
        assert isinstance(head['link_rss_atom'], list)

        for feed_url in head['link_rss_atom']:
            if feed_url not in self.feeds:
                self.feeds[feed_url] = {}

    def analyse_feed(self, feed_url):
        result = {
            'exception': None,
            'title': None,
            'latest_entry': None,
            'first_entry': None,
            'average_interval': None,
            'num_entries': None,
        }

        logging.debug("Loading feed %s" % feed_url)
        data = feedparser.parse(feed_url)

        if 'bozo_exception' in data:
            result['exception'] = data['bozo_exception']
        # Anything other than OK or a redirect is reported as an error.
        if data['headers'].get('status') not in ('200', '301', '302'):
            result['exception'] = 'Server responded with status %s' % data['headers'].get('status')

        if 'feed' in data:
            result['title'] = data['feed'].get('title')
        if 'entries' in data:
            result['num_entries'] = len(data['entries'])
            result['latest_entry'] = self.find_latest_entry(data['entries'])
            result['first_entry'] = self.find_first_entry(data['entries'])

            # The average interval is the span between the first and the latest
            # entry, divided by the number of intervals between entries.
            if (result['num_entries'] > 1 and
                    result['first_entry'] is not None and
                    result['latest_entry'] is not None and
                    result['first_entry'] < result['latest_entry']):
                result['average_interval'] = round(
                    (result['latest_entry'] - result['first_entry']).total_seconds()
                    / (result['num_entries'] - 1))

        return result

    def find_latest_entry(self, entries):
        max_date = None
        for entry in entries:
            # Skip entries without a parsed publication date.
            if entry.get('published_parsed') is None:
                continue
            timestamp = mktime(entry['published_parsed'])
            if max_date is None or timestamp > max_date:
                max_date = timestamp
        if max_date is not None:
            return datetime.fromtimestamp(max_date)

    def find_first_entry(self, entries):
        min_date = None
        for entry in entries:
            if entry.get('published_parsed') is None:
                continue
            timestamp = mktime(entry['published_parsed'])
            if min_date is None or timestamp < min_date:
                min_date = timestamp
        if min_date is not None:
            return datetime.fromtimestamp(min_date)

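One subtlety in find_latest_entry() and find_first_entry(): feedparser returns published_parsed as a UTC struct_time, but mktime() interprets its argument as local time, and datetime.fromtimestamp() converts back to local time. The resulting datetimes are therefore shifted against the feed's GMT dates by the host's timezone offset. A minimal illustration (the inline feed string is made up; the printed value assumes a Europe/Berlin host):

from time import mktime
from datetime import datetime
import feedparser

# feedparser.parse() also accepts a raw XML string instead of a URL.
entry = feedparser.parse(
    '<rss version="2.0"><channel><item>'
    '<pubDate>Tue, 03 Jun 2003 09:39:21 GMT</pubDate>'
    '</item></channel></rss>'
)['entries'][0]

# 09:39:21 GMT round-trips through mktime()/fromtimestamp() into local
# time; on a Europe/Berlin host this prints 2003-06-03 10:39:21.
print(datetime.fromtimestamp(mktime(entry['published_parsed'])))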
checks/load_feeds_test.py (new file, 74 lines)

@@ -0,0 +1,74 @@
import unittest
from datetime import datetime

import httpretty
from httpretty import httprettified

from checks import load_feeds
from checks.config import Config


@httprettified
class TestFeed(unittest.TestCase):

    def test_feed_rss2(self):
        """
        Checks that an RSS 2.0 feed is loaded and analysed correctly.
        """
        feed = """<?xml version="1.0"?>
        <rss version="2.0">
          <channel>
            <title>Liftoff News</title>
            <link>http://liftoff.msfc.nasa.gov/</link>
            <description>Liftoff to Space Exploration.</description>
            <pubDate>Tue, 10 Jun 2003 04:00:00 GMT</pubDate>
            <item>
              <title>Star City</title>
              <link>http://liftoff.msfc.nasa.gov/news/2003/news-starcity.asp</link>
              <pubDate>Tue, 03 Jun 2003 09:39:21 GMT</pubDate>
              <guid>http://liftoff.msfc.nasa.gov/2003/06/03.html#item573</guid>
            </item>
            <item>
              <description>Sky watchers in Europe, Asia, and parts of Alaska and Canada will experience a &lt;a href="http://science.nasa.gov/headlines/y2003/30may_solareclipse.htm"&gt;partial eclipse of the Sun&lt;/a&gt; on Saturday, May 31st.</description>
              <pubDate>Fri, 30 May 2003 11:06:42 GMT</pubDate>
              <guid>http://liftoff.msfc.nasa.gov/2003/05/30.html#item572</guid>
            </item>
          </channel>
        </rss>
        """
        feed_url = 'http://example.com/feed.xml'
        httpretty.register_uri(httpretty.GET, feed_url,
                               body=feed,
                               adding_headers={
                                   "Content-type": "application/rss+xml",
                               })

        # Mock a previous result from the html_head check for some page.
        results = {
            'html_head': {
                'http://example.com/': {
                    'link_rss_atom': ['http://example.com/feed.xml']
                }
            }
        }
        config = Config(urls=['http://example.com/'])
        checker = load_feeds.Checker(config=config, previous_results=results)
        result = checker.run()

        # Note: mktime() interprets the parsed GMT dates as local time, so
        # these expected datetime values assume a Europe/Berlin host timezone.
        self.assertEqual(result, {
            'http://example.com/feed.xml': {
                'exception': None,
                'title': 'Liftoff News',
                'latest_entry': datetime(2003, 6, 3, 10, 39, 21),
                'first_entry': datetime(2003, 5, 30, 12, 6, 42),
                'average_interval': 340359,
                'num_entries': 2,
            }
        })


if __name__ == '__main__':
    unittest.main()
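The expected average_interval can be checked by hand against the formula in analyse_feed(): the span between the first and the latest entry, divided by the number of intervals between entries (num_entries - 1). For the two test entries:

from datetime import datetime

latest = datetime(2003, 6, 3, 10, 39, 21)
first = datetime(2003, 5, 30, 12, 6, 42)
# 3 days, 22:32:39 = 340359 seconds, divided by (2 - 1) intervals
print(round((latest - first).total_seconds() / (2 - 1)))  # 340359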