Marian Steinbach 0c0bcbf54e
Mehrere Fixes und Verbesserungen (#343)
* Use UTC for feed item age calculation

* Improvements in script

* Prevent output buffering in job creation

* Remove unused environment variable references

* Print more detailed results count

* Bring back function to execute a single spider job

* Fix 'make spider' command

* Upgrade docker to 5.0.3
2024-03-07 11:31:16 +01:00

122 lines
3.7 KiB

"""Loads feeds linked from pages and collects information on the contained content."""
import calendar
import logging
from datetime import datetime
from datetime import timezone
from time import mktime

import feedparser

from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
    """Load the feeds linked from the checked pages and summarise their content.

    The feed URLs are taken from the results of the ``html_head`` check
    (``previous_results['html_head'][url]['link_rss_atom']``). Every distinct
    feed URL is fetched and parsed once with ``feedparser``.
    """

    # Status values (compared as strings) for which the feed response is
    # considered successful.
    _ACCEPTED_STATUS = ('200', '301', '302')

    def __init__(self, config, previous_results=None):
        super().__init__(config, previous_results)
        # Maps feed URL -> analysis result. The value is an empty dict between
        # collection and analysis.
        self.feeds = {}

    def depends_on_results(self):
        """Return the names of the checks whose results this check reads."""
        return ['html_head']

    def run(self):
        """Collect the feeds of all configured URLs and analyse each one.

        Returns:
            dict mapping each feed URL to the dict produced by
            ``analyse_feed``.
        """
        assert 'html_head' in self.previous_results

        # First gather the distinct feed URLs across all pages ...
        for url in self.config.urls:
            self.collect_feeds(url)

        # ... then analyse each feed once. Iterate over a snapshot of the keys
        # because the mapping's values are replaced inside the loop.
        for feed_url in list(self.feeds):
            self.feeds[feed_url] = self.analyse_feed(feed_url)

        return self.feeds

    def collect_feeds(self, url):
        """Register the feeds linked from ``url`` in ``self.feeds``.

        Feeds are keyed by their URL, so a feed that is referenced by several
        pages is registered (and later analysed) only once.

        Returns:
            ``None`` if the ``html_head`` result for ``url`` has no
            ``link_rss_atom`` list. Otherwise a placeholder dict
            ``{'feeds': [], 'exception': None}``; ``run`` ignores it, it is
            only kept so existing callers see the same return value.
        """
        head = self.previous_results['html_head'][url]
        if 'link_rss_atom' not in head:
            return None
        if not isinstance(head['link_rss_atom'], list):
            return None

        for feed_url in head['link_rss_atom']:
            # Keep an already registered entry untouched.
            self.feeds.setdefault(feed_url, {})

        result = {
            'feeds': [],
            'exception': None,
        }
        return result

    def analyse_feed(self, feed_url):
        """Fetch and parse one feed and return a summary of it.

        Args:
            feed_url: URL of the feed, passed to ``feedparser.parse``.

        Returns:
            dict with the keys ``exception`` (str or None), ``title``,
            ``latest_entry`` and ``first_entry`` (UTC datetimes or None),
            ``average_interval`` (seconds between entries, rounded, or None)
            and ``num_entries``.
        """
        result = {
            'exception': None,
            'title': None,
            'latest_entry': None,
            'first_entry': None,
            'average_interval': None,
            'num_entries': None,
        }

        logging.debug("Loading feed %s", feed_url)
        data = feedparser.parse(feed_url)

        if 'bozo_exception' in data:
            result['exception'] = str(data['bozo_exception'])

        # Without response headers there is nothing more to evaluate.
        if 'headers' not in data:
            return result

        # feedparser reports the HTTP status code under the top-level
        # 'status' key; a 'status' entry in the response headers is only used
        # as a fallback. Comparing as strings accepts both 200 and '200'.
        status = data.get('status')
        if status is None:
            status = data['headers'].get('status')
        if str(status) not in self._ACCEPTED_STATUS:
            result['exception'] = 'Server responded with status %s' % status

        if 'feed' in data:
            result['title'] = data['feed'].get('title')

        if 'entries' in data:
            entries = data['entries']
            result['num_entries'] = len(entries)
            result['latest_entry'] = self.find_latest_entry(entries)
            result['first_entry'] = self.find_first_entry(entries)

            first = result['first_entry']
            latest = result['latest_entry']
            if (result['num_entries'] > 1 and
                    first is not None and
                    latest is not None and
                    first < latest):
                # Mean distance between consecutive entries, in seconds.
                span_seconds = (latest - first).total_seconds()
                result['average_interval'] = round(
                    span_seconds / (result['num_entries'] - 1))

        return result

    @staticmethod
    def _entry_timestamps(entries):
        """Yield the publication time of every dated entry as a POSIX timestamp.

        Entries without ``published_parsed`` are skipped. feedparser documents
        ``published_parsed`` as a time tuple in UTC, so ``calendar.timegm`` is
        used: ``time.mktime`` would interpret the tuple as local time.
        """
        for entry in entries:
            published_parsed = entry.get('published_parsed')
            if published_parsed is None:
                continue
            yield calendar.timegm(published_parsed)

    def find_latest_entry(self, entries):
        """Return the most recent publication date among ``entries``.

        Returns:
            A timezone-aware UTC ``datetime``, or ``None`` if no entry has a
            parsed publication date.
        """
        latest = max(self._entry_timestamps(entries), default=None)
        if latest is None:
            return None
        return datetime.fromtimestamp(latest, tz=timezone.utc)

    def find_first_entry(self, entries):
        """Return the oldest publication date among ``entries``.

        Returns:
            A timezone-aware UTC ``datetime``, or ``None`` if no entry has a
            parsed publication date.
        """
        earliest = min(self._entry_timestamps(entries), default=None)
        if earliest is None:
            return None
        return datetime.fromtimestamp(earliest, tz=timezone.utc)