WIP commit

Marian Steinbach 2018-09-27 00:34:54 +02:00
parent d8f72f974e
commit 7f29091690
20 changed files with 801 additions and 169 deletions

@@ -4,3 +4,4 @@ docs
secrets
temp
venv
/export-*

.gitignore

@@ -7,3 +7,4 @@ __pycache__
.vscode/settings.json
webapp/dist/bundle.js
dev-shm
/export-*

@@ -1,12 +1,12 @@
FROM python:3.6-alpine3.7
FROM python:3.6-alpine3.8
# Note: we pin selenium to 3.8.0 because of https://github.com/SeleniumHQ/selenium/issues/5296
RUN echo "http://dl-4.alpinelinux.org/alpine/v3.7/main" >> /etc/apk/repositories && \
echo "http://dl-4.alpinelinux.org/alpine/v3.7/community" >> /etc/apk/repositories && \
apk update && \
apk --no-cache add chromium chromium-chromedriver python3-dev build-base git && \
apk --no-cache add chromium chromium-chromedriver python3-dev build-base git py3-lxml libxml2 libxml2-dev libxslt libxslt-dev && \
pip3 install --upgrade pip && \
pip3 install selenium==3.8.0 GitPython PyYAML beautifulsoup4==4.6.0 requests==2.18.4 responses==0.9.0 smmap2==2.0.3 urllib3==1.22 google-cloud-datastore==1.7.0 tenacity==5.0.2 && \
pip3 install selenium==3.8.0 GitPython PyYAML beautifulsoup4==4.6.0 html-similarity==0.3.2 requests==2.18.4 responses==0.9.0 smmap2==2.0.3 urllib3==1.22 google-cloud-datastore==1.7.0 tenacity==5.0.2 && \
apk del python3-dev build-base
ADD spider.py /
@@ -14,6 +14,7 @@ ADD spider_test.py /
ADD data_export.py /
ADD config /config
ADD jobs /jobs
ADD checks /checks
ENTRYPOINT ["python3"]
CMD ["/spider.py"]

@@ -12,7 +12,7 @@ spiderjobs: dockerimage
-v $(PWD)/secrets:/secrets \
quay.io/netzbegruenung/green-spider:latest spider.py \
--credentials-path /secrets/datastore-writer.json \
--loglevel debug \
--loglevel info \
jobs
# Run spider in docker image
@@ -23,9 +23,17 @@ spider: dockerimage
-v $(PWD)/secrets:/secrets \
quay.io/netzbegruenung/green-spider:latest spider.py \
--credentials-path /secrets/datastore-writer.json \
--loglevel info \
--loglevel debug \
spider
export: dockerimage
docker run --rm -ti \
-v $(PWD)/export-json:/out \
-v $(PWD)/secrets:/secrets \
-v $(PWD)/export-siteicons:/icons \
quay.io/netzbegruenung/green-spider:latest \
data_export.py /secrets/datastore-reader.json
# run spider tests
test: dockerimage
docker run --rm -ti quay.io/netzbegruenung/green-spider:latest /spider_test.py

@@ -4,32 +4,56 @@ The checks module contains the individual checks we perform with a page
import logging
from checks import subdomain_variations
#from checks import home_url_canonicalization
#from checks import http_and_https
from checks import charset
from checks import dns_resolution
from checks import duplicate_content
from checks import domain_variations
from checks import generator
from checks import html_head
from checks import http_and_https
from checks import page_content
from checks import url_reachability
from checks import url_canonicalization
from checks.config import Config
def perform_checks(input_url):
"""
Executes the tests in the right order
Executes all our URL/site checks and returns a results dict keyed by check name.
"""
# The sequence of checks to run. Order is important!
# Checks which expand the URLs list must come first.
# After that, dependencies (encoded in the checks) have to be fulfilled.
check_modules = [
('subdomain_variations', subdomain_variations),
#("home_url_canonicalization", home_url_canonicalization),
#("http_and_https", http_and_https),
('domain_variations', domain_variations),
('http_and_https', http_and_https),
('dns_resolution', dns_resolution),
('url_reachability', url_reachability),
('url_canonicalization', url_canonicalization),
('page_content', page_content),
('duplicate_content', duplicate_content),
('charset', charset),
('html_head', html_head),
('generator', generator),
]
result = {}
results = {}
config = Config(urls=[input_url])
config = Config(urls=[input_url],
user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) ' +
'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 ' +
'Safari/537.36 green-spider/0.2')
for check_name, check in check_modules:
checker = check.Checker(config)
result[check_name] = checker.run()
checker = check.Checker(config=config,
previous_results=results)
result = checker.run()
results[check_name] = result
# update config for the next check
config = checker.config
logging.debug("config after check %s: %r" % (check_name, config))
return result
return results
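
For orientation (not part of the commit), a minimal usage sketch of the pipeline above; the input URL is hypothetical. perform_checks runs the modules in the listed order and returns one entry per check name:

from pprint import pprint
import checks

results = checks.perform_checks('https://www.example-kv.de/')  # hypothetical URL
pprint(sorted(results.keys()))
# Expected keys, given the check sequence registered above:
# ['charset', 'dns_resolution', 'domain_variations', 'duplicate_content',
#  'generator', 'html_head', 'http_and_https', 'page_content',
#  'url_canonicalization', 'url_reachability']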

@@ -3,13 +3,21 @@ class AbstractChecker(object):
Our blueprint for checks
"""
def __init__(self, config):
def __init__(self, config, previous_results=None):
self._config = config
# A dictionary of results from previous checkers.
# Key is the name of the checker that has generated the result.
self._previous_results = previous_results
def run(self):
"""Executes the check routine, returns result dict"""
raise NotImplementedError()
@property
def config(self):
return self._config
return self._config
@property
def previous_results(self):
return self._previous_results
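
As a hedged illustration of the new constructor signature (not part of the commit), a hypothetical checker could read an earlier check's output through previous_results:

from checks.abstract_checker import AbstractChecker

class ContentLengthChecker(AbstractChecker):
    """Hypothetical example: reports the downloaded content length per URL."""

    def __init__(self, config, previous_results=None):
        super().__init__(config, previous_results)

    def run(self):
        results = {}
        # available once the 'page_content' check has run earlier in the sequence
        page_content = (self.previous_results or {}).get('page_content', {})
        for url in self.config.urls:
            content = (page_content.get(url) or {}).get('content') or ''
            results[url] = {'content_length': len(content)}
        return results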

checks/charset.py

@@ -0,0 +1,71 @@
"""
Checks which character set a page has.
"""
import logging
from bs4 import BeautifulSoup
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
results = {}
for url in self.config.urls:
results[url] = self.get_charset(url)
return results
def get_charset(self, url):
"""
Expects page_content_dict['content'] to carry the HTML content
"""
page_content = self.previous_results['page_content'][url]
assert 'content' in page_content
assert 'response_headers' in page_content
logging.debug("%r", page_content['response_headers'])
assert 'content-type' in page_content['response_headers']
if page_content['content'] is None:
return
result = {
'meta_charset_tag': None,
'content_type_header_charset': None,
'charset': 'iso-8859-1', # ISO-8859-1 is the default according to https://www.w3.org/International/articles/http-charset/index
'valid': None,
}
soup = BeautifulSoup(page_content['content'], 'html.parser')
# get response header charset
if ('content-type' in page_content['response_headers']
and 'charset=' in page_content['response_headers']['content-type']):
parts = page_content['response_headers']['content-type'].split("charset=", 1)
result['content_type_header_charset'] = parts[1].lower()
result['charset'] = parts[1].lower()
# get meta tag charset
metatags = soup.find_all('meta')
for tag in metatags:
if 'charset' in tag.attrs:
result['meta_charset_tag'] = tag['charset'].lower()
# meta tag overrules any previous value
result['charset'] = tag['charset'].lower()
# check for charset plausibility (only for most common ones)
if result['charset'] in ('iso-8859-1', 'utf-8'):
try:
_ = page_content['content'].encode(result['charset'])
except (UnicodeEncodeError, UnicodeDecodeError):
result['valid'] = False
else:
result['valid'] = True
return result

@@ -3,8 +3,9 @@ class Config(object):
Our configuration to be passed to checks
"""
def __init__(self, urls):
def __init__(self, urls, user_agent):
self._urls = set(urls)
self._user_agent = user_agent
def __repr__(self):
return "Config(urls=%r)" % self._urls
@@ -15,3 +16,14 @@ class Config(object):
def add_url(self, url):
self._urls.add(url)
def remove_url(self, url):
"""Removes url from urls, if it was in there. Ignores errors."""
try:
self._urls.remove(url)
except KeyError:
pass
@property
def user_agent(self):
return self._user_agent
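
A short usage sketch of the extended Config (not part of the commit; all values invented):

from checks.config import Config

config = Config(urls=['https://www.example.de/'],
                user_agent='green-spider/0.2 (example)')
config.add_url('http://www.example.de/')
config.remove_url('https://www.example.de/')
config.remove_url('https://not-in-the-set.de/')  # silently ignored
print(sorted(config.urls))
print(config.user_agent)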

checks/dns_resolution.py

@@ -0,0 +1,73 @@
"""
This check attempts to resolve all hostnames/domains in the input URLs.
URLs which are not resolvable are removed from the config.
"""
import logging
from socket import gethostbyname_ex
from urllib.parse import urlparse
from urllib.parse import urlunparse
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
"""Executes the check routine, returns result dict"""
results = {}
urls = list(self.config.urls)
for url in urls:
parsed = urlparse(url)
results[url] = self.resolve_hostname(parsed.hostname)
# remove URL if non-resolvable
if not results[url]['resolvable']:
self.config.remove_url(url)
return results
def expand_hostname(self, hostname):
"""
Create variations of subdomains
"""
hostnames = set()
hostnames.add(hostname)
if hostname.startswith('www.'):
# remove 'www.' prefix
hostnames.add(hostname[4:])
else:
# add 'www.' prefix
hostnames.add('www.' + hostname)
return sorted(list(hostnames))
def resolve_hostname(self, hostname):
"""
Resolve one hostname to IPv4 address(es)
"""
result = {
'hostname': hostname,
'resolvable': False,
'aliases': [],
'ipv4_addresses': [],
}
try:
hostname, aliases, ipv4_addresses = gethostbyname_ex(hostname)
result['resolvable'] = True
result['aliases'] = aliases
result['ipv4_addresses'] = ipv4_addresses
except Exception as e:
logging.debug("Hostname %s not resolvable. Exception: %r" % (hostname, e))
return result
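
For reference, socket.gethostbyname_ex returns a (canonical_name, aliases, ipv4_addresses) triple, which the result dict above mirrors. A hedged sketch, with host name and addresses invented:

from socket import gethostbyname_ex

name, aliases, ipv4_addresses = gethostbyname_ex('www.example.de')  # hypothetical host
print(name)             # canonical name, e.g. 'example.de'
print(aliases)          # e.g. ['www.example.de']
print(ipv4_addresses)   # e.g. ['192.0.2.10']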

@@ -0,0 +1,44 @@
"""
This adds commonly tried variations of domains/subdomains to the URLs config.
"""
import logging
from urllib.parse import urlparse
from urllib.parse import urlunparse
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
urls = list(self.config.urls)
for url in urls:
parsed = urlparse(url)
hostnames = self.expand_hostname(parsed.hostname)
for hostname in hostnames:
self.config.add_url(urlunparse((parsed.scheme, hostname,
parsed.path, parsed.params, parsed.query, parsed.fragment)))
return None
def expand_hostname(self, hostname):
"""
Create variations of subdomains
"""
hostnames = set()
hostnames.add(hostname)
if hostname.startswith('www.'):
# remove 'www.' prefix
hostnames.add(hostname[4:])
else:
# add 'www.' prefix
hostnames.add('www.' + hostname)
return sorted(list(hostnames))

@@ -0,0 +1,98 @@
"""
This checker looks at the similarity between previously downloaded pages
and removes duplicates from the config URLs
"""
import logging
import html_similarity
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
# value above which we consider a page pair a duplicate
similarity_threshold = 0.99999
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
if len(self.config.urls) == 1:
# nothing to do for us
return
urls = list(self.config.urls)
# get content
content = {}
assert 'page_content' in self.previous_results
for url in urls:
page_content = self.previous_results['page_content'][url]
if page_content['content'] is None:
logging.warning("Content for URL %s is None, skipping" % url)
continue
content[url] = page_content['content']
pairs = self.compare_pairwise(content)
# remove duplicates
for key in pairs:
if pairs[key]['similarity'] > self.similarity_threshold:
# this pair is a duplicate.
# Decide which one to keep
url1, url2 = key.split(" ", 1)
reject = self.select_url_to_reject(url1, url2)
self.config.remove_url(reject)
return pairs
def compare_pairwise(self, content):
# compare pairwise
pairs = {}
for url1 in content:
for url2 in content:
if url1 == url2:
continue
# avoid checking pairs twice
pair_key = " ".join(sorted([url1, url2]))
if pair_key in pairs:
continue
s = html_similarity.similarity(content[url1], content[url2])
logging.debug("Comparing pages for URLs %s and %s: similarity=%s", url1, url2, s)
pairs[pair_key] = {
'similarity': s,
}
return pairs
def select_url_to_reject(self, url1, url2):
"""Determine which of two URLs to keep, which to reject"""
# HTTPS takes precedence
if url1.startswith('https://') and not url2.startswith('https://'):
return url2
elif url2.startswith('https://') and not url1.startswith('https://'):
return url1
# Shorter URL wins
if len(url1) < len(url2):
return url2
elif len(url1) > len(url2):
return url1
# default behaviour
return url1
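
To make the threshold concrete, a small standalone sketch of the underlying comparison (not part of the commit; it assumes the html-similarity==0.3.2 package pinned in the Dockerfile, and the HTML snippets are made up):

import html_similarity

page_a = '<html><head><title>Kreisverband</title></head><body><p>Hallo</p></body></html>'
page_b = '<html><head><title>Kreisverband</title></head><body><p>Hallo!</p></body></html>'

score = html_similarity.similarity(page_a, page_b)
print(score)
# A pair scoring above similarity_threshold (0.99999) is treated as a duplicate,
# and select_url_to_reject() decides which of the two URLs gets dropped.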

checks/generator.py

@@ -0,0 +1,75 @@
"""
Checks the 'generator' meta tag and page content properties
to detect well-known content management systems, themes etc.
"""
import logging
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
# IP address of the newthinking GCMS server
gcms_ip = "91.102.13.20"
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
assert 'page_content' in self.previous_results
assert 'html_head' in self.previous_results
results = {}
for url in self.config.urls:
results[url] = self.get_generator(url)
return results
def get_generator(self, url):
page_content = self.previous_results['page_content'][url]
assert 'content' in page_content
assert 'dns_resolution' in self.previous_results
dns_resolution = self.previous_results['dns_resolution']
head = self.previous_results['html_head'][url]
generator = None
if 'generator' in head and head['generator'] is not None:
generator = head['generator'].lower()
if 'typo3' in generator:
generator = 'typo3'
if 'wordpress' in generator:
generator = 'wordpress'
if 'drupal' in generator:
generator = 'drupal'
if 'joomla' in generator:
generator = 'joomla'
# check content
# Qualify certain CMS flavours in more detail
if generator == "typo3":
# Typo3-Gruene advertises in the page content
if 'typo3-gruene.de' in page_content['content']:
generator = "typo3-gruene"
# newthinking GCMS in some page hrefs
elif 'ntc_gcms' in page_content['content']:
generator = "typo3-gcms"
# check if one of the IPs matches the well-known GCMS Server IP
elif url in dns_resolution:
for addr in dns_resolution[url]['ipv4_addresses']:
if addr == self.gcms_ip:
generator = "typo3-gcms"
elif 'Urwahl3000' in page_content['content']:
generator = "wordpress-urwahl"
# No generator Tag. Use HTML content.
elif ('josephknowsbest' in page_content['content'] or
'Joseph-knows-best' in page_content['content']):
generator = "wordpress-josephknowsbest"
elif 'wordpress' in page_content['content']:
generator = "wordpress"
return generator

checks/html_head.py

@@ -0,0 +1,144 @@
"""
Extracts information from the html <head>, like existence and value
of certain meta tags, link tags, title, etc.
"""
import logging
import re
from urllib.parse import urljoin
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
results = {}
for url in self.config.urls:
results[url] = self.get_content(url)
return results
def get_content(self, url):
"""
Expects page_content_dict['content'] to carry the HTML content
"""
page_content = self.previous_results['page_content'][url]
assert 'content' in page_content
assert 'response_headers' in page_content
logging.debug("%r", page_content['response_headers'])
assert 'content-type' in page_content['response_headers']
if page_content['content'] is None:
return
soup = BeautifulSoup(page_content['content'], 'html.parser')
head = soup.find('head')
result = {
'title': self.get_title(head),
'link_canonical': self.get_link_canonical(head, url),
'link_rss_atom': self.get_link_rss_atom(head, url),
'link_icon': self.get_link_icon(head, url),
'generator': self.get_generator(head),
'opengraph': self.get_opengraph(head),
}
return result
def get_title(self, head):
"""Extract and clean up page title"""
if head is None:
return
title = None
tag = head.find('title')
if tag is None:
return
title = tag.get_text()
# clean up
title = title.replace(u'\u00a0', ' ')
title = title.replace('  ', ' ')
title = title.strip()
return title
def get_link_canonical(self, head, url):
if head is None:
return
link = head.find('link', rel='canonical')
if link:
return urljoin(url, link.get('href'))
def get_link_rss_atom(self, head, url):
if head is None:
return
hrefs = []
rss_links = head.find_all('link', type='application/rss+xml')
atom_links = head.find_all('link', type='application/atom+xml')
if rss_links:
for link in rss_links:
hrefs.append(link.get('href'))
if atom_links:
for link in atom_links:
hrefs.append(link.get('href'))
# make URLs absolute
for i in range(len(hrefs)):
parsed = urlparse(hrefs[i])
if parsed.scheme == '':
hrefs[i] = urljoin(url, hrefs[i])
return hrefs
def get_link_icon(self, head, url):
if head is None:
return
tag = head.find('link', rel=lambda x: x and x.lower() == 'icon')
if tag:
return urljoin(url, tag.get('href'))
tag = head.find('link', rel=lambda x: x and x.lower() == 'shortcut icon')
if tag:
return urljoin(url, tag.get('href'))
def get_generator(self, head):
if head is None:
return
tags = head.select('[name=generator]')
if tags:
return tags[0].get('content')
def get_opengraph(self, head):
if head is None:
return
# we find tags by matching this property/itemprop value regex
property_re = re.compile('^og:')
opengraph = set()
for tag in head.find_all(property=property_re):
opengraph.add(tag.get('property'))
for tag in head.find_all(itemprop=property_re):
opengraph.add(tag.get('itemprop'))
opengraph = sorted(list(opengraph))
if opengraph != []:
return opengraph

checks/http_and_https.py

@@ -0,0 +1,27 @@
"""
This adds, for every HTTP URL, the HTTPS counterpart,
and vice versa, to config.urls.
So it doesn't actually perform tests; it only expands the
set of URLs to be tested by other checks.
"""
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
"""
Adds URLs to config.urls, returns nothing
"""
# iterate over a copy, as add_url() modifies config.urls
urls = list(self.config.urls)
for url in urls:
if url.startswith('https://'):
self.config.add_url('http://' + url[8:])
elif url.startswith('http://'):
self.config.add_url('https://' + url[7:])
return None
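
A hedged sketch of the effect on a config (not part of the commit; the URL and user agent are placeholders):

from checks import http_and_https
from checks.config import Config

config = Config(urls=['http://www.example.de/'],
                user_agent='green-spider/0.2 (example)')
http_and_https.Checker(config=config, previous_results={}).run()
print(sorted(config.urls))
# ['http://www.example.de/', 'https://www.example.de/']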

checks/page_content.py

@@ -0,0 +1,78 @@
"""
This check downloads the HTML page for each URL
"""
import logging
import requests
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
# connection timeout (seconds)
CONNECT_TIMEOUT = 10
# response timeout (seconds)
READ_TIMEOUT = 20
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
results = {}
self.headers = {
"User-Agent": self.config.user_agent,
}
# copy URLs, as we may be manipulating self.config.urls in the loop
urls = list(self.config.urls)
for url in urls:
result = self.download_page(url)
results[url] = result
# remove bad URLs from config, to avoid later checks using them
if 'exception' in result and result['exception'] is not None:
self.config.remove_url(url)
return results
def download_page(self, url):
result = {
'url': url,
'content': None,
'status_code': None,
'response_headers': None,
'duration': None,
'exception': None,
}
try:
r = requests.get(url,
headers=self.headers,
timeout=(self.CONNECT_TIMEOUT, self.READ_TIMEOUT))
result['status_code'] = r.status_code
result['content'] = r.text
result['response_headers'] = r.headers
result['duration'] = round(r.elapsed.total_seconds() * 1000)
except requests.exceptions.ConnectionError as exc:
logging.error(str(exc) + " " + url)
result['exception'] = "connection"
except requests.exceptions.ReadTimeout as exc:
logging.error(str(exc) + " " + url)
result['exception'] = "read_timeout"
except requests.exceptions.Timeout as exc:
logging.error(str(exc) + " " + url)
result['exception'] = "connection_timeout"
except Exception as exc:
logging.error(str(exc) + " " + url)
result['exception'] = "%s %s" % (str(type(exc)), exc)
return result
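
A note on the timeouts above: requests accepts a (connect, read) tuple, as used in download_page. A hedged standalone sketch, with URL and user agent as placeholders:

import requests

try:
    r = requests.get('https://www.example.de/',
                     headers={'User-Agent': 'green-spider/0.2 (example)'},
                     timeout=(10, 20))   # (connect timeout, read timeout) in seconds
    print(r.status_code, round(r.elapsed.total_seconds() * 1000), 'ms')
except requests.exceptions.ConnectTimeout:
    print('connection timeout')
except requests.exceptions.ReadTimeout:
    print('read timeout')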

@@ -1,84 +0,0 @@
"""
This check makes sure that commonly used variations of a (sub)domain are resolvable.
Example: input_url = 'http://example.com'
will check: ['example.com', 'www.example.com']
Resolvable subdomains are added to config.urls.
Details on the resolution are returned as a result from the run() method.
"""
import logging
from socket import gethostbyname_ex
from urllib.parse import urlparse
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config):
super().__init__(config)
def run(self):
"""Executes the check routine, returns result dict"""
logging.debug("subdomain_variations.Checker.run() called with Config: %r" % self.config)
hostnames = self.expand_hostnames()
results = self.resolve_hostnames(hostnames)
# pass resolvable hostnames on as URLs for further checks
for item in results:
if item['resolvable']:
self.config.add_url('http://%s/' % item['hostname'])
return results
def expand_hostnames(self):
"""
Create variations of subdomains
"""
hostnames = set()
for url in self.config.urls:
parsed = urlparse(url)
hostnames.add(parsed.hostname)
if parsed.hostname.startswith('www.'):
# remove 'www.' prefix
hostnames.add(parsed.hostname[4:])
else:
# add 'www.' prefix
hostnames.add('www.' + parsed.hostname)
return sorted(list(hostnames))
def resolve_hostname(self, hostname):
"""
Resolve one to IPv4 address(es)
"""
result = {
'hostname': hostname,
'resolvable': False,
'aliases': [],
'ipv4_addresses': [],
}
try:
hostname, aliases, ipv4_addresses = gethostbyname_ex(hostname)
result['resolvable'] = True
result['aliases'] = aliases
result['ipv4_addresses'] = ipv4_addresses
except Exception as e:
logging.debug("Hostname %s not resolvable. Exception: %r" % (hostname, e))
return result
def resolve_hostnames(self, hostnames):
result = []
for hostname in hostnames:
result.append(self.resolve_hostname(hostname))
return result

@@ -0,0 +1,13 @@
"""
This check verifies whether there is a single URL
or several variants left at this point.
"""
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
return self.config.urls

@@ -0,0 +1,85 @@
"""
This check verifies whether the urls in config are reachable.
Some additional information regarding redirects and SSL problems
is also recorded and returned as results.
Non-accessible URLs are removed from config.urls.
TODO: Parallelize the work done in this test
"""
import logging
import requests
from checks.abstract_checker import AbstractChecker
class Checker(AbstractChecker):
def __init__(self, config, previous_results=None):
super().__init__(config, previous_results)
def run(self):
headers = {
"User-Agent": self.config.user_agent
}
results = {}
urls = list(self.config.urls)
for url in urls:
logging.debug("Checking URL reachability for %s", url)
result = {
"url": url,
"redirect_history": None,
"status": None,
"exception": None,
"duration": None,
}
# Perform HEAD requests, recording redirect log
try:
r = requests.head(url, headers=headers, allow_redirects=True)
result['status'] = r.status_code
result['duration'] = round(r.elapsed.total_seconds() * 1000)
if len(r.history):
result['redirect_history'] = self.expand_history(r.history)
logging.debug("Redirects: %r", result['redirect_history'])
if r.url == url:
logging.debug("URL: %s - status %s", url, r.status_code)
else:
logging.debug("URL: %s - status %s - redirects to %s", url,
r.status_code, r.url)
# remove source URL, add target URL to config.urls
self.config.remove_url(url)
self.config.add_url(r.url)
except Exception as exc:
logging.info("Exception for URL %s: %s %s", url, str(type(exc)), exc)
result['exception'] = {
'type': str(type(exc)),
'message': str(exc),
}
# remove URL to prevent further checks on unreachable URL
self.config.remove_url(url)
results[url] = result
return results
def expand_history(self, history):
"""Extracts primitives from a list of requests.Response objects"""
items = []
for h in history:
item = {
'status': h.status_code,
'duration': round(h.elapsed.total_seconds() * 1000),
'redirect_to': h.headers['location'],
}
items.append(item)
return items

@@ -152,7 +152,8 @@ def get_job_from_queue(datastore_client):
out = None
with datastore_client.transaction():
query = datastore_client.query(kind=config.JOB_DATASTORE_KIND)
query = datastore_client.query(kind=config.JOB_DATASTORE_KIND,
order=['index'])
for entity in query.fetch(limit=1):
logging.debug("Got job: %s", entity)
out = dict(entity)

@@ -8,6 +8,7 @@ import re
import statistics
import time
from datetime import datetime
from pprint import pprint
from socket import gethostbyname_ex
from urllib.parse import urljoin
from urllib.parse import urlparse
@@ -26,23 +27,6 @@ import checks
DATASTORE_CLIENT = None
def reduce_urls(urllist):
"""
Reduce a list of urls with metadata by eliminating those
that either don't work or lead somewhere else
"""
targets = set()
for url in urllist:
if url['error'] is not None:
continue
if url['redirects_to'] is not None:
targets.add(url['redirects_to'])
else:
targets.add(url['url'])
return sorted(list(targets))
def normalize_title(title):
"""
Removes garbage from HTML page titles
@@ -177,7 +161,7 @@ def collect_ipv4_addresses(hostname_results):
Return list of unique IPv4 addresses
"""
ips = set()
for item in hostname_results:
for item in hostname_results.values():
if 'ipv4_addresses' not in item:
continue
ips = ips | set(item['ipv4_addresses']) # union
@@ -257,57 +241,25 @@ def check_site(entry):
# Results from our next generation checkers
nextgen_results = checks.perform_checks(entry['url'])
result['details']['hostnames'] = nextgen_results['subdomain_variations']
logging.debug("result[details][hostnames]: %r" % result['details']['hostnames'])
pprint(nextgen_results['dns_resolution'])
pprint(nextgen_results['charset'])
pprint(nextgen_results['html_head'])
result['details']['ipv4_addresses'] = collect_ipv4_addresses(nextgen_results['subdomain_variations'])
logging.debug("result[details][ipv4_addresses]: %r" % result['details']['ipv4_addresses'])
result['details']['hostnames'] = nextgen_results['domain_variations'].items()
#logging.debug("result[details][hostnames]: %r" % result['details']['hostnames'])
time.sleep(5)
result['details']['ipv4_addresses'] = collect_ipv4_addresses(nextgen_results['domain_variations'])
#logging.debug("result[details][ipv4_addresses]: %r" % result['details']['ipv4_addresses'])
# check basic HTTP(S) reachability
checked_urls = []
checked_urls_set = set()
result['details']['resolvable_urls'] = sorted(nextgen_results['url_reachability'].values(), key=lambda record: record['url'])
for item in result['details']['hostnames']:
result['details']['canonical_urls'] = sorted(nextgen_results['url_canonicalization'])
if not item['resolvable']:
continue
for scheme in ('http', 'https'):
url = scheme + '://' + item['hostname'] + '/'
if url in checked_urls_set:
continue
checked_urls_set.add(url)
record = {
'url': url,
'error': None,
'redirects_to': None,
}
try:
req = requests.head(record['url'], headers=headers, allow_redirects=True)
if req.url == url:
logging.info("URL: %s - status %s", record['url'], req.status_code)
else:
logging.info("URL: %s - status %s - redirects to %s", record['url'],
req.status_code, req.url)
record['redirects_to'] = req.url
except Exception as exc:
record['error'] = {
'type': str(type(exc)),
'message': str(exc),
}
logging.info("URL %s: %s %s", url, str(type(exc)), exc)
checked_urls.append(record)
result['details']['resolvable_urls'] = sorted(checked_urls, key=lambda url: url['url'])
result['details']['canonical_urls'] = sorted(reduce_urls(checked_urls))
# TODO: continue with content checks
logging.info("Waiting 10 seconds...")
time.sleep(60)
# Deeper test for the remaining (canonical) URL(s)
for check_url in result['details']['canonical_urls']:
@@ -437,7 +389,7 @@ def check_site(entry):
# SITE_REACHABLE
for item in result['details']['resolvable_urls']:
if item['error'] is None:
if item['exception'] is None:
result['result']['SITE_REACHABLE'] = {'value': True, 'score': 1}
break