# green-spider-indexer/indexer.py
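"""
Reads spider results from Google Cloud Datastore and indexes them in
Elasticsearch: each run writes into a fresh timestamped index and then
switches the 'spider-results' alias over to that index.
"""
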
from datetime import datetime
import logging
from os import getenv
import sys

from google.cloud import datastore
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError

# credentials_path is the path to the Google Cloud credentials JSON file
# used for authentication.
credentials_path = getenv('GCLOUD_DATASTORE_CREDENTIALS_PATH')

# spider_results_kind is the name of the Datastore entity kind under which
# spider results are stored in Google Cloud Datastore.
spider_results_kind = 'spider-results'

datastore_client = datastore.Client.from_service_account_json(credentials_path)

# Name of the Elasticsearch alias that always points at the latest index.
es_index_name = spider_results_kind
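
# Usage sketch (illustrative invocation, not taken from this repo's docs):
#
#   GCLOUD_DATASTORE_CREDENTIALS_PATH=/path/to/service-account.json python indexer.py
#
# An Elasticsearch node is expected to be reachable at elasticsearch:9200.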


def convert_datastore_datetime(field):
    """
    Return a datetime, regardless of whether the datastore library returns
    a str, int, or datetime.datetime for the field.
    """
    dt = ''
    if isinstance(field, datetime):
        dt = field
    elif isinstance(field, int):
        # Integer timestamps are microseconds since the Unix epoch.
        dt = datetime.utcfromtimestamp(field / 1000000)
    elif isinstance(field, str):
        dt = datetime.utcfromtimestamp(int(field) / 1000000)
    return dt
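
# For example, an int value of 1554998109000000 (microseconds since the epoch)
# converts to datetime(2019, 4, 11, 15, 55, 9), i.e. 2019-04-11 15:55:09 UTC.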


def get_spider_results(client):
    """
    Fetches all spider result entities from the datastore, newest first,
    and yields them as flat dicts ready for indexing.
    """
    query = client.query(kind=spider_results_kind,
                         order=['-created'])
    for entity in query.fetch(eventual=True):
        created = convert_datastore_datetime(entity.get('created'))
        yield {
            'url': entity.key.name,
            'created': created.isoformat(),
            'meta': entity.get('meta'),
            'score': entity.get('score'),
            'checks': entity.get('checks'),
            'rating': entity.get('rating'),
        }
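
# Each yielded document has this shape (values are illustrative):
#
#   {
#       'url': 'https://example-site.de/',
#       'created': '2019-04-11T15:55:09',
#       'meta': {'type': ..., 'level': ..., 'city': ..., 'district': ..., 'state': ...},
#       'score': 10.5,
#       'checks': {...},
#       'rating': {...},
#   }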


def make_indexname(name_prefix):
    """
    Creates a timestamped index name from the given prefix.
    """
    return name_prefix + "-" + datetime.utcnow().strftime("%Y%m%d-%H%M%S")
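
# For example, make_indexname('spider-results') returns something like
# 'spider-results-20190411-155509'.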


def main():
    # Set up logging to stdout
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    root.addHandler(ch)

    logging.info("Connecting to elasticsearch:9200...")
    es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}])
    es.cluster.health(wait_for_status='yellow', request_timeout=20)

    settings = {
        "index.mapping.total_fields.limit": 2000,
        "analysis": {
            "tokenizer": {
                "my_autocomplete_tokenizer": {
                    "type": "edge_ngram",
                    "min_gram": 2,
                    "max_gram": 10,
                    "token_chars": ["letter"]
                }
            },
            "analyzer": {
                "my_autocomplete_analyzer": {
                    "tokenizer": "my_autocomplete_tokenizer",
                }
            }
        }
    }
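
    # The edge_ngram tokenizer above is what enables prefix autocompletion:
    # a term like "Berlin" is indexed as "Be", "Ber", "Berl", "Berli", "Berlin".
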
    mappings = {
        "properties": {
            "url": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
            "meta": {
                "dynamic": "false",
                "properties": {
                    "type": {"type": "keyword"},
                    "level": {"type": "keyword"},
                    "city": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
                    "district": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
                    "state": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
                }
            },
            "checks": {"dynamic": "false", "properties": {}},
            "rating": {"dynamic": "false", "properties": {}},
            "score": {"type": "float"},
            "created": {"type": "date"},
        }
    }
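
    # "dynamic": "false" keeps fields under checks and rating in _source
    # (they are returned with search hits) without adding them to the mapping,
    # so heterogeneous result structures cannot exceed the field limit.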

    # Sometimes useful in development:
    #es.indices.delete(index=es_index_name)

    tempindex = make_indexname(es_index_name)

    # Create the new index and apply settings and mapping.
    es.indices.create(index=tempindex, ignore=400)
    es.indices.close(index=tempindex)
    es.indices.put_settings(index=tempindex, body=settings)
    es.indices.put_mapping(index=tempindex, doc_type='result', body=mappings)
    es.indices.open(index=tempindex)
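
    # Non-dynamic settings such as custom analyzers can only be applied to a
    # closed index, which is why the index is closed before put_settings and
    # reopened afterwards.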

    # Index database content
    logging.info('Reading result documents from %s DB' % spider_results_kind)
    count = 0
    for doc in get_spider_results(datastore_client):
        es.index(index=tempindex, doc_type='result', id=doc['url'], body=doc)
        count += 1
    logging.info('Indexed %s documents' % count)

    # Point the index alias to the new index: remove the old index (if any)
    # behind the alias, then re-create the alias.
    if es.indices.exists_alias(name=es_index_name):
        old_index = es.indices.get_alias(name=es_index_name)
        # Here we assume there is only one index behind this alias.
        old_indices = list(old_index.keys())
        if len(old_indices) > 0:
            logging.info("Old index on alias is: %s" % old_indices[0])
            try:
                es.indices.delete_alias(index=old_indices[0], name=es_index_name)
            except NotFoundError:
                logging.error("Could not delete index alias for %s" % old_indices[0])
            try:
                es.indices.delete(index=old_indices[0])
            except Exception:
                logging.error("Could not delete index %s" % old_indices[0])

    # Delete a legacy index with the same name as the alias, since an index
    # with that name would prevent the alias from being created.
    if es.indices.exists(index=es_index_name):
        logging.info("Deleting legacy index with name %s" % es_index_name)
        es.indices.delete(index=es_index_name)

    logging.info("Setting alias '%s' to index '%s'" % (es_index_name, tempindex))
    es.indices.put_alias(index=tempindex, name=es_index_name)
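
    # After a successful run, the alias can be inspected from outside, e.g.:
    #   curl -s http://elasticsearch:9200/_alias/spider-results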


if __name__ == "__main__":
    main()