diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..f9606a3 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +/venv diff --git a/indexer.py b/indexer.py index 74d143a..f604775 100644 --- a/indexer.py +++ b/indexer.py @@ -1,14 +1,24 @@ from datetime import datetime +import logging from os import getenv +import sys from dateutil.parser import parse from google.cloud import datastore from elasticsearch import Elasticsearch +from elasticsearch.exceptions import NotFoundError + +# credentials_path is the path to the Google Cloud credentials JSON file +# used for authentication. credentials_path = getenv('GCLOUD_DATASTORE_CREDENTIALS_PATH') + +# spider_results_kind is the name of the database/entity storing spider results +# in the Google Cloud datastore. +spider_results_kind = 'spider-results' + datastore_client = datastore.Client.from_service_account_json(credentials_path) -spider_results_kind = 'spider-results' es_index_name = spider_results_kind @@ -27,16 +37,9 @@ def convert_datastore_datetime(field): return dt -def get_spider_results(client, newer_than=None): +def get_spider_results(client): query = client.query(kind=spider_results_kind, order=['-created']) - - - if newer_than is None: - print("Fetching all results from database") - else: - print("Fetching results from database newer than %s" % newer_than.isoformat()) - query.add_filter('created', '>', newer_than) for entity in query.fetch(eventual=True): created = convert_datastore_datetime(entity.get('created')) @@ -50,22 +53,22 @@ def get_spider_results(client, newer_than=None): 'rating': entity.get('rating'), } - -def last_updated(es): - """ - Returns the most recent created date from the - spider-results index - """ - res = es.search(index=es_index_name, - _source_include=['created'], - body={"query": {"match_all": {}}}, - sort='created:desc', - size=1) - - return res['hits']['hits'][0]['_source']['created'] - +def make_indexname(name_prefix): + """ + creates a timestamped index name + """ + return name_prefix + "-" + datetime.utcnow().strftime("%Y%m%d-%H%M%S") def main(): + # Set up logging + root = logging.getLogger() + root.setLevel(logging.INFO) + ch = logging.StreamHandler(sys.stdout) + ch.setLevel(logging.INFO) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + ch.setFormatter(formatter) + root.addHandler(ch) + print("Connecting to elasticsearch:9200...") es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200}]) es.cluster.health(wait_for_status='yellow', request_timeout=20) @@ -112,25 +115,54 @@ def main(): # Sometimes useful in development #es.indices.delete(index=es_index_name) - time_threshold = None + tempindex = make_indexname(es_index_name) - if not es.indices.exists(index=es_index_name): - es.indices.create(index=es_index_name, ignore=400) - es.indices.close(index=es_index_name) - es.indices.put_settings(index=es_index_name, body=settings) - es.indices.put_mapping(index=es_index_name, doc_type='result', body=mappings) - es.indices.open(index=es_index_name) - else: - time_threshold = last_updated(es) - time_threshold = parse(time_threshold) + # Create new index + es.indices.create(index=tempindex, ignore=400) + es.indices.close(index=tempindex) + es.indices.put_settings(index=tempindex, body=settings) + es.indices.put_mapping(index=tempindex, doc_type='result', body=mappings) + es.indices.open(index=tempindex) # Index database content + logging.info('Reading result documents from %s DB' % spider_results_kind) count = 0 - for doc in get_spider_results(datastore_client, newer_than=time_threshold): - es.index(index=es_index_name, doc_type='result', id=doc['url'], body=doc) + for doc in get_spider_results(datastore_client): + es.index(index=tempindex, doc_type='result', id=doc['url'], body=doc) count += 1 - print('Done indexing %s documents' % count) + logging.info('Indexed %s documents' % count) + + # Set our index alias to the new index, + # remove old index if existed, re-create alias. + if es.indices.exists_alias(name=es_index_name): + old_index = es.indices.get_alias(name=es_index_name) + + # here we assume there is only one index behind this alias + old_indices = list(old_index.keys()) + + if len(old_indices) > 0: + logging.info("Old index on alias is: %s" % old_indices[0]) + + try: + es.indices.delete_alias(index=old_indices[0], name=es_index_name) + except NotFoundError: + logging.error("Could not delete index alias for %s" % old_indices[0]) + pass + + try: + es.indices.delete(index=old_indices[0]) + except: + logging.error("Could not delete index %s" % old_indices[0]) + pass + + # Delete legacy index with same name as alias + if es.indices.exists(index=es_index_name): + logging.info("Deleting legacy index with name %s" % es_index_name) + es.indices.delete(index=es_index_name) + + logging.info("Setting alias '%s' to index '%s" % (es_index_name, tempindex)) + es.indices.put_alias(index=tempindex, name=es_index_name) if __name__ == "__main__":