Remove spider code no longer used

Marian Steinbach 2018-09-28 22:49:28 +02:00
parent d4b5695ae9
commit 8a5229861e

spider.py (399 lines changed)

@@ -3,140 +3,26 @@ Provides the spider functionality (website checks).
"""
import argparse
import json
import logging
import re
import statistics
import time
from datetime import datetime
from pprint import pprint
from socket import gethostbyname_ex
from urllib.parse import urljoin
from urllib.parse import urlparse
import requests
from bs4 import BeautifulSoup
from google.api_core.exceptions import InvalidArgument
from google.cloud import datastore
import jobs
import config
import checks
import config
import jobs
import rating
DATASTORE_CLIENT = None
def normalize_title(title):
    """
    Removes garbage from HTML page titles
    """
    title = title.replace(u'\u00a0', ' ')
    title = title.replace('  ', ' ')  # collapse doubled spaces
    title = title.strip()
    return title
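# Editor's sketch (not part of the original file; input is hypothetical):
# non-breaking spaces become regular spaces, doubled spaces collapse to one,
# and surrounding whitespace is stripped:
#
#   normalize_title(u'\u00a0 Green\u00a0Party  ')  # -> 'Green Party'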
def check_content(req):
    """
    Adds details to the check result regarding content of the page.

    req: requests response object
    """
    result = {}

    result['encoding'] = req.encoding.lower()
    soup = BeautifulSoup(req.text, 'html.parser')

    result['html'] = req.text

    # page title
    result['title'] = None
    title = None
    head = soup.find('head')
    if head is not None:
        title = head.find('title')
    if title is not None:
        result['title'] = normalize_title(title.get_text())

    # canonical link
    result['canonical_link'] = None
    link = soup.find('link', rel='canonical')
    if link:
        result['canonical_link'] = urljoin(req.url, link.get('href'))

    # icon
    result['icon'] = None
    link = soup.find('link', rel=lambda x: x and x.lower() == 'icon')
    if link:
        result['icon'] = urljoin(req.url, link.get('href'))
    else:
        link = soup.find('link', rel=lambda x: x and x.lower() == 'shortcut icon')
        if link:
            result['icon'] = urljoin(req.url, link.get('href'))

    # feed links
    result['feeds'] = []
    rss_links = soup.find_all('link', type='application/rss+xml')
    atom_links = soup.find_all('link', type='application/atom+xml')
    if rss_links:
        for link in rss_links:
            result['feeds'].append(urljoin(req.url, link.get('href')))
    if atom_links:
        for link in atom_links:
            result['feeds'].append(urljoin(req.url, link.get('href')))

    # generator meta tag
    result['generator'] = None
    if head is not None:
        generator = head.select('[name=generator]')
        if generator:
            result['generator'] = generator[0].get('content')

    # opengraph meta tags
    result['opengraph'] = None
    opengraph = set()
    if head is not None:
        for item in head.find_all(property=re.compile('^og:')):
            opengraph.add(item.get('property'))
        for item in head.find_all(itemprop=re.compile('^og:')):
            opengraph.add(item.get('itemprop'))
    if opengraph:
        result['opengraph'] = sorted(list(opengraph))

    return result
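# Editor's sketch (assumed values, not in the original commit): for a typical
# successful response, check_content() returns a dict shaped roughly like
#
#   {
#       'encoding': 'utf-8',
#       'html': '<!DOCTYPE html>...',
#       'title': 'Example Page',
#       'canonical_link': 'https://example.org/',
#       'icon': 'https://example.org/favicon.ico',
#       'feeds': ['https://example.org/feed.xml'],
#       'generator': 'WordPress 4.9',
#       'opengraph': ['og:image', 'og:title'],
#   }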
def collect_ipv4_addresses(hostname_results):
    """
    Return list of unique IPv4 addresses
    """
    ips = set()
    for item in hostname_results.values():
        if 'ipv4_addresses' not in item:
            continue
        ips = ips | set(item['ipv4_addresses'])  # union
    return sorted(list(ips))
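# Editor's sketch with hypothetical data: given per-hostname DNS results like
#
#   {'example.org': {'resolvable': True, 'ipv4_addresses': ['93.184.216.34']},
#    'www.example.org': {'resolvable': True, 'ipv4_addresses': ['93.184.216.34']}}
#
# collect_ipv4_addresses() returns the de-duplicated, sorted list
# ['93.184.216.34'].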
def parse_generator(generator):
    """
    Return well known CMS names from generator
    """
    generator = generator.lower()
    if 'typo3' in generator:
        return "typo3"
    if 'wordpress' in generator:
        return "wordpress"
    if 'drupal' in generator:
        return "drupal"
    if 'joomla' in generator:
        return "joomla"
    return generator
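# Editor's sketch (hypothetical generator strings) of the mapping above:
#
#   parse_generator('WordPress 4.9.8')    # -> 'wordpress'
#   parse_generator('TYPO3 CMS')          # -> 'typo3'
#   parse_generator('SomeOtherCMS 1.0')   # -> 'someothercms 1.0' (passed through, lowercased)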
def check_site(entry):
def check_and_rate_site(entry):
    """
    Performs our site check and returns results as a dict.
@@ -145,11 +31,6 @@ def check_site(entry):
    3. Determine the canonical URL
    4. Run full check on canonical URL
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) ' +
                      'AppleWebKit/537.36 (KHTML, like Gecko) ' +
                      'Chrome/65.0.3325.181 green-spider/0.1'
    }

    # all the info we'll return for the site
    result = {
@@ -157,259 +38,36 @@ def check_site(entry):
        'input_url': entry['url'],

        # Meta: Regional and type metadata for the site
        'meta': {
            'type': entry.get('type'),
            'level': entry.get('level'),
            'state': entry.get('state'),
            'district': entry.get('district'),
            'city': entry.get('city'),
        },

        # Details: All details we collected about the site (which aren't directly
        # related to the report criteria)
        'details': {
            'hostnames': {},
            'ipv4_addresses': [],
            'resolvable_urls': [],
            'canonical_urls': [],
            'urlchecks': [],
            'icons': [],
            'feeds': [],
            'cms': None,
            'responsive': None,
        },

        # The actual report criteria
        'result': {
            'DNS_RESOLVABLE_IPV4': {'type': 'boolean', 'value': False, 'score': 0},
            'SITE_REACHABLE': {'type': 'boolean', 'value': False, 'score': 0},
            'HTTPS': {'type': 'boolean', 'value': False, 'score': 0},
            'WWW_OPTIONAL': {'type': 'boolean', 'value': False, 'score': 0},
            'CANONICAL_URL': {'type': 'boolean', 'value': False, 'score': 0},
            'FAVICON': {'type': 'boolean', 'value': False, 'score': 0},
            'FEEDS': {'type': 'boolean', 'value': False, 'score': 0},
            'HTTP_RESPONSE_DURATION': {'type': 'number', 'value': None, 'score': 0},
            'RESPONSIVE': {'type': 'boolean', 'value': False, 'score': 0},
        },

        # checks: Results from our checks
        'checks': {},

        # The actual report scoring criteria
        'rating': {},

        # resulting score
        'score': 0.0,
    }
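    # Editor's note (sketch, not in the original commit): each entry in
    # result['result'] above is later overwritten in place, so a reachable
    # HTTPS site would end up with, e.g.:
    #
    #   result['result']['SITE_REACHABLE'] == {'value': True, 'score': 1}
    #   result['result']['HTTPS'] == {'value': True, 'score': 2}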
    # Results from our next generation checkers
    nextgen_results = checks.perform_checks(entry['url'])
    result['checks'] = checks.perform_checks(entry['url'])

    pprint(nextgen_results['dns_resolution'])
    pprint(nextgen_results['url_reachability'])
    pprint(nextgen_results['charset'])
    pprint(nextgen_results['html_head'])
    pprint(nextgen_results['generator'])
    pprint(nextgen_results['responsive_layout'])

    result['rating'] = rating.calculate_rating(result['checks'])

    result['details']['hostnames'] = nextgen_results['dns_resolution'].values()
    #logging.debug("result[details][hostnames]: %r" % result['details']['hostnames'])

    # Overall score is the sum of the individual scores
    for key in result['rating']:
        result['score'] += result['rating'][key]['score']

    result['details']['ipv4_addresses'] = collect_ipv4_addresses(nextgen_results['dns_resolution'])
    #logging.debug("result[details][ipv4_addresses]: %r" % result['details']['ipv4_addresses'])

    result['details']['resolvable_urls'] = sorted(nextgen_results['url_reachability'].values(), key=lambda url: url['url'])
    result['details']['canonical_urls'] = sorted(nextgen_results['url_canonicalization'])

    # TODO: continue with content checks
    logging.info("Waiting 60 seconds...")
    time.sleep(60)
    # Deeper test for the remaining (canonical) URL(s)
    for check_url in result['details']['canonical_urls']:

        logging.info("Downloading URL %s", check_url)

        check = {
            'url': check_url,
            'status_code': None,
            'duration': None,
            'error': None,
            'content': None,
            'responsive': None,
        }

        try:
            req = requests.get(check_url, headers=headers,
                               timeout=(config.CONNECT_TIMEOUT, config.READ_TIMEOUT))
            check['status_code'] = req.status_code
            # response time in milliseconds
            check['duration'] = round(req.elapsed.total_seconds() * 1000)

            # Content checks
            if req.status_code < 300:
                check['content'] = check_content(req)

        except requests.exceptions.ConnectionError as exc:
            logging.error(str(exc) + " " + check_url)
            check['error'] = "connection"
        except requests.exceptions.ReadTimeout as exc:
            logging.error(str(exc) + " " + check_url)
            check['error'] = "read_timeout"
        except requests.exceptions.Timeout as exc:
            logging.error(str(exc) + " " + check_url)
            check['error'] = "connection_timeout"
        except Exception as exc:
            logging.error(str(exc) + " " + check_url)
            check['error'] = "unknown"

        result['details']['urlchecks'].append(check)

    result['details']['urlchecks'] = sorted(result['details']['urlchecks'],
                                            key=lambda url: url['url'])
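    # Editor's sketch (hypothetical values): after this loop, each entry in
    # result['details']['urlchecks'] looks like
    #
    #   {'url': 'https://example.org/', 'status_code': 200, 'duration': 230,
    #    'error': None, 'content': {...}, 'responsive': None}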
    # collect icons
    icons = set()
    for c in result['details']['urlchecks']:
        if 'content' not in c:
            continue
        if c['content'] is None:
            logging.warning("No content for %s", entry['url'])
            continue
        if c['content']['icon'] is not None:
            icons.add(c['content']['icon'])
    result['details']['icons'] = sorted(list(icons))

    # collect feeds
    feeds = set()
    for c in result['details']['urlchecks']:
        if c['content'] is None:
            logging.warning("No content for %s", entry['url'])
            continue
        if 'feeds' in c['content'] and len(c['content']['feeds']):
            for feed in c['content']['feeds']:
                feeds.add(feed)
    result['details']['feeds'] = sorted(list(feeds))
    # detect responsive
    viewports = set()
    min_width = 2000
    for c in result['details']['urlchecks']:
        if c['responsive'] is None:
            continue
        if c['responsive']['viewport_meta_tag'] is not None:
            viewports.add(c['responsive']['viewport_meta_tag'])
        widths = c['responsive']['document_width'].values()
        if min(widths) < min_width:
            min_width = min(widths)
    result['details']['responsive'] = {
        'viewport_meta_tag': list(viewports),
        'min_width': min_width,
    }
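    # Editor's sketch (hypothetical data): with a viewport meta tag like
    # 'width=device-width, initial-scale=1' and measured document widths of
    # {320: 320, 768: 768}, min_width becomes 320, so the site later passes
    # the RESPONSIVE criterion below (min_width < 500 plus a viewport tag).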
    # detect CMS
    for c in result['details']['urlchecks']:
        if c['content'] is None:
            continue
        if 'generator' not in c['content']:
            continue

        if c['content']['generator'] != "" and c['content']['generator'] is not None:

            result['details']['cms'] = parse_generator(c['content']['generator'])
            # Qualify certain CMS flavours in more detail
            if result['details']['cms'] == "typo3":
                if config.GCMS_IP in result['details']['ipv4_addresses']:
                    result['details']['cms'] = "typo3-gcms"
                elif 'typo3-gruene.de' in c['content']['html']:
                    result['details']['cms'] = "typo3-gruene"
            elif result['details']['cms'] == "wordpress":
                if 'Urwahl3000' in c['content']['html']:
                    result['details']['cms'] = "wordpress-urwahl"
        else:
            # No generator tag. Use HTML content.
            if 'Urwahl3000' in c['content']['html']:
                result['details']['cms'] = "wordpress-urwahl"
            elif ('josephknowsbest' in c['content']['html'] or
                  'Joseph-knows-best' in c['content']['html']):
                result['details']['cms'] = "wordpress-josephknowsbest"
            elif 'wordpress' in c['content']['html']:
                result['details']['cms'] = "wordpress"

        # we can stop here
        break
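    # Editor's note (illustration only): under the logic above, a site whose
    # generator is 'TYPO3 CMS' and whose IPv4 address matches config.GCMS_IP
    # is classified as 'typo3-gcms', while a WordPress site using the
    # Urwahl3000 theme ends up as 'wordpress-urwahl'.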
    ### Derive criteria

    # DNS_RESOLVABLE_IPV4
    if result['details']['ipv4_addresses']:
        result['result']['DNS_RESOLVABLE_IPV4'] = {'value': True, 'score': 1}

    # SITE_REACHABLE
    for item in result['details']['resolvable_urls']:
        if item['exception'] is None:
            result['result']['SITE_REACHABLE'] = {'value': True, 'score': 1}
            break

    # HTTPS
    for item in result['details']['urlchecks']:
        if item['error'] is None and item['url'].startswith('https://'):
            result['result']['HTTPS'] = {'value': True, 'score': 2}
            break

    # WWW_OPTIONAL
    num_hostnames = 0
    for item in result['details']['hostnames']:
        if not item['resolvable']:
            continue
        num_hostnames += 1
    if num_hostnames > 1:
        result['result']['WWW_OPTIONAL'] = {'value': True, 'score': 1}

    # CANONICAL_URL
    # - either there is only one canonical URL (through redirects)
    # - or several pages have identical rel=canonical links
    if len(result['details']['canonical_urls']) == 1:
        result['result']['CANONICAL_URL'] = {'value': True, 'score': 1}
    else:
        links = set()
        if result['details']['urlchecks'] is None:
            logging.warning("No urlchecks for %s", entry['url'])
        else:
            for item in result['details']['urlchecks']:
                if item['content'] is not None and item['content']['canonical_link'] is not None:
                    links.add(item['content']['canonical_link'])
        if len(links) == 1:
            result['result']['CANONICAL_URL'] = {'value': True, 'score': 1}

    # FAVICON
    if result['details']['icons']:
        result['result']['FAVICON'] = {'value': True, 'score': 1}

    # FEEDS
    if result['details']['feeds']:
        result['result']['FEEDS'] = {'value': True, 'score': 1}

    # HTTP_RESPONSE_DURATION
    durations = []
    for item in result['details']['urlchecks']:
        if item['error'] is None:
            durations.append(item['duration'])
    if durations:
        val = round(statistics.mean(durations))
        result['result']['HTTP_RESPONSE_DURATION']['value'] = val
        if val < 100:
            result['result']['HTTP_RESPONSE_DURATION']['score'] = 1
        elif val < 1000:
            result['result']['HTTP_RESPONSE_DURATION']['score'] = 0.5
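    # Editor's worked example (hypothetical durations): for two successful
    # checks taking 120 ms and 340 ms, the mean is 230 ms, so
    # HTTP_RESPONSE_DURATION gets value 230 and score 0.5 (>= 100, < 1000).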
    # RESPONSIVE
    if result['details']['responsive'] is not None:
        if (result['details']['responsive']['min_width'] < 500 and
                len(result['details']['responsive']['viewport_meta_tag']) > 0):
            result['result']['RESPONSIVE']['value'] = True
            result['result']['RESPONSIVE']['score'] = 1
    # Overall score
    for item in result['result'].keys():
        result['score'] += result['result'][item]['score']

    # clean up - remove full HTML
    for item in result['details']['urlchecks']:
        try:
            del item['content']['html']
        except (KeyError, TypeError):
            pass

    # remove full HTML page content,
    # as it's no longer needed
    try:
        for url in result['checks']['page_content']:
            del result['checks']['page_content'][url]['content']
    except (KeyError, TypeError):
        pass

    return result
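# Editor's sketch of how the function above is invoked (the URL and metadata
# values are hypothetical; the keys match the entry fields read above):
#
#   result = check_and_rate_site({'url': 'https://example-gruene.de/',
#                                 'type': 'REGIONAL_CHAPTER', 'level': 'DE:KREISVERBAND',
#                                 'state': 'Hessen', 'district': 'Example', 'city': None})
#   print(result['score'])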
@@ -425,7 +83,9 @@ def work_of_queue():
            break

        logging.info("Starting job %s", job["url"])
        result = check_site(entry=job)
        result = check_and_rate_site(entry=job)
        logging.debug("Full JSON representation of returned result: %s", json.dumps(result))
        logging.info("Job %s finished checks", job["url"])
        logging.info("Job %s writing to DB", job["url"])
@@ -433,8 +93,11 @@ def work_of_queue():
        key = DATASTORE_CLIENT.key(config.RESULTS_DATASTORE_KIND, job["url"])
        entity = datastore.Entity(key=key, exclude_from_indexes=['results'])
        record = {
            "created": datetime.utcnow(),
            "results": result,
            'created': datetime.utcnow(),
            'meta': result['meta'],
            'checks': result['checks'],
            'rating': result['rating'],
            'score': result['score'],
        }
        entity.update(record)
        try: