# green-spider-indexer/indexer.py
#
# Reads spider results from Google Cloud Datastore and indexes them into
# a freshly created, timestamped Elasticsearch index, then swaps the
# public alias over to it.
from datetime import datetime
import logging
from os import getenv
import sys
from dateutil.parser import parse
from google.cloud import datastore
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
# credentials_path is the path to the Google Cloud credentials JSON file
# used for authentication.
credentials_path = getenv('GCLOUD_DATASTORE_CREDENTIALS_PATH')
# spider_results_kind is the name of the database/entity storing spider results
# in the Google Cloud datastore.
spider_results_kind = 'spider-results'
# NOTE(review): the client is created at import time, so importing this module
# fails immediately if GCLOUD_DATASTORE_CREDENTIALS_PATH is unset or invalid —
# confirm that fail-fast behaviour is intended.
datastore_client = datastore.Client.from_service_account_json(credentials_path)
# The Elasticsearch alias name mirrors the datastore kind name.
es_index_name = spider_results_kind
def convert_datastore_datetime(field):
    """
    Normalize a datastore 'created' field to a datetime.

    The datastore library may hand the value back as a datetime, an int
    (microseconds since the Unix epoch), or a str containing such an int.

    Returns the corresponding datetime, or '' (empty string) for any
    other input — including None — preserving the original fall-through
    behaviour relied on by callers.
    """
    # isinstance() instead of `type(x) ==` — idiomatic and subclass-safe.
    if isinstance(field, datetime):
        return field
    if isinstance(field, int):
        return datetime.utcfromtimestamp(field / 1000000)
    if isinstance(field, str):
        return datetime.utcfromtimestamp(int(field) / 1000000)
    return ''
def get_spider_results(client):
    """
    Yield one plain dict per spider-results entity, newest first.

    Each dict carries the entity key name as 'url', an ISO-8601
    'created' timestamp, and the raw 'meta', 'score', 'checks' and
    'rating' payloads.
    """
    query = client.query(kind=spider_results_kind, order=['-created'])
    passthrough_fields = ('meta', 'score', 'checks', 'rating')
    for entity in query.fetch(eventual=True):
        created = convert_datastore_datetime(entity.get('created'))
        doc = {
            'url': entity.key.name,
            'created': created.isoformat(),
        }
        for field in passthrough_fields:
            doc[field] = entity.get(field)
        yield doc
def make_indexname(name_prefix):
    """
    Return a unique index name: '<prefix>-YYYYMMDD-HHMMSS' (UTC now).
    """
    timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
    return "{}-{}".format(name_prefix, timestamp)
def _setup_logging():
    """Send INFO-and-above log records from the root logger to stdout."""
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.INFO)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    root.addHandler(handler)


def _create_temp_index(es):
    """
    Create a fresh timestamped index with our settings and mappings,
    returning its name.
    """
    settings = {
        "index.mapping.total_fields.limit": 2000,
        "analysis": {
            "tokenizer": {
                "my_autocomplete_tokenizer": {
                    "type": "edge_ngram",
                    "min_gram": 2,
                    "max_gram": 10,
                    "token_chars": ["letter"]
                }
            },
            "analyzer": {
                "my_autocomplete_analyzer": {
                    "tokenizer": "my_autocomplete_tokenizer",
                }
            }
        }
    }
    mappings = {
        "properties": {
            "url": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
            "meta": {
                "dynamic": "false",
                "properties": {
                    "type": {"type": "keyword"},
                    "level": {"type": "keyword"},
                    "city": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
                    "district": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
                    "state": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
                }
            },
            "checks": {"dynamic": "false", "properties": {}},
            "rating": {"dynamic": "false", "properties": {}},
            "score": {"type": "float"},
            "created": {"type": "date"},
        }
    }
    tempindex = make_indexname(es_index_name)
    # ignore=400: tolerate "already exists" so reruns within the same
    # second don't abort.
    es.indices.create(index=tempindex, ignore=400)
    # Settings/mappings are applied while the index is closed, then reopened.
    es.indices.close(index=tempindex)
    es.indices.put_settings(index=tempindex, body=settings)
    es.indices.put_mapping(index=tempindex, doc_type='result', body=mappings)
    es.indices.open(index=tempindex)
    return tempindex


def _index_documents(es, tempindex):
    """Copy every spider result from the datastore into tempindex."""
    logging.info('Reading result documents from %s DB', spider_results_kind)
    count = 0
    for doc in get_spider_results(datastore_client):
        # The page URL doubles as the document id, so reruns overwrite.
        es.index(index=tempindex, doc_type='result', id=doc['url'], body=doc)
        count += 1
    logging.info('Indexed %s documents', count)


def _swap_alias(es, tempindex):
    """
    Point the es_index_name alias at tempindex, best-effort deleting
    whatever index (or legacy same-named index) was behind it before.
    """
    if es.indices.exists_alias(name=es_index_name):
        old_index = es.indices.get_alias(name=es_index_name)
        # here we assume there is only one index behind this alias
        old_indices = list(old_index.keys())
        if old_indices:
            logging.info("Old index on alias is: %s", old_indices[0])
            try:
                es.indices.delete_alias(index=old_indices[0], name=es_index_name)
            except NotFoundError:
                # Best effort: the alias may already be gone.
                logging.error("Could not delete index alias for %s", old_indices[0])
            try:
                es.indices.delete(index=old_indices[0])
            except Exception:
                # Best effort: never abort the swap because cleanup failed,
                # but record what went wrong (the original bare except hid it).
                logging.exception("Could not delete index %s", old_indices[0])
    # Delete legacy index with same name as alias
    if es.indices.exists(index=es_index_name):
        logging.info("Deleting legacy index with name %s", es_index_name)
        es.indices.delete(index=es_index_name)
    logging.info("Setting alias '%s' to index '%s'", es_index_name, tempindex)
    es.indices.put_alias(index=tempindex, name=es_index_name)


def main():
    """
    Re-index all spider results into a fresh Elasticsearch index and
    atomically swap the public alias over to it.
    """
    _setup_logging()
    print("Connecting to elasticsearch:9200...")
    es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}])
    # Block until the cluster is at least 'yellow' before touching indices.
    es.cluster.health(wait_for_status='yellow', request_timeout=20)
    tempindex = _create_temp_index(es)
    _index_documents(es, tempindex)
    _swap_alias(es, tempindex)
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()