Refactor and modularize spider (#70)

See PR description for details
Marian Steinbach 4 years ago committed by GitHub
parent 7514aeb542
commit ae6a2e83e9
Changed files (number of changed lines in parentheses):

1. .dockerignore (1)
2. .gitignore (1)
3. .travis.yml (7)
4. Dockerfile (19)
5. Makefile (31)
6. checks/__init__.py (64)
7. checks/abstract_checker.py (23)
8. checks/certificate.py (62)
9. checks/certificate_test.py (27)
10. checks/charset.py (77)
11. checks/charset_test.py (49)
12. checks/config.py (29)
13. checks/dns_resolution.py (55)
14. checks/domain_variations.py (44)
15. checks/duplicate_content.py (107)
16. checks/generator.py (76)
17. checks/html_head.py (152)
18. checks/http_and_https.py (27)
19. checks/load_in_browser.py (134)
20. checks/page_content.py (94)
21. checks/url_canonicalization.py (13)
22. checks/url_reachability.py (104)
23. checks/url_reachability_test.py (71)
24. cli.py (83)
25. config/__init__.py (23)
26. devops/run-job.sh (33)
27. export/__init__.py (89)
28. jobs/__init__.py (180)
29. rating/__init__.py (53)
30. rating/abstract_rater.py (22)
31. rating/canonical_url.py (31)
32. rating/favicon.py (32)
33. rating/feeds.py (35)
34. rating/https.py (47)
35. rating/no_network_errors.py (48)
36. rating/no_script_errors.py (42)
37. rating/reachable.py (36)
38. rating/resolvable.py (35)
39. rating/response_duration.py (46)
40. rating/responsive_layout.py (35)
41. rating/use_specific_fonts.py (41)
42. rating/www_optional.py (44)
43. spider.py (814)
44. spider/__init__.py (0)
45. spider/spider.py (106)
46. spider/spider_test.py (26)
47. spider_test.py (125)

.dockerignore

@@ -4,3 +4,4 @@ docs
secrets
temp
venv
/export-*

.gitignore

@@ -7,3 +7,4 @@ __pycache__
.vscode/settings.json
webapp/dist/bundle.js
dev-shm
/export-*

.travis.yml

@@ -6,5 +6,12 @@ services:
notifications:
email: false
language: python
python:
- "3.6"
script:
- pip install --upgrade pip
- pip install --upgrade codecov
- make test
- codecov

Dockerfile

@@ -1,17 +1,20 @@
FROM python:3.6-alpine3.7
FROM python:3.6-alpine3.8
# Note: we pin selenium to 3.8.0 because of https://github.com/SeleniumHQ/selenium/issues/5296
RUN echo "http://dl-4.alpinelinux.org/alpine/v3.7/main" >> /etc/apk/repositories && \
echo "http://dl-4.alpinelinux.org/alpine/v3.7/community" >> /etc/apk/repositories && \
apk update && \
apk --no-cache add chromium chromium-chromedriver python3-dev build-base git && \
apk --no-cache add chromium chromium-chromedriver python3-dev build-base git py3-lxml libxml2 libxml2-dev libxslt libxslt-dev libffi-dev openssl-dev && \
pip3 install --upgrade pip && \
pip3 install selenium==3.8.0 GitPython PyYAML beautifulsoup4==4.6.0 requests==2.18.4 responses==0.9.0 smmap2==2.0.3 urllib3==1.22 google-cloud-datastore==1.7.0 tenacity==5.0.2 && \
pip3 install selenium==3.8.0 GitPython PyYAML beautifulsoup4==4.6.0 html-similarity==0.3.2 httpretty==0.9.4 pyopenssl==18.0.0 requests==2.18.4 responses==0.9.0 smmap2==2.0.3 urllib3==1.22 google-cloud-datastore==1.7.0 tenacity==5.0.2 && \
apk del python3-dev build-base
ADD spider.py /
ADD spider_test.py /
ADD data_export.py /
ADD cli.py /
ADD config /config
ADD jobs /jobs
ADD checks /checks
ADD rating /rating
ADD spider /spider
ADD export /export
ENTRYPOINT ["python3"]
CMD ["/spider.py"]
ENTRYPOINT ["python3", "/cli.py"]

Makefile

@@ -1,18 +1,20 @@
IMAGE := quay.io/netzbegruenung/green-spider:latest
DB_ENTITY := spider-results
.PHONY: dockerimage
# Build docker image
dockerimage:
docker build -t quay.io/netzbegruenung/green-spider:latest .
docker build -t $(IMAGE) .
# Create spider job queue
spiderjobs: dockerimage
docker run --rm -ti \
-v $(PWD)/secrets:/secrets \
quay.io/netzbegruenung/green-spider:latest spider.py \
$(IMAGE) \
--credentials-path /secrets/datastore-writer.json \
--loglevel debug \
--loglevel info \
jobs
# Run spider in docker image
@@ -21,11 +23,26 @@ spider: dockerimage
-v $(PWD)/dev-shm:/dev/shm \
-v $(PWD)/webapp/dist/data:/out \
-v $(PWD)/secrets:/secrets \
quay.io/netzbegruenung/green-spider:latest spider.py \
$(IMAGE) \
--credentials-path /secrets/datastore-writer.json \
--loglevel info \
spider
--loglevel debug \
spider --kind $(DB_ENTITY)
export: dockerimage
docker run --rm -ti \
-v $(PWD)/export-json:/out \
-v $(PWD)/secrets:/secrets \
-v $(PWD)/export-siteicons:/icons \
$(IMAGE) \
--credentials-path /secrets/datastore-reader.json \
--loglevel debug \
export --kind $(DB_ENTITY)
# run spider tests
# FIXME
test: dockerimage
docker run --rm -ti quay.io/netzbegruenung/green-spider:latest /spider_test.py
docker run --rm -ti \
--entrypoint "python3" \
$(IMAGE) \
-m unittest discover -p '*_test.py'

checks/__init__.py

@@ -0,0 +1,64 @@
"""
The checks module contains the functionality to gather information on, and to test
certain properties of, a site or its individual pages.
"""
import logging
from checks import charset
from checks import certificate
from checks import dns_resolution
from checks import duplicate_content
from checks import domain_variations
from checks import generator
from checks import html_head
from checks import http_and_https
from checks import page_content
from checks import load_in_browser
from checks import url_reachability
from checks import url_canonicalization
from checks.config import Config
def perform_checks(input_url):
"""
Executes all our URL/site checks and returns a big-ass result dict.
"""
# The sequence of checks to run. Order is important!
# Checks which expand the URLs list must come first.
# After that, dependencies (encoded in the checks) have to be fulfilled.
check_modules = [
('domain_variations', domain_variations),
('http_and_https', http_and_https),
('dns_resolution', dns_resolution),
('url_reachability', url_reachability),
('certificate', certificate),
('url_canonicalization', url_canonicalization),
('page_content', page_content),
('duplicate_content', duplicate_content),
('charset', charset),
('html_head', html_head),
('generator', generator),
('load_in_browser', load_in_browser),
]
results = {}
config = Config(urls=[input_url],
user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) ' +
'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 ' +
'Safari/537.36 green-spider/0.2')
for check_name, check in check_modules:
checker = check.Checker(config=config,
previous_results=results)
result = checker.run()
results[check_name] = result
# update config for the next check
config = checker.config
logging.debug("config after check %s: %r" % (check_name, config))
return results
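
A rough usage sketch of the new entry point (assumes the checks package and its third-party dependencies are importable; www.example.com is a placeholder):

    from checks import perform_checks

    # Runs the full check sequence for one input URL. This performs real network
    # and browser requests, so it needs the tooling installed via the Dockerfile.
    results = perform_checks('https://www.example.com/')

    # One entry per check name, e.g. 'dns_resolution', 'html_head', 'load_in_browser', ...
    print(sorted(results.keys()))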

checks/abstract_checker.py

@@ -0,0 +1,23 @@
class AbstractChecker(object):
"""
Our blueprint for checks
"""
def __init__(self, config, previous_results=None):
self._config = config
# A dictionary of results from previous checkers.
# Key is the name of the checker that has generated the result.
self._previous_results = previous_results
def run(self):
"""Executes the check routine, returns result dict"""
raise NotImplementedError()
@property
def config(self):
return self._config
@property
def previous_results(self):
return self._previous_results
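
A hypothetical minimal checker, only to illustrate the contract this blueprint defines (the result shape shown here is made up):

    from checks.abstract_checker import AbstractChecker

    class Checker(AbstractChecker):
        """Example only: reports how many URLs are currently configured."""

        def run(self):
            # Checks may read self.config and self.previous_results
            # and must return a result object (usually a dict).
            return {'url_count': len(self.config.urls)}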

checks/certificate.py

@@ -0,0 +1,62 @@
"""
Gathers information on the TLS/SSL certificate used by a server
"""
from urllib.parse import urlparse
import logging
import ssl
from datetime import datetime
from datetime import timezone
from OpenSSL import crypto
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
results = {}
for url in self.config.urls:
if url.startswith('https://'):
results[url] = self.get_certificate(url)
return results
def get_certificate(self, url):
result = {
'exception': None,
'serial_number': None,
'subject': None,
'issuer': None,
'not_before': None,
'not_after': None
}
parsed = urlparse(url)
try:
cert = ssl.get_server_certificate((parsed.hostname, 443))
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
result['serial_number'] = str(x509.get_serial_number())
nb = x509.get_notBefore().decode('utf-8')
na = x509.get_notAfter().decode('utf-8')
# parse '2018 06 27 00 00 00Z'
result['not_before'] = datetime(int(nb[0:4]), int(nb[4:6]), int(nb[6:8]), int(nb[8:10]), int(nb[10:12]), int(nb[12:14]), tzinfo=timezone.utc).isoformat()
result['not_after'] = datetime(int(na[0:4]), int(na[4:6]), int(na[6:8]), int(na[8:10]), int(na[10:12]), int(na[12:14]), tzinfo=timezone.utc).isoformat()
# decode and convert from bytes to unicode
result['subject'] = dict([tuple(map(lambda x: x.decode('utf-8'), tup)) for tup in x509.get_subject().get_components()])
result['issuer'] = dict([tuple(map(lambda x: x.decode('utf-8'), tup)) for tup in x509.get_issuer().get_components()])
except Exception as e:
result['exception'] = {
'type': str(type(e)),
'message': str(e),
}
logging.warning("Error when getting certificate for %s: %r" % (url, e))
return result
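
For orientation, a successful per-URL result from this check looks roughly like the following (keys match the dict built above, all values are illustrative):

    {
        'exception': None,
        'serial_number': '1234567890',
        'subject': {'CN': 'www.example.com'},
        'issuer': {'O': 'Example CA', 'CN': 'Example Issuing CA'},
        'not_before': '2018-06-27T00:00:00+00:00',
        'not_after': '2019-06-27T00:00:00+00:00',
    }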

checks/certificate_test.py

@@ -0,0 +1,27 @@
from checks import certificate
from checks.config import Config
import unittest
class TestCertificateChecker(unittest.TestCase):
def test_google(self):
url = 'https://www.google.com/'
config = Config(urls=[url])
checker = certificate.Checker(config=config, previous_results={})
result = checker.run()
self.assertIn(url, result)
self.assertIsNone(result[url]['exception'])
self.assertEqual(result[url]['issuer']['O'], 'Google Trust Services')
def test_kaarst(self):
url = 'https://www.gruenekaarst.de/'
config = Config(urls=[url])
checker = certificate.Checker(config=config, previous_results={})
result = checker.run()
self.assertIn(url, result)
self.assertIsNone(result[url]['exception'])
self.assertEqual(result[url]['issuer']['O'], 'COMODO CA Limited')
if __name__ == '__main__':
unittest.main()

checks/charset.py

@@ -0,0 +1,77 @@
"""
Checks which character set a page has.
TODO: Check for http-equiv meta tags like
<meta http-equiv="content-type" content="text/html; charset=iso-8859-1" />
"""
import logging
from bs4 import BeautifulSoup
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
assert 'page_content' in self.previous_results
results = {}
for url in self.config.urls:
results[url] = self.get_charset(url)
return results
def get_charset(self, url):
"""
Expects the 'page_content' result for this URL to carry the HTML content
"""
page_content = self.previous_results['page_content'][url]
assert 'content' in page_content
assert 'response_headers' in page_content
logging.debug("%r", page_content['response_headers'])
assert 'content-type' in page_content['response_headers']
if page_content['content'] is None:
return
result = {
'meta_charset_tag': None,
'content_type_header_charset': None,
'charset': 'iso-8859-1', # ISO-8859-1 is the default according to https://www.w3.org/International/articles/http-charset/index
'valid': None,
'exception': None,
}
soup = BeautifulSoup(page_content['content'], 'html.parser')
# get response header charset
if ('content-type' in page_content['response_headers']
and 'charset=' in page_content['response_headers']['content-type']):
parts = page_content['response_headers']['content-type'].split("charset=", 1)
result['content_type_header_charset'] = parts[1].lower()
result['charset'] = parts[1].lower()
# get meta tag charset
metatags = soup.find_all('meta')
for tag in metatags:
if 'charset' in tag.attrs:
result['meta_charset_tag'] = tag['charset'].lower()
# meta tag overrules any previous value
result['charset'] = tag['charset'].lower()
# check for charset plausibility (only for most common ones)
if result['charset'] in ('iso-8859-1', 'utf-8'):
try:
_ = page_content['content'].encode(result['charset'])
except UnicodeEncodeError as e:
result['valid'] = False
result['exception'] = str(e)
else:
result['valid'] = True
return result

checks/charset_test.py

@@ -0,0 +1,49 @@
import httpretty
from httpretty import httprettified
import unittest
from checks import charset
from checks import page_content
from checks.config import Config
@httprettified
class TestCharsetChecker(unittest.TestCase):
def test_http_response(self):
url = 'http://www.example.com/'
httpretty.register_uri(httpretty.GET, url,
body="""<html>
<head>
<meta http-equiv="Content-type" value="text/html; charset=foo">
<meta charset="utf-8">
<title>Hello</title>
</head>
</html>""",
adding_headers={
"Content-Type": "text/html; charset=ISO-8859-1",
})
results = {}
config = Config(urls=[url])
page_content_checker = page_content.Checker(config=config, previous_results={})
results['page_content'] = page_content_checker.run()
self.assertIn(url, results['page_content'])
self.assertIn('response_headers', results['page_content'][url])
self.assertIn('content-type', results['page_content'][url]['response_headers'])
charset_checker = charset.Checker(config=page_content_checker.config, previous_results=results)
result = charset_checker.run()
self.assertIn(url, result)
self.assertEqual(result[url], {
'meta_charset_tag': 'utf-8',
'content_type_header_charset': 'iso-8859-1',
'charset': 'utf-8',
'valid': True,
'exception': None,
})
if __name__ == '__main__':
unittest.main()

checks/config.py

@@ -0,0 +1,29 @@
class Config(object):
"""
Our configuration to be passed to checks
"""
def __init__(self, urls, user_agent='green-spider/1.0'):
self._urls = set(urls)
self._user_agent = user_agent
def __repr__(self):
return "Config(urls=%r)" % self._urls
@property
def urls(self):
return list(self._urls)
def add_url(self, url):
self._urls.add(url)
def remove_url(self, url):
"""Removes url from urls, if it was in there. Ignores errors."""
try:
self._urls.remove(url)
except KeyError:
pass
@property
def user_agent(self):
return self._user_agent
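
A short sketch of how the Config container behaves (URLs are stored in a set, so duplicates collapse and order is not guaranteed):

    from checks.config import Config

    config = Config(urls=['https://www.example.com/'])
    config.add_url('http://www.example.com/')
    config.add_url('https://www.example.com/')    # duplicate, silently ignored
    config.remove_url('http://unknown.example/')  # unknown URL, also ignored

    print(config.urls)        # two distinct URLs, in arbitrary order
    print(config.user_agent)  # defaults to 'green-spider/1.0'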

checks/dns_resolution.py

@@ -0,0 +1,55 @@
"""
This check attempts to resolve all hostnames/domains in the input URLs.
URLs which are not resolvable are removed from the config.
"""
import logging
from socket import gethostbyname_ex
from urllib.parse import urlparse
from urllib.parse import urlunparse
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
"""Executes the check routine, returns result dict"""
results = {}
urls = list(self.config.urls)
for url in urls:
parsed = urlparse(url)
results[url] = self.resolve_hostname(parsed.hostname)
# remove URL if non-resolvable
if not results[url]['resolvable']:
self.config.remove_url(url)
return results
def resolve_hostname(self, hostname):
"""
Resolve one hostname to IPv4 address(es)
"""
result = {
'hostname': hostname,
'resolvable': False,
'aliases': [],
'ipv4_addresses': [],
}
try:
hostname, aliases, ipv4_addresses = gethostbyname_ex(hostname)
result['resolvable'] = True
result['aliases'] = aliases
result['ipv4_addresses'] = ipv4_addresses
except Exception as e:
logging.debug("Hostname %s not resolvable. Exception: %r" % (hostname, e))
return result
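
For reference, the shape of the gethostbyname_ex() return value this check relies on (actual aliases and addresses depend on the resolver):

    from socket import gethostbyname_ex

    # Returns a (canonical_name, aliases, ipv4_addresses) tuple; the checker
    # copies the last two fields into its result dict.
    name, aliases, ipv4_addresses = gethostbyname_ex('www.example.com')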

checks/domain_variations.py

@@ -0,0 +1,44 @@
"""
This adds commonly tried variations of domains/subdomains to the URLs config.
"""
import logging
from urllib.parse import urlparse
from urllib.parse import urlunparse
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
urls = list(self.config.urls)
for url in urls:
parsed = urlparse(url)
hostnames = self.expand_hostname(parsed.hostname)
for hostname in hostnames:
self.config.add_url(urlunparse((parsed.scheme, hostname,
parsed.path, parsed.params, parsed.query, parsed.fragment)))
return None
def expand_hostname(self, hostname):
"""
Create variations of subdomains
"""
hostnames = set()
hostnames.add(hostname)
if hostname.startswith('www.'):
# remove 'www.' prefix
hostnames.add(hostname[4:])
else:
# add 'www.' prefix
hostnames.add('www.' + hostname)
return sorted(list(hostnames))
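
A quick sketch of what expand_hostname() produces (example.com is a placeholder):

    from checks import domain_variations
    from checks.config import Config

    checker = domain_variations.Checker(config=Config(urls=[]))
    # Adds or strips the 'www.' prefix and returns both variants, sorted:
    # ['example.com', 'www.example.com']
    print(checker.expand_hostname('www.example.com'))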

checks/duplicate_content.py

@@ -0,0 +1,107 @@
"""
This checker looks at the similarity between previously downloaded pages
and removes duplicates from the config URLs
"""
import logging
import html_similarity
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
# value above which we consider a page pair a duplicate
similarity_threshold = 0.99999
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
if len(self.config.urls) == 1:
# nothing to do for us
return
urls = list(self.config.urls)
# get content
content = {}
assert 'page_content' in self.previous_results
for url in urls:
page_content = self.previous_results['page_content'][url]
if page_content['content'] is None:
logging.warn("Content for URL %s is None" % url)
content[url] = page_content['content']
pairs = self.compare_pairwise(content)
# remove duplicates
for key in pairs:
if pairs[key]['similarity'] is None:
continue
if pairs[key]['similarity'] > self.similarity_threshold:
# this pair is a duplicate.
# Decide which one to keep
url1, url2 = key.split(" ", 1)
reject = self.select_url_to_reject(url1, url2)
self.config.remove_url(reject)
return pairs
def compare_pairwise(self, content):
# compare pairwise
pairs = {}
for url1 in content:
for url2 in content:
if url1 == url2:
continue
# avoid checking pairs twice
pair_key = " ".join(sorted([url1, url2]))
if pair_key in pairs:
continue
try:
s = html_similarity.similarity(content[url1], content[url2])
logging.debug("Comparing pages for URLs %s and %s: similarity=%s", url1, url2, s)
pairs[pair_key] = {
'similarity': s,
'exception': None,
}
except (AttributeError, ValueError) as e:
logging.error("html_similarity.similarity thre exception for URL pair %s and %s: %s", url1, url2, e)
pairs[pair_key] = {
'similarity': None,
'exception': str(e),
}
return pairs
def select_url_to_reject(self, url1, url2):
"""Determine which of two URLs to keep, which to reject"""
# HTTPS takes precedence
if url1.startswith('https://') and not url2.startswith('https://'):
return url2
elif url2.startswith('https://') and not url1.startswith('https://'):
return url1
# Shorter URL wins
if len(url1) < len(url2):
return url2
elif len(url1) > len(url2):
return url1
# default behaviour
return url1
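
A rough illustration of the underlying similarity measure (exact scores depend on the html-similarity library; anything above the 0.99999 threshold counts as a duplicate pair):

    import html_similarity

    # Two near-identical documents: same tag structure and classes, only the
    # text differs, so the score comes out very close to 1.0.
    a = '<html><body><p class="intro">Hello</p></body></html>'
    b = '<html><body><p class="intro">Hello!</p></body></html>'
    print(html_similarity.similarity(a, b))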

checks/generator.py

@@ -0,0 +1,76 @@
"""
Checks the 'generator' meta tag and page content properties
to detect well-known content management systems, themes etc.
"""
import logging
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
# IP address of the newthinking GCMS server
gcms_ip = "91.102.13.20"
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
assert 'page_content' in self.previous_results
assert 'html_head' in self.previous_results
results = {}
for url in self.config.urls:
results[url] = self.get_generator(url)
return results
def get_generator(self, url):
page_content = self.previous_results['page_content'][url]
assert 'content' in page_content
assert 'dns_resolution' in self.previous_results
dns_resolution = self.previous_results['dns_resolution']
head = self.previous_results['html_head'][url]
generator = None
if 'generator' in head and head['generator'] is not None:
generator = head['generator'].lower()
if 'typo3' in generator:
generator = 'typo3'
if 'wordpress' in generator:
generator = 'wordpress'
if 'drupal' in generator:
generator = 'drupal'
if 'joomla' in generator:
generator = 'joomla'
# Qualify certain CMS flavours in more detail
if generator == "typo3":
# Typo3-Gruene advertises in the page content
if 'typo3-gruene.de' in page_content['content']:
generator = "typo3-gruene"
# newthinking GCMS in some page hrefs
elif 'ntc_gcms' in page_content['content']:
generator = "typo3-gcms"
# check if one of the IPs matches the well-known GCMS Server IP
elif url in dns_resolution:
for addr in dns_resolution[url]['ipv4_addresses']:
if addr == self.gcms_ip:
generator = "typo3-gcms"
elif 'Urwahl3000' in page_content['content']:
generator = "wordpress-urwahl"
elif ('josephknowsbest' in page_content['content'] or
'Joseph-knows-best' in page_content['content']):
generator = "wordpress-josephknowsbest"
elif 'wordpress' in page_content['content']:
generator = "wordpress"
return generator

checks/html_head.py

@@ -0,0 +1,152 @@
"""
Extracts information from the html <head>, like existence and value
of certain meta tags, link tags, title, etc.
"""
import logging
import re
from urllib.parse import urljoin
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
results = {}
for url in self.config.urls:
results[url] = self.get_content(url)
return results
def get_content(self, url):
"""
Expects the 'page_content' result for this URL to carry the HTML content
"""
page_content = self.previous_results['page_content'][url]
assert 'content' in page_content
assert 'response_headers' in page_content
assert 'content-type' in page_content['response_headers']
if page_content['content'] is None:
return
soup = BeautifulSoup(page_content['content'], 'html.parser')
head = soup.find('head')
result = {
'title': self.get_title(head),
'link_canonical': self.get_link_canonical(head, url),
'link_rss_atom': self.get_link_rss_atom(head, url),
'link_icon': self.get_link_icon(head, url),
'generator': self.get_generator(head),
'opengraph': self.get_opengraph(head),
'viewport': self.get_viewport(head),
}
return result
def get_title(self, head):
"""Extract and clean up page title"""
if head is None:
return
title = None
tag = head.find('title')
if tag is None:
return
title = tag.get_text()
# clean up
title = title.replace(u'\u00a0', ' ')
title = title.replace('  ', ' ')
title = title.strip()
return title
def get_link_canonical(self, head, url):
if head is None:
return
link = head.find('link', rel='canonical')
if link:
return urljoin(url, link.get('href'))
def get_link_rss_atom(self, head, url):
if head is None:
return
hrefs = []
rss_links = head.find_all('link', type='application/rss+xml')
atom_links = head.find_all('link', type='application/atom+xml')
if rss_links:
for link in rss_links:
hrefs.append(link.get('href'))
if atom_links:
for link in atom_links:
hrefs.append(link.get('href'))
# make URLs absolute
for i in range(len(hrefs)):
parsed = urlparse(hrefs[i])
if parsed.scheme == '':
hrefs[i] = urljoin(url, hrefs[i])
return hrefs
def get_link_icon(self, head, url):
if head is None:
return
tag = head.find('link', rel=lambda x: x and x.lower() == 'icon')
if tag:
return urljoin(url, tag.get('href'))
tag = head.find('link', rel=lambda x: x and x.lower() == 'shortcut icon')
if tag:
return urljoin(url, tag.get('href'))
def get_generator(self, head):
if head is None:
return
tags = head.select('[name=generator]')
if tags:
return tags[0].get('content')
def get_opengraph(self, head):
if head is None:
return
# we find tags by matching this property/itemprop value regex
property_re = re.compile('^og:')
opengraph = set()
for tag in head.find_all(property=property_re):
opengraph.add(tag.get('property'))
for tag in head.find_all(itemprop=property_re):
opengraph.add(tag.get('itemprop'))
opengraph = sorted(list(opengraph))
if opengraph != []:
return opengraph
def get_viewport(self, head):
if head is None:
return
tags = head.select('[name=viewport]')
if tags:
return tags[0].get('content')
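
A minimal offline sketch of how this checker consumes the page_content results (the URL and HTML snippet are made up):

    from checks import html_head
    from checks.config import Config

    url = 'http://www.example.com/'
    fake_page_content = {url: {
        'content': '<html><head><title>Hello World</title>'
                   '<meta name="generator" content="WordPress"></head></html>',
        'response_headers': {'content-type': 'text/html'},
    }}

    checker = html_head.Checker(config=Config(urls=[url]),
                                previous_results={'page_content': fake_page_content})
    result = checker.run()[url]
    print(result['title'])      # 'Hello World'
    print(result['generator'])  # 'WordPress'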

checks/http_and_https.py

@@ -0,0 +1,27 @@
"""
This adds, for every HTTP URL, the HTTPS counterpart,
and vice versa, to config.urls.
It doesn't perform any tests itself; it only expands the
set of URLs to be tested by other checks.
"""
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
"""
Adds URLs to config.urls, returns nothing
"""
for url in self.config.urls:
if url.startswith('https://'):
self.config.add_url('http://' + url[8:])
elif url.startswith('http://'):
self.config.add_url('https://' + url[7:])
return None

checks/load_in_browser.py

@@ -0,0 +1,134 @@
"""
Collects information by loading pages in a browser.
Information includes:
- whether the document width adapts well to viewports as narrow as 360 pixels
- whether javascript errors or errors from missing resources occur
- collects CSS font-family properties in use
"""
import logging
import time
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException
from selenium.common.exceptions import TimeoutException
import tenacity
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
page_load_timeout = 20
# sizes we check for (width, height)
sizes = (
(360, 640), # rather old smartphone
(768, 1024), # older tablet or newer smartphone
(1024, 768), # older desktop or horiz. tablet
(1920, 1080), # Full HD horizontal
)
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
# Our selenium user agent using Chrome headless as an engine
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-extensions')
self.driver = webdriver.Chrome(options=chrome_options)
self.driver.set_page_load_timeout(self.page_load_timeout)
def run(self):
results = {}
for url in self.config.urls:
results[url] = {
'sizes': None,
'min_document_width': None,
'logs': None,
'font_families': None,
}
# responsive check
try:
sizes = self.check_responsiveness(url)
results[url] = {
'sizes': sizes,
'min_document_width': min([s['document_width'] for s in sizes]),
'logs': self.capture_log(),
}
except TimeoutException as e:
logging.warn("TimeoutException when checking responsiveness for %s: %s" % (url, e))
pass
except tenacity.RetryError as re:
logging.warn("RetryError when checking responsiveness for %s: %s" % (url, re))
pass
# CSS collection
font_families = None
try:
elements = self.driver.find_elements_by_xpath("//*")
font_families = set()
for element in elements:
try:
font_family = element.value_of_css_property('font-family')
if font_family is None:
continue
font_families.add(font_family.lower())
except StaleElementReferenceException as e:
logging.warn("StaleElementReferenceException when collecting CSS properties for %s: %s" % (url, e))
continue
results[url]['font_families'] = sorted(list(font_families))
except TimeoutException as e:
logging.warn("TimeoutException when collecting CSS elements for %s: %s" % (url, e))
pass
self.driver.quit()
return results
@tenacity.retry(stop=tenacity.stop_after_attempt(3),
retry=tenacity.retry_if_exception_type(TimeoutException))
def check_responsiveness(self, url):
result = []
# set window to the first size initially
self.driver.set_window_size(self.sizes[0][0], self.sizes[0][1])
self.driver.get(url)
# give the page some time to load
time.sleep(10)
for (width, height) in self.sizes:
self.driver.set_window_size(width, height)
# wait for re-render/re-flow
time.sleep(1.0)
doc_width = self.driver.execute_script("return document.body.scrollWidth")
result.append({
'viewport_width': width,
'document_width': int(doc_width),
})
return result
def capture_log(self):
"""
Returns log elements with level "SEVERE"
"""
entries = []
for entry in self.driver.get_log('browser'):
if entry['level'] in ('WARNING', 'SEVERE'):
entries.append(entry)
return entries

checks/page_content.py

@@ -0,0 +1,94 @@
"""
This check downloads the HTML page for each URL
"""
import logging
import requests
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
# connection timeout (seconds)
CONNECT_TIMEOUT = 10
# response timeout (seconds)
READ_TIMEOUT = 20
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
results = {}
self.headers = {
"User-Agent": self.config.user_agent,
}
# copy URLs, as we may be manipulating self.config.urls in the loop
urls = list(self.config.urls)
for url in urls:
result = self.download_page(url)
results[url] = result
# remove bad URLs from config, to avoid later checks using them
if 'exception' in result and result['exception'] is not None:
self.config.remove_url(url)
return results
def download_page(self, url):
result = {
'url': url,
'content': None,
'content_type': None,
'content_length': None,
'status_code': None,
'response_headers': None,
'duration': None,
'exception': None,
}
try:
r = requests.get(url,
headers=self.headers,
timeout=(self.CONNECT_TIMEOUT, self.READ_TIMEOUT))
result['url'] = r.url
result['status_code'] = r.status_code
result['content'] = r.text
result['content_length'] = len(r.text)
result['response_headers'] = self.get_headers(r.headers)
result['duration'] = round(r.elapsed.total_seconds() * 1000)
if r.headers.get("content-type") is not None:
result['content_type'] = r.headers.get("content-type").split(";")[0].strip()
except requests.exceptions.ConnectionError as exc:
logging.error(str(exc) + " " + url)
result['exception'] = "connection"
except requests.exceptions.ReadTimeout as exc:
logging.error(str(exc) + " " + url)
result['exception'] = "read_timeout"
except requests.exceptions.Timeout as exc:
logging.error(str(exc) + " " + url)
result['exception'] = "connection_timeout"
except Exception as exc:
logging.error(str(exc) + " " + url)
result['exception'] = "%s %s" % (str(type(exc)), exc)
return result
def get_headers(self, headers):
"""
Transforms CaseInsensitiveDict into dict with lowercase keys
"""
out = {}
for key in headers:
out[key.lower()] = headers[key]
return out

checks/url_canonicalization.py

@@ -0,0 +1,13 @@
"""
This check verifies whether there is a single URL
or several variants left at this point.
"""
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
return self.config.urls

checks/url_reachability.py

@@ -0,0 +1,104 @@
"""
This check verifies whether the URLs in config are reachable.
Additional information regarding redirects and SSL problems
is also recorded and returned as results.
Non-accessible URLs are removed from config.urls.
A redirect to facebook.com is not considered reachable, as that
leads to a different website in the sense of this system.
TODO: Parallelize the work done in this test
"""
import logging
from urllib.parse import urlparse
import requests
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
headers = {
"User-Agent": self.config.user_agent
}
results = {}
urls = list(self.config.urls)
for url in urls:
logging.debug("Checking URL reachability for %s", url)
result = {
"url": url,
"redirect_history": [],
"status": None,
"exception": None,
"duration": None,
}
# Perform HEAD requests, recording redirect log
try:
r = requests.head(url, headers=headers, allow_redirects=True)
result['status'] = r.status_code
result['duration'] = round(r.elapsed.total_seconds() * 1000)
if len(r.history):
result['redirect_history'] = self.expand_history(r.history)
logging.debug("Redirects: %r", result['redirect_history'])
if r.url == url:
logging.debug("URL: %s - status %s", url, r.status_code)
else:
logging.debug("URL: %s - status %s - redirects to %s", url,
r.status_code, r.url)
# remove source URL, add target URL to config.urls
self.config.remove_url(url)
self.config.add_url(r.url)
# remove 404 etc
if r.status_code > 400:
self.config.remove_url(url)
except Exception as exc:
logging.info("Exception for URL %s: %s %s", url, str(type(exc)), exc)
result['exception'] = {
'type': str(type(exc)),
'message': str(exc),
}
# remove URL to prevent further checks on unreachable URL
self.config.remove_url(url)
# if redirects end at www.facebook.com, www.denic.de or sedo.com, remove this URL
# again, as the redirect target does not qualify as an owned website
if result['exception'] is None and result['redirect_history'] is not None and len(result['redirect_history']) > 0:
parsed = urlparse(result['redirect_history'][-1]['redirect_to'])
if parsed.hostname in ('www.facebook.com', 'www.denic.de', 'sedo.com'):
result['exception'] = {
'type': 'Bad target domain',
'message': 'The URL redirects to %s, which is unsupported by green-spider as it doesn\'t qualify as an owned website' % parsed.hostname,
}
self.config.remove_url(url)
results[url] = result
return results
def expand_history(self, history):
"""Extracts primitives from a list of requests.Response objects"""
items = []
for h in history:
item = {
'status': h.status_code,
'duration': round(h.elapsed.total_seconds() * 1000),
'redirect_to': h.headers['location'],
}
items.append(item)
return items

checks/url_reachability_test.py

@@ -0,0 +1,71 @@
import httpretty
from httpretty import httprettified
import unittest
from checks import url_reachability
from checks.config import Config
@httprettified
class TestUrlReachabilityChecker(unittest.TestCase):
def test_success(self):
url = 'http://www.example.com/'
httpretty.register_uri(httpretty.HEAD, url,
status=200, body="<html></html>")
config = Config(urls=[url])
checker = url_reachability.Checker(config=config, previous_results={})
result = checker.run()
self.assertEqual(result[url]['url'], url)
self.assertEqual(result[url]['redirect_history'], [])
self.assertEqual(result[url]['status'], 200)
self.assertIsNone(result[url]['exception'])
self.assertTrue(0 < result[url]['duration'] < 100)
def test_redirect(self):
url = 'http://www.example.com/'
url2 = 'http://www2.example.com/'
httpretty.register_uri(httpretty.HEAD, url,
status=302, body="",
adding_headers={"Location": url2})
httpretty.register_uri(httpretty.HEAD, url2,
status=200, body="<html></html>")
config = Config(urls=[url])
checker = url_reachability.Checker(config=config, previous_results={})
result = checker.run()
self.assertIn(url, result)
self.assertEqual(result[url]['url'], url)
self.assertEqual(result[url]['status'], 200)
self.assertIsNone(result[url]['exception'])
self.assertTrue(0 < result[url]['duration'] < 100)
self.assertEqual(len(result[url]['redirect_history']), 1)
self.assertEqual(result[url]['redirect_history'][0]['status'], 302)
self.assertEqual(result[url]['redirect_history'][0]['redirect_to'], url2)
def test_notfound(self):
url = 'http://www.example.com/'
httpretty.register_uri(httpretty.HEAD, url,
status=404, body="<html><body>Not found</body></html>")
config = Config(urls=[url])
checker = url_reachability.Checker(config=config, previous_results={})
result = checker.run()
self.assertEqual(result[url]['url'], url)
self.assertEqual(result[url]['redirect_history'], [])
self.