# (removed stray page metadata artifact: "170 lines / 5.5 KiB / Python")
from datetime import datetime
|
|
import logging
|
|
from os import getenv
|
|
import sys
|
|
|
|
from dateutil.parser import parse
|
|
from google.cloud import datastore
|
|
from elasticsearch import Elasticsearch
|
|
from elasticsearch.exceptions import NotFoundError
|
|
|
|
|
|
# credentials_path is the path to the Google Cloud credentials JSON file
# used for authentication.
# NOTE(review): getenv may return None if the variable is unset — the
# client construction below would then fail at import time; confirm the
# env var is always provided in deployment.
credentials_path = getenv('GCLOUD_DATASTORE_CREDENTIALS_PATH')

# spider_results_kind is the name of the database/entity storing spider results
# in the Google Cloud datastore.
spider_results_kind = 'spider-results'

# Module-level datastore client, created at import time from the
# service-account JSON file above.
datastore_client = datastore.Client.from_service_account_json(credentials_path)

# The Elasticsearch alias name mirrors the datastore kind name.
es_index_name = spider_results_kind
|
|
|
|
|
|
def convert_datastore_datetime(field):
    """
    Normalize a datastore 'created' field to a datetime.

    The datastore library may return the value as a str, int, or
    datetime.datetime depending on how it was stored. Int and str values
    are interpreted as microseconds since the Unix epoch (UTC).

    Returns the empty string if the field is None or of an unexpected
    type (legacy behavior, preserved — callers then fail on .isoformat()).
    """
    dt = ''
    # isinstance instead of type(x) == T: idiomatic and subclass-safe.
    if isinstance(field, datetime):
        dt = field
    elif isinstance(field, int):
        # microseconds -> seconds
        dt = datetime.utcfromtimestamp(field / 1000000)
    elif isinstance(field, str):
        dt = datetime.utcfromtimestamp(int(field) / 1000000)
    return dt
|
|
|
|
|
|
def get_spider_results(client):
    """
    Yield spider result documents from the datastore, newest first.

    Each yielded dict carries the entity's URL (its key name), the
    normalized ISO-formatted creation timestamp, and the stored
    meta/score/checks/rating fields.
    """
    query = client.query(kind=spider_results_kind, order=['-created'])

    for entity in query.fetch(eventual=True):
        created = convert_datastore_datetime(entity.get('created'))

        doc = {
            'url': entity.key.name,
            'created': created.isoformat(),
        }
        for field in ('meta', 'score', 'checks', 'rating'):
            doc[field] = entity.get(field)
        yield doc
|
|
|
|
def make_indexname(name_prefix):
    """
    Create a timestamped index name of the form
    '<prefix>-YYYYMMDD-HHMMSS' (UTC).
    """
    timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
    return "%s-%s" % (name_prefix, timestamp)
|
|
|
|
def main():
    """
    Re-index all spider results from Google Cloud Datastore into a fresh,
    timestamped Elasticsearch index, then repoint the alias.

    Steps:
      1. Create a new timestamped index with autocomplete analysis settings.
      2. Copy every document from the datastore into it.
      3. Remove the alias (and the old index behind it, best effort),
         delete any legacy non-aliased index, and point the alias at the
         new index.
    """
    # Set up logging to stdout
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    root.addHandler(ch)

    print("Connecting to elasticsearch:9200...")
    es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}])
    # Block until the cluster is at least usable, or fail fast.
    es.cluster.health(wait_for_status='yellow', request_timeout=20)

    # Raised field limit accommodates the wide 'checks'/'rating' objects;
    # the edge-ngram tokenizer powers autocomplete on URL/location fields.
    settings = {
        "index.mapping.total_fields.limit": 2000,
        "analysis": {
            "tokenizer": {
                "my_autocomplete_tokenizer": {
                    "type": "edge_ngram",
                    "min_gram": 2,
                    "max_gram": 10,
                    "token_chars": ["letter"]
                }
            },
            "analyzer": {
                "my_autocomplete_analyzer": {
                    "tokenizer": "my_autocomplete_tokenizer",
                }
            }
        }
    }

    # 'dynamic: false' on meta/checks/rating keeps unexpected sub-fields
    # from being indexed (they are still stored in _source).
    mappings = {
        "properties": {
            "url": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
            "meta": {
                "dynamic": "false",
                "properties": {
                    "type": {"type": "keyword"},
                    "level": {"type": "keyword"},
                    "city": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
                    "district": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
                    "state": {"type": "text", "analyzer": "my_autocomplete_analyzer"},
                }
            },
            "checks": {"dynamic": "false", "properties": {}},
            "rating": {"dynamic": "false", "properties": {}},
            "score": {"type": "float"},
            "created": {"type": "date"},
        }
    }

    # Sometimes useful in development
    #es.indices.delete(index=es_index_name)

    tempindex = make_indexname(es_index_name)

    # Create new index. ignore=400 tolerates "index already exists".
    # The index must be closed while analysis settings are updated.
    es.indices.create(index=tempindex, ignore=400)
    es.indices.close(index=tempindex)
    es.indices.put_settings(index=tempindex, body=settings)
    es.indices.put_mapping(index=tempindex, doc_type='result', body=mappings)
    es.indices.open(index=tempindex)

    # Index database content
    logging.info('Reading result documents from %s DB' % spider_results_kind)
    count = 0
    for doc in get_spider_results(datastore_client):
        # URL doubles as the document ID, so re-runs overwrite in place.
        es.index(index=tempindex, doc_type='result', id=doc['url'], body=doc)
        count += 1

    logging.info('Indexed %s documents' % count)

    # Set our index alias to the new index,
    # remove old index if existed, re-create alias.
    if es.indices.exists_alias(name=es_index_name):
        old_index = es.indices.get_alias(name=es_index_name)

        # here we assume there is only one index behind this alias
        old_indices = list(old_index.keys())

        if len(old_indices) > 0:
            logging.info("Old index on alias is: %s" % old_indices[0])

            try:
                es.indices.delete_alias(index=old_indices[0], name=es_index_name)
            except NotFoundError:
                logging.error("Could not delete index alias for %s" % old_indices[0])

            try:
                es.indices.delete(index=old_indices[0])
            # Narrowed from a bare 'except:' so SystemExit/KeyboardInterrupt
            # still propagate; a leftover index is harmless best-effort.
            except Exception:
                logging.error("Could not delete index %s" % old_indices[0])

    # Delete legacy index with same name as alias
    if es.indices.exists(index=es_index_name):
        logging.info("Deleting legacy index with name %s" % es_index_name)
        es.indices.delete(index=es_index_name)

    # Fixed format string: closing quote after the second %s was missing.
    logging.info("Setting alias '%s' to index '%s'" % (es_index_name, tempindex))
    es.indices.put_alias(index=tempindex, name=es_index_name)
|
|
|
|
|
|
# Script entry point: run the full re-index when executed directly.
if __name__ == "__main__":
    main()
|