Use index alias

Marian Steinbach 2020-04-03 21:26:14 +02:00
parent 7dcb375825
commit 65ffa4e935
2 changed files with 69 additions and 36 deletions

.dockerignore Normal file

@@ -0,0 +1 @@
+/venv


@@ -1,14 +1,24 @@
 from datetime import datetime
+import logging
 from os import getenv
+import sys
 
 from dateutil.parser import parse
 from google.cloud import datastore
 from elasticsearch import Elasticsearch
+from elasticsearch.exceptions import NotFoundError
 
+# credentials_path is the path to the Google Cloud credentials JSON file
+# used for authentication.
 credentials_path = getenv('GCLOUD_DATASTORE_CREDENTIALS_PATH')
 
+# spider_results_kind is the name of the Google Cloud Datastore entity kind
+# that stores spider results.
+spider_results_kind = 'spider-results'
+
 datastore_client = datastore.Client.from_service_account_json(credentials_path)
 
-spider_results_kind = 'spider-results'
 es_index_name = spider_results_kind
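
A note on the configuration above: credentials_path is read straight from the
environment, so a missing GCLOUD_DATASTORE_CREDENTIALS_PATH would only surface
as an opaque error inside from_service_account_json. A minimal guard, sketched
here and not part of this commit, could fail early instead:

    # Hypothetical guard, not part of this commit: abort with a clear
    # message when the credentials path is not configured.
    if credentials_path is None:
        sys.exit("GCLOUD_DATASTORE_CREDENTIALS_PATH is not set")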
@@ -27,16 +37,9 @@ def convert_datastore_datetime(field):
     return dt
 
 
-def get_spider_results(client, newer_than=None):
+def get_spider_results(client):
     query = client.query(kind=spider_results_kind,
                          order=['-created'])
 
-    if newer_than is None:
-        print("Fetching all results from database")
-    else:
-        print("Fetching results from database newer than %s" % newer_than.isoformat())
-        query.add_filter('created', '>', newer_than)
-
     for entity in query.fetch(eventual=True):
         created = convert_datastore_datetime(entity.get('created'))
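
With the newer_than filter gone, every run re-reads the complete
spider-results kind and indexes it from scratch. Since get_spider_results is a
generator, the one-call-per-document indexing in main() could also be handed
to the elasticsearch.helpers.bulk helper; a sketch, not part of this commit,
assuming the same url-as-document-ID convention:

    from elasticsearch.helpers import bulk

    def bulk_actions(client, index):
        # Wrap each spider result in a bulk action for the target index.
        for doc in get_spider_results(client):
            yield {
                '_index': index,
                '_type': 'result',
                '_id': doc['url'],
                '_source': doc,
            }

    # bulk() returns (number of successes, list of errors):
    # success_count, errors = bulk(es, bulk_actions(datastore_client, tempindex))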
@@ -50,22 +53,22 @@ def get_spider_results(client, newer_than=None):
             'rating': entity.get('rating'),
         }
 
 
-def last_updated(es):
-    """
-    Returns the most recent created date from the
-    spider-results index
-    """
-    res = es.search(index=es_index_name,
-                    _source_include=['created'],
-                    body={"query": {"match_all": {}}},
-                    sort='created:desc',
-                    size=1)
-    return res['hits']['hits'][0]['_source']['created']
+def make_indexname(name_prefix):
+    """
+    Creates a timestamped index name from the given prefix.
+    """
+    return name_prefix + "-" + datetime.utcnow().strftime("%Y%m%d-%H%M%S")
 
 
 def main():
+    # Set up logging
+    root = logging.getLogger()
+    root.setLevel(logging.INFO)
+    ch = logging.StreamHandler(sys.stdout)
+    ch.setLevel(logging.INFO)
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    ch.setFormatter(formatter)
+    root.addHandler(ch)
+
     print("Connecting to elasticsearch:9200...")
     es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}])
     es.cluster.health(wait_for_status='yellow', request_timeout=20)
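
Two notes on this hunk: make_indexname("spider-results") yields a name such as
spider-results-20200403-192614, so every run writes to a fresh index that
cannot collide with the previous one. And the eight-line handler setup in
main() is behavior-equivalent to a single logging.basicConfig call; a sketch,
not part of this commit:

    import logging
    import sys

    # One-call equivalent: INFO-level records to stdout with the same format.
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')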
@@ -112,25 +115,54 @@ def main():
     # Sometimes useful in development
     #es.indices.delete(index=es_index_name)
 
-    time_threshold = None
-    if not es.indices.exists(index=es_index_name):
-        es.indices.create(index=es_index_name, ignore=400)
-        es.indices.close(index=es_index_name)
-        es.indices.put_settings(index=es_index_name, body=settings)
-        es.indices.put_mapping(index=es_index_name, doc_type='result', body=mappings)
-        es.indices.open(index=es_index_name)
-    else:
-        time_threshold = last_updated(es)
-        time_threshold = parse(time_threshold)
+    tempindex = make_indexname(es_index_name)
+
+    # Create new index
+    es.indices.create(index=tempindex, ignore=400)
+    es.indices.close(index=tempindex)
+    es.indices.put_settings(index=tempindex, body=settings)
+    es.indices.put_mapping(index=tempindex, doc_type='result', body=mappings)
+    es.indices.open(index=tempindex)
 
     # Index database content
+    logging.info('Reading result documents from %s DB' % spider_results_kind)
     count = 0
-    for doc in get_spider_results(datastore_client, newer_than=time_threshold):
-        es.index(index=es_index_name, doc_type='result', id=doc['url'], body=doc)
+    for doc in get_spider_results(datastore_client):
+        es.index(index=tempindex, doc_type='result', id=doc['url'], body=doc)
         count += 1
-    print('Done indexing %s documents' % count)
+    logging.info('Indexed %s documents' % count)
+
+    # Point the alias at the new index: remove the old index behind the
+    # alias (if one exists), then re-create the alias.
+    if es.indices.exists_alias(name=es_index_name):
+        old_index = es.indices.get_alias(name=es_index_name)
+        # Here we assume there is only one index behind this alias.
+        old_indices = list(old_index.keys())
+
+        if len(old_indices) > 0:
+            logging.info("Old index on alias is: %s" % old_indices[0])
+            try:
+                es.indices.delete_alias(index=old_indices[0], name=es_index_name)
+            except NotFoundError:
+                logging.error("Could not delete index alias for %s" % old_indices[0])
+
+            try:
+                es.indices.delete(index=old_indices[0])
+            except Exception:
+                logging.error("Could not delete index %s" % old_indices[0])
+
+    # Delete a legacy index with the same name as the alias
+    if es.indices.exists(index=es_index_name):
+        logging.info("Deleting legacy index with name %s" % es_index_name)
+        es.indices.delete(index=es_index_name)
+
+    logging.info("Setting alias '%s' to index '%s'" % (es_index_name, tempindex))
+    es.indices.put_alias(index=tempindex, name=es_index_name)
 
 
 if __name__ == "__main__":
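
One caveat with the delete-then-recreate sequence above: between deleting the
old alias (or the legacy index) and the final put_alias call there is a short
window in which spider-results resolves to nothing, and readers get a
NotFoundError. Elasticsearch can swap an alias atomically through the
update_aliases API; a sketch of that alternative, not part of this commit:

    # Hypothetical atomic swap: remove and add the alias in a single
    # update_aliases call, so the alias never dangles.
    actions = [{"add": {"index": tempindex, "alias": es_index_name}}]
    if es.indices.exists_alias(name=es_index_name):
        for old in es.indices.get_alias(name=es_index_name):
            actions.insert(0, {"remove": {"index": old, "alias": es_index_name}})
    es.indices.update_aliases(body={"actions": actions})
    # Old indices can then be deleted separately, after the swap.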