Move job handling into module

Marian Steinbach 2018-09-24 21:19:47 +02:00
parent 73eff37110
commit 2391207755
2 changed files with 174 additions and 179 deletions

jobs/__init__.py (new file, 166 additions)

@@ -0,0 +1,166 @@
from datetime import datetime
import logging
import os
import shutil

from git import Repo
import tenacity
import yaml
from google.api_core.exceptions import Aborted
from google.cloud import datastore

import config


def clone_data_directory():
    """
    Clones the source of website URLs, the green directory,
    into the local file system using git
    """
    if os.path.exists(config.GREEN_DIRECTORY_LOCAL_PATH):
        shutil.rmtree(config.GREEN_DIRECTORY_LOCAL_PATH)
    Repo.clone_from(config.GREEN_DIRECTORY_REPO, config.GREEN_DIRECTORY_LOCAL_PATH)


def directory_entries():
    """
    Iterator over all data files in the cloned green directory
    """
    path = os.path.join(config.GREEN_DIRECTORY_LOCAL_PATH, config.GREEN_DIRECTORY_DATA_PATH)
    for root, _, files in os.walk(path):
        for fname in files:
            filepath = os.path.join(root, fname)
            if not filepath.endswith(".yaml"):
                continue
            with open(filepath, 'r', encoding='utf8') as yamlfile:
                for doc in yaml.load_all(yamlfile):
                    yield doc


def chunks(the_list, size):
    """
    Yield successive n-sized chunks from list the_list
    where n = size.
    """
    for i in range(0, len(the_list), size):
        yield the_list[i:i + size]

def create_jobs(datastore_client, url=None):
    """
    Read all URLs from green directory and fill a job database
    with one job per URL.

    Alternatively, if the url argument is given, only the given URL
    will be added as a spider job.
    """

    # refresh our local clone of the green directory
    logging.info("Refreshing green-directory clone")
    clone_data_directory()

    # build the list of website URLs to run checks for
    logging.info("Processing green-directory")
    input_entries = []

    count = 0

    for entry in directory_entries():
        if 'type' not in entry:
            logging.error("Entry without type")
            continue
        if 'urls' not in entry:
            logging.debug("Entry %s does not have any URLs.", repr_entry(entry))
            continue

        website_url = None
        for index in range(len(entry['urls'])):
            try:
                if entry['urls'][index]['type'] == "WEBSITE":
                    website_url = entry['urls'][index]['url']
                    if website_url:
                        if url is not None and website_url != url:
                            continue
                        input_entries.append({
                            "url": website_url,
                            "level": entry.get("level"),
                            "state": entry.get("state"),
                            "district": entry.get("district"),
                            "city": entry.get("city"),
                        })
                        count += 1
            except KeyError:
                logging.error("Error in %s: 'url' key missing (%s)",
                              repr_entry(entry), entry['urls'][index])

    # ensure the passed URL argument is really there, even if not part
    # of the directory.
    if url and count == 0:
        logging.info("Adding job for URL %s which is not part of green-directory", url)
        input_entries.append({
            "url": url,
            "level": None,
            "state": None,
            "district": None,
            "city": None,
        })

    count = 0
    logging.info("Writing jobs")

    entities = []

    for entry in input_entries:
        key = datastore_client.key(config.JOB_DATASTORE_KIND, entry["url"])
        entity = datastore.Entity(key=key)
        entity.update({
            "created": datetime.utcnow(),
            "level": entry["level"],
            "state": entry["state"],
            "district": entry["district"],
            "city": entry["city"],
        })
        entities.append(entity)

    # commit to DB
    for chunk in chunks(entities, 300):
        logging.debug("Writing jobs chunk of length %d", len(chunk))
        datastore_client.put_multi(chunk)
        count += len(chunk)

    logging.info("Writing jobs done, %s jobs added", count)

@tenacity.retry(wait=tenacity.wait_exponential(),
                retry=tenacity.retry_if_exception_type(Aborted))
def get_job_from_queue(datastore_client):
    """
    Returns a URL from the queue
    """
    out = None

    with datastore_client.transaction():
        query = datastore_client.query(kind=config.JOB_DATASTORE_KIND)
        for entity in query.fetch(limit=1):
            logging.debug("Got job: %s", entity)
            out = dict(entity)
            out["url"] = entity.key.name
            datastore_client.delete(entity.key)

    return out


def repr_entry(entry):
    """
    Return string representation of a directory entry,
    for logging/debugging purposes
    """
    ret = entry['type']
    if 'level' in entry:
        ret += "/" + entry['level']
    if 'state' in entry:
        ret += "/" + entry['state']
    if 'district' in entry:
        ret += "/" + entry['district']
    return ret
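
For reference, the extracted module is driven with an explicit Datastore client. A minimal usage sketch, not part of this commit: the client setup and the example URL are assumptions, and running it needs Datastore credentials or the emulator.

# Hypothetical usage of the new jobs module -- illustration only, not part of this commit.
from google.cloud import datastore

import jobs

client = datastore.Client()  # assumed setup; spider.py keeps its own DATASTORE_CLIENT

# Fill the queue with one job per website URL from the green directory,
# or with a single job when a URL is given. Note that create_jobs always
# refreshes the green-directory clone first, even for a single URL.
jobs.create_jobs(client, url="https://www.gruene.de/")  # example URL, assumed

# Claim one job: the entity is read and deleted inside a transaction, and the
# call retries with exponential backoff on Aborted (concurrent workers).
job = jobs.get_job_from_queue(client)
if job is not None:
    print(job["url"])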

spider.py (187 changed lines: 8 additions, 179 deletions)

@@ -3,12 +3,8 @@ Provides the spider functionality (website checks).
"""
import argparse
import json
import logging
import os
import random
import re
import shutil
import statistics
import time
from datetime import datetime
@@ -17,167 +13,18 @@ from urllib.parse import urljoin
from urllib.parse import urlparse
import requests
import yaml
import tenacity
from bs4 import BeautifulSoup
from git import Repo
from selenium import webdriver
from google.cloud import datastore
from google.api_core.exceptions import Aborted
from google.api_core.exceptions import InvalidArgument
from google.cloud import datastore
# end configuration
import jobs
import config
DATASTORE_CLIENT = None
def chunks(the_list, size):
    """
    Yield successive n-sized chunks from list the_list
    where n = size.
    """
    for i in range(0, len(the_list), size):
        yield the_list[i:i + size]


def create_jobs(url=None):
    """
    Read all URLs from green directory and fill a job database
    with one job per URL.

    Alternatively, if the url argument is given, only the given URL
    will be added as a spider job.
    """

    # refresh our local clone of the green directory
    logging.info("Refreshing green-directory clone")
    get_green_directory()

    # build the list of website URLs to run checks for
    logging.info("Processing green-directory")
    input_entries = []

    count = 0

    for entry in dir_entries():
        if 'type' not in entry:
            logging.error("Entry without type")
            continue
        if 'urls' not in entry:
            logging.debug("Entry %s does not have any URLs.", repr_entry(entry))
            continue

        website_url = None
        for index in range(len(entry['urls'])):
            try:
                if entry['urls'][index]['type'] == "WEBSITE":
                    website_url = entry['urls'][index]['url']
                    if website_url:
                        if url is not None and website_url != url:
                            continue
                        input_entries.append({
                            "url": website_url,
                            "level": entry.get("level"),
                            "state": entry.get("state"),
                            "district": entry.get("district"),
                            "city": entry.get("city"),
                        })
                        count += 1
            except NameError:
                logging.error("Error in %s: 'url' key missing (%s)",
                              repr_entry(entry), entry['urls'][index])

    # ensure the passed URL argument is really there, even if not part
    # of the directory.
    if url and count == 0:
        logging.info("Adding job for URL %s which is not part of green-directory", url)
        input_entries.append({
            "url": url,
            "level": None,
            "state": None,
            "district": None,
            "city": None,
        })

    # randomize order, to distribute requests over servers
    logging.debug("Shuffling input URLs")
    random.seed()
    random.shuffle(input_entries)

    count = 0
    logging.info("Writing jobs")

    entities = []

    for entry in input_entries:
        key = DATASTORE_CLIENT.key(JOB_DATASTORE_KIND, entry["url"])
        entity = datastore.Entity(key=key)
        entity.update({
            "created": datetime.utcnow(),
            "level": entry["level"],
            "state": entry["state"],
            "district": entry["district"],
            "city": entry["city"],
        })
        entities.append(entity)

    # commit to DB
    for chunk in chunks(entities, 300):
        logging.debug("Writing jobs chunk of length %d", len(chunk))
        DATASTORE_CLIENT.put_multi(chunk)
        count += len(chunk)

    logging.info("Writing jobs done, %s jobs added", count)

def get_green_directory():
    """
    Clones the source of website URLs, the green directory,
    into the local file system using git
    """
    if os.path.exists(GREEN_DIRECTORY_LOCAL_PATH):
        shutil.rmtree(GREEN_DIRECTORY_LOCAL_PATH)
    Repo.clone_from(GREEN_DIRECTORY_REPO, GREEN_DIRECTORY_LOCAL_PATH)


def dir_entries():
    """
    Iterator over all data files in the cloned green directory
    """
    path = os.path.join(GREEN_DIRECTORY_LOCAL_PATH, GREEN_DIRECTORY_DATA_PATH)
    for root, _, files in os.walk(path):
        for fname in files:
            filepath = os.path.join(root, fname)
            if not filepath.endswith(".yaml"):
                continue
            with open(filepath, 'r', encoding='utf8') as yamlfile:
                for doc in yaml.load_all(yamlfile):
                    yield doc


def repr_entry(entry):
    """
    Return string representation of a directory entry,
    for logging/debugging purposes
    """
    ret = entry['type']
    if 'level' in entry:
        ret += "/" + entry['level']
    if 'state' in entry:
        ret += "/" + entry['state']
    if 'district' in entry:
        ret += "/" + entry['district']
    return ret

def derive_test_hostnames(hostname):
    """
    Derives the hostname variants to test for a given host name.
@@ -512,7 +359,7 @@ def check_site(entry):
        }

        try:
            req = requests.get(check_url, headers=headers, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT))
            req = requests.get(check_url, headers=headers, timeout=(config.CONNECT_TIMEOUT, config.READ_TIMEOUT))
            check['status_code'] = req.status_code
            check['duration'] = round(req.elapsed.microseconds / 1000)
@@ -595,7 +442,7 @@ def check_site(entry):
        result['details']['cms'] = parse_generator(c['content']['generator'])
        # Qualify certain CMS flavours in more detail
        if result['details']['cms'] == "typo3":
            if GCMS_IP in result['details']['ipv4_addresses']:
            if config.GCMS_IP in result['details']['ipv4_addresses']:
                result['details']['cms'] = "typo3-gcms"
            elif 'typo3-gruene.de' in c['content']['html']:
                result['details']['cms'] = "typo3-gruene"
@@ -703,30 +550,12 @@ def check_site(entry):
    return result

@tenacity.retry(wait=tenacity.wait_exponential(),
                retry=tenacity.retry_if_exception_type(Aborted))
def get_job_from_queue():
    """
    Returns a URL from the queue
    """
    out = None

    with DATASTORE_CLIENT.transaction():
        query = DATASTORE_CLIENT.query(kind=JOB_DATASTORE_KIND)
        for entity in query.fetch(limit=1):
            logging.debug("Got job: %s", entity)
            out = dict(entity)
            out["url"] = entity.key.name
            DATASTORE_CLIENT.delete(entity.key)

    return out


def work_of_queue():
    """
    Take job from queue and finish it until there are no more jobs
    """
    while True:
        job = get_job_from_queue()
        job = jobs.get_job_from_queue(DATASTORE_CLIENT)
        if job is None:
            logging.info("No more jobs. Exiting.")
            break
@@ -737,7 +566,7 @@ def work_of_queue():
        logging.info("Job %s finished checks", job["url"])
        logging.info("Job %s writing to DB", job["url"])

        key = DATASTORE_CLIENT.key(RESULTS_DATASTORE_KIND, job["url"])
        key = DATASTORE_CLIENT.key(config.RESULTS_DATASTORE_KIND, job["url"])
        entity = datastore.Entity(key=key, exclude_from_indexes=['results'])
        record = {
            "created": datetime.utcnow(),
@@ -790,6 +619,6 @@ if __name__ == "__main__":
    logging.debug("Called command %s", args.command)

    if args.command == 'jobs':
        create_jobs(args.url)
        jobs.create_jobs(DATASTORE_CLIENT, args.url)
    else:
        work_of_queue()
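
Taken together, spider.py now keeps only the site checks, the worker loop, and the CLI entry point. A condensed skeleton of the result, assembled from the hunks above as a sketch: the argparse wiring and the DATASTORE_CLIENT initialization sit outside the hunks shown and are assumptions.

# Condensed post-refactor skeleton of spider.py -- sketch only, bodies elided.
import argparse
import logging

from google.cloud import datastore

import config
import jobs

DATASTORE_CLIENT = None


def check_site(entry):
    """Website checks, unchanged except for reading constants from config."""
    ...


def work_of_queue():
    """Take jobs from the queue until none are left."""
    while True:
        job = jobs.get_job_from_queue(DATASTORE_CLIENT)
        if job is None:
            logging.info("No more jobs. Exiting.")
            break
        ...  # run the checks and store the result under config.RESULTS_DATASTORE_KIND


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('command')  # assumed wiring, inferred from args.command above
    parser.add_argument('--url')    # assumed wiring, inferred from args.url above
    args = parser.parse_args()

    DATASTORE_CLIENT = datastore.Client()  # assumed initialization, not shown in the hunks
    logging.debug("Called command %s", args.command)

    if args.command == 'jobs':
        jobs.create_jobs(DATASTORE_CLIENT, args.url)
    else:
        work_of_queue()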