# Coverage for portality/dao.py: 69% of 919 statements

from __future__ import annotations

import json
import os
import re
import sys
import time
import urllib.parse
import uuid
from collections import UserDict
from copy import deepcopy
from datetime import timedelta
from typing import List, Iterable, Tuple

import elasticsearch
from elasticsearch.exceptions import ConnectionTimeout

from portality.core import app, es_connection as ES
from portality.lib import dates
from portality.lib.dates import FMT_DATETIME_STD

# All models in models.py should inherit this DomainObject to know how to save themselves in the index and so on.
# You can overwrite and add to the DomainObject functions as required. See models.py for some examples.

ES_MAPPING_MISSING_REGEX = re.compile(r'.*No mapping found for \[[a-zA-Z0-9-_\.]+?\] in order to sort on.*', re.DOTALL)
CONTENT_TYPE_JSON = {'Content-Type': 'application/json'}


class ElasticSearchWriteException(Exception):
    pass


class ScrollException(Exception):
    pass


class ScrollInitialiseException(ScrollException):
    pass


class ScrollTimeoutException(ScrollException):
    pass


class BulkException(Exception):
    pass


class DomainObject(UserDict, object):
    """
    ~~DomainObject:Model->Elasticsearch:Technology~~

    Base model class for an Elasticsearch (ES) index, providing interaction
    with the index (save, delete, query, etc.)
    """

    # set the type on the model that inherits this; it is also the ES index name for the model
    __type__ = None

    def __init__(self, **kwargs):
        # if self.data is already set, don't do anything here
        try:
            object.__getattribute__(self, "data")
        except AttributeError:
            if '_source' in kwargs:
                self.data = dict(kwargs['_source'])
                self.meta = dict(kwargs)
                del self.meta['_source']
            else:
                self.data = dict(kwargs)
        # FIXME: calling super() breaks everything, even though this is the correct thing to do
        # this is because the DomainObject incorrectly overrides properties of the super class
        # super(DomainObject, self).__init__()

    @classmethod
    def index_name(cls, override_index_name=None, **kwargs):
        if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE'] and cls.__type__ is not None:
            if override_index_name is not None:
                name = app.config['ELASTIC_SEARCH_DB_PREFIX'] + override_index_name
            else:
                name = ','.join([app.config['ELASTIC_SEARCH_DB_PREFIX'] + t for t in cls.__type__.split(',')])
        else:
            if override_index_name is not None:
                name = override_index_name
            else:
                name = app.config['ELASTIC_SEARCH_DB']
        return name
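
    # Illustrative example (the config values and model class here are hypothetical):
    #
    #   class Article(DomainObject):
    #       __type__ = "article"
    #
    #   # with ELASTIC_SEARCH_INDEX_PER_TYPE = True and ELASTIC_SEARCH_DB_PREFIX = "doaj-":
    #   Article.index_name()              # -> "doaj-article"
    #   Article.index_name("article-v2")  # -> "doaj-article-v2"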

    @classmethod
    def doc_type(cls):
        if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
            return None
        else:
            return cls.__type__

    @classmethod
    def makeid(cls):
        """Create a new id for the data object. Overwrite this in specific model types if required."""
        return str(uuid.uuid4().hex)

    @property
    def id(self):
        rawid = self.data.get("id", None)
        if rawid is not None:
            return str(rawid)
        return None

    def set_id(self, id=None):
        if id is None:
            id = self.makeid()
        self.data["id"] = str(id)

    @property
    def version(self):
        return self.meta.get('_version', None)

    @property
    def json(self):
        return json.dumps(self.data)

    def set_created(self, date=None):
        if date is None:
            self.data['created_date'] = dates.now_str()
        else:
            self.data['created_date'] = date

    @property
    def created_date(self):
        return self.data.get("created_date")

    @property
    def created_timestamp(self):
        return dates.parse(self.data.get("created_date"))

    @property
    def last_updated(self):
        return self.data.get("last_updated")

    @property
    def last_updated_timestamp(self):
        lu = self.last_updated
        if lu is None:
            return None
        return dates.parse(self.last_updated)

    def pre_save_prep(self, blocking=False, differentiate=False, update_last_updated=True):
        if 'id' not in self.data:
            self.data['id'] = self.makeid()

        self.data['es_type'] = self.__type__

        now = dates.now_str()
        if (blocking or differentiate) and "last_updated" in self.data:
            diff = dates.now() - dates.parse(self.data["last_updated"])

            # we need the new last_updated time to be later than the previous one
            if diff.total_seconds() < 1:
                soon = dates.now() + timedelta(seconds=1)
                now = soon.strftime(FMT_DATETIME_STD)

        if update_last_updated:
            self.data['last_updated'] = now

        if 'created_date' not in self.data:
            self.data['created_date'] = now

    def save(self, retries=0, back_off_factor=1, differentiate=False, blocking=False, block_wait=0.25,
             update_last_updated=True):
        """
        ~~->ReadOnlyMode:Feature~~
        :param retries: number of times to retry the write on a retryable failure
        :param back_off_factor: multiplier for the exponential back-off between retries
        :param differentiate: ensure the new last_updated time differs from the previous one
        :param blocking: wait until the record is visible in the index before returning
        :param block_wait: interval between checks when blocking
        :return: the ES response, or None in read-only mode
        """
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, save command cannot run")
            return

        if retries > app.config.get("ES_RETRY_HARD_LIMIT", 1000):  # an arbitrarily large number
            retries = app.config.get("ES_RETRY_HARD_LIMIT", 1000)

        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            block_wait = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        self.pre_save_prep(blocking=blocking, differentiate=differentiate, update_last_updated=update_last_updated)
        now = self.data.get("last_updated", dates.now_str())

        attempt = 0
        d = json.dumps(self.data)
        r = None
        while attempt <= retries:
            try:
                r = ES.index(self.index_name(), d, doc_type=self.doc_type(),
                             id=self.data.get("id"),
                             headers=CONTENT_TYPE_JSON,
                             timeout=app.config.get('ES_READ_TIMEOUT', None), )
                break

            except (elasticsearch.ConnectionError, elasticsearch.ConnectionTimeout):
                app.logger.exception("Failed to connect to ES")
                attempt += 1

            except elasticsearch.TransportError as e:
                # Retries depend on which end the error lies.
                if 400 <= e.status_code < 500:
                    # Bad request, do not retry as it won't work. Fail with ElasticSearchWriteException.
                    app.logger.exception("Bad Request to ES, save failed. Details: {0}".format(e.error))
                    raise ElasticSearchWriteException(e.error)
                elif e.status_code >= 500:
                    # Server error, this could be temporary so we may want to retry
                    app.logger.exception("Server Error from ES, retrying. Details: {0}".format(e.error))
                    attempt += 1
            except Exception as e:
                # if any other exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in save method of DAO")
                raise ElasticSearchWriteException(e)

            # wait before retrying
            time.sleep((2 ** attempt) * back_off_factor)

        if attempt > retries:
            raise DAOSaveExceptionMaxRetriesReached(
                "After {attempts} attempts the record with "
                "id {id} failed to save.".format(
                    attempts=attempt, id=self.data['id']))

        if blocking:
            self.__class__.block(self.id, last_updated=now, sleep=block_wait)

        return r
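
    # Example usage (a minimal sketch; `Article` is a hypothetical DomainObject subclass):
    #
    #   a = Article(id="abc123", bibjson={"title": "Example"})
    #   a.save(retries=3, back_off_factor=2)   # retry transient failures with exponential back-off
    #   a.save(blocking=True)                  # also wait until the record is searchable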

    def delete(self):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, delete command cannot run")
            return

        # r = requests.delete(self.target() + self.id)
        try:
            ES.delete(self.index_name(), self.id, doc_type=self.doc_type())
        except elasticsearch.NotFoundError:
            pass  # This is to preserve the old behaviour

    @staticmethod
    def make_query(theq=None, should_terms=None, consistent_order=True, **kwargs):
        """
        Generate a query object based on parameters but don't send to
        backend - return it instead. Must always have the same
        parameters as the query method. See query method for explanation
        of parameters.
        """
        if theq is None:
            theq = ""
        q = deepcopy(theq)
        if isinstance(q, dict):
            query = q
            if 'bool' not in query['query']:
                boolean = {'bool': {'must': []}}
                boolean['bool']['must'].append(query['query'])
                query['query'] = boolean
            if 'must' not in query['query']['bool']:
                query['query']['bool']['must'] = []
        elif q:
            query = {
                'query': {
                    'bool': {
                        'must': [
                            {'query_string': {'query': q}}
                        ]
                    }
                }
            }
        else:
            query = {
                'query': {
                    'bool': {
                        'must': [
                            {'match_all': {}}
                        ]
                    }
                }
            }

        if should_terms is not None and len(should_terms) > 0:
            for s in should_terms:
                if not isinstance(should_terms[s], list):
                    should_terms[s] = [should_terms[s]]
                query["query"]["bool"]["must"].append({"terms": {s: should_terms[s]}})

        sort_specified = False
        for k, v in kwargs.items():
            if k == '_from':
                query['from'] = v
            elif k == 'sort':
                sort_specified = True
                query['sort'] = v
            else:
                query[k] = v
        if "sort" in query:
            sort_specified = True

        if not sort_specified and consistent_order:
            # FIXME: review this - where is default sort necessary, and why do we want this in ID order?
            query['sort'] = [{"id.exact": {"order": "asc", "unmapped_type": "keyword"}}]

        return query
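
    # Example (illustrative): a plain string becomes a query_string clause wrapped in a
    # bool/must, and a consistent default sort is applied when none is specified:
    #
    #   DomainObject.make_query("open access", size=10, _from=20)
    #   # -> {"query": {"bool": {"must": [{"query_string": {"query": "open access"}}]}},
    #   #     "size": 10, "from": 20,
    #   #     "sort": [{"id.exact": {"order": "asc", "unmapped_type": "keyword"}}]}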

    @classmethod
    def _unwrap_search_result(cls, res):
        return [i.get("_source") if "_source" in i else i.get("fields") for i in
                res.get('hits', {}).get('hits', [])]

    @classmethod
    def bulk_delete(cls, id_list, idkey='id', refresh=False):
        return cls.bulk(documents=[{'id': i} for i in id_list], idkey=idkey, refresh=refresh, action='delete')

    @classmethod
    def bulk(cls, documents: List[dict], idkey='id', refresh=False, action='index', req_timeout=10,
             override_index_name=None, **kwargs):
        """
        :param documents: a list of objects to perform bulk actions on (list of dicts)
        :param idkey: The path to extract an ID from the object, e.g. 'id', 'identifiers.id'
        :param refresh: Refresh the index in each operation (make immediately available for search) - expensive!
        :param action: the bulk action to perform: 'index', 'update' or 'delete'
        :param req_timeout: Request timeout for bulk operation
        :param override_index_name: write to this index instead of the model's default
        :param kwargs: kwargs are passed into the bulk instruction for each record
        """
        # ~~->ReadOnlyMode:Feature~~
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, bulk command cannot run")
            return

        if action not in ['index', 'update', 'delete']:
            raise Exception("Unrecognised bulk action '{0}'".format(action))

        data = ''
        for d in documents:
            data += cls.to_bulk_single_rec(d, idkey=idkey, action=action, **kwargs)
        resp = ES.bulk(body=data, index=cls.index_name(override_index_name), doc_type=cls.doc_type(),
                       refresh=refresh, request_timeout=req_timeout)
        return resp

    @staticmethod
    def to_bulk_single_rec(record, idkey="id", action="index", **kwargs):
        """ Adapted from esprit. Create a bulk instruction from a single record. """
        data = ''
        idpath = idkey.split(".")

        # traverse down the object in search of the supplied ID key
        context = record
        for pathseg in idpath:
            if pathseg in context:
                context = context[pathseg]
            else:
                raise BulkException(
                    "'{0}' not available in record to generate bulk _id: {1}".format(idkey, json.dumps(record)))

        datadict = {action: {'_id': context}}
        datadict[action].update(kwargs)

        data += json.dumps(datadict) + '\n'

        if action == 'delete':
            return data

        # For update, we wrap the document in {doc: document} if not already supplied
        if action == 'update' and not (record.get('doc') and len(record.keys()) == 1):
            data += json.dumps({'doc': record}) + '\n'
        else:
            data += json.dumps(record) + '\n'
        return data
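
    # Example (illustrative) of the NDJSON this produces for one record:
    #
    #   DomainObject.to_bulk_single_rec({"id": "abc123", "title": "Example"})
    #   # -> '{"index": {"_id": "abc123"}}\n{"id": "abc123", "title": "Example"}\n'
    #
    #   DomainObject.to_bulk_single_rec({"id": "abc123"}, action="delete")
    #   # -> '{"delete": {"_id": "abc123"}}\n'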

    @classmethod
    def refresh(cls):
        """
        ~~->ReadOnlyMode:Feature~~
        :return:
        """
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, refresh command cannot run")
            return

        # r = requests.post(cls.target() + '_refresh', headers=CONTENT_TYPE_JSON)
        # return r.json()
        return ES.indices.refresh(index=cls.index_name())

    @classmethod
    def pull(cls, id_):
        """Retrieve object by id."""
        if id_ is None or id_ == '':
            return None

        try:
            # out = requests.get(cls.target() + id_)
            out = ES.get(cls.index_name(), id_, doc_type=cls.doc_type())
        except elasticsearch.NotFoundError:
            return None
        except elasticsearch.TransportError as e:
            raise Exception("ES returned an error: {x}".format(x=e.info))
        except Exception as e:
            app.logger.exception(f"Unexpected error in pull(): {e}")
            return None
        if out is None:
            return None

        return cls(**out)

    @classmethod
    def pull_by_key(cls, key, value):
        res = cls.query(q={"query": {"term": {key + app.config['FACET_FIELD']: value}}})
        if res.get('hits', {}).get('total', {}).get('value', 0) == 1:
            return cls.pull(res['hits']['hits'][0]['_source']['id'])
        else:
            return None

    @classmethod
    def object_query(cls, q=None, **kwargs):
        result = cls.query(q, **kwargs)
        return [cls(**r.get("_source")) for r in result.get("hits", {}).get("hits", [])]

    @classmethod
    def query(cls, q=None, **kwargs):
        """Perform a query on backend.

        :param q: maps to query_string parameter if string, or query dict if dict.
        :param kwargs: any keyword args as per
        http://www.elasticsearch.org/guide/reference/api/search/uri-request.html
        """
        query = cls.make_query(q, **kwargs)
        return cls.send_query(query)

    @classmethod
    def send_query(cls, qobj, retry=50, pit_query=False, **kwargs):
        """Actually send a query object to the backend.
        :param kwargs: passed directly to the Elasticsearch search() function
        """
        if retry > app.config.get("ES_RETRY_HARD_LIMIT", 1000) + 1:  # an arbitrarily large number
            retry = app.config.get("ES_RETRY_HARD_LIMIT", 1000) + 1

        r = None
        count = 0
        exception = None
        while count < retry:
            count += 1
            try:
                # ES 7.10 updated target to whole index, since specifying type for search is deprecated
                # r = requests.post(cls.target_whole_index() + recid + "_search", data=json.dumps(qobj), headers=CONTENT_TYPE_JSON)
                if kwargs.get('timeout') is None:
                    kwargs['timeout'] = app.config.get('ES_READ_TIMEOUT', None)
                index = cls.index_name() if pit_query is False else None
                r = ES.search(body=json.dumps(qobj), index=index, doc_type=cls.doc_type(),
                              headers=CONTENT_TYPE_JSON, **kwargs)
                break
            except ConnectionTimeout:
                continue  # retry on timeout
            except Exception as e:
                try:
                    exception = ESMappingMissingError(e) if ES_MAPPING_MISSING_REGEX.match(json.dumps(e.args[2])) else e
                    if isinstance(exception, ESMappingMissingError):
                        raise exception
                except TypeError:
                    raise e
            time.sleep(0.5)

        if r is not None:
            return r
        if exception is not None:
            raise exception
        raise Exception("Couldn't get the ES query endpoint to respond. Also, you shouldn't be seeing this.")

    @classmethod
    def remove_by_id(cls, id):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, remove_by_id command cannot run")
            return

        # r = requests.delete(cls.target() + id)
        try:
            ES.delete(cls.index_name(), id)
        except elasticsearch.NotFoundError:
            return

    @classmethod
    def delete_by_query(cls, query):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, delete_by_query command cannot run")
            return

        # r = requests.delete(cls.target() + "_query", data=json.dumps(query))
        # return r
        return ES.delete_by_query(cls.index_name(), json.dumps(query), doc_type=cls.doc_type())

    @classmethod
    def destroy_index(cls, **es_kwargs):
        default_es_kwargs = {
            'timeout': '5m',
            'request_timeout': 1001,
        }
        default_es_kwargs.update(es_kwargs)
        es_kwargs = default_es_kwargs

        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, destroy_index command cannot run")
            return

        # if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
        #     return esprit.raw.delete_index_by_prefix(es_connection, app.config['ELASTIC_SEARCH_DB_PREFIX'])
        # else:
        #     return esprit.raw.delete_index(es_connection)
        print('Destroying indexes with prefix ' + app.config['ELASTIC_SEARCH_DB_PREFIX'] + '*')
        return ES.indices.delete(app.config['ELASTIC_SEARCH_DB_PREFIX'] + '*', **es_kwargs)

    @classmethod
    def check_es_raw_response(cls, res, extra_trace_info=''):
        if 'error' in res:
            es_resp = json.dumps(res, indent=2)

            error_to_raise = ESMappingMissingError if ES_MAPPING_MISSING_REGEX.match(es_resp) else ESError

            raise error_to_raise(
                (
                    "Elasticsearch returned an error:"
                    "\nES HTTP Response status: {es_status}"
                    "\nES Response:{es_resp}"
                    .format(es_status=res.get('status', 'unknown'), es_resp=es_resp)
                ) + extra_trace_info
            )

        if 'hits' not in res or 'hits' not in res['hits']:  # i.e. if res['hits']['hits'] does not exist
            raise ESResponseCannotBeInterpreted(
                (
                    "Elasticsearch did not return any records. "
                    "It probably returned an error we did not understand instead."
                    "\nES HTTP Response status: {es_status}"
                    "\nES Response:{es_resp}\n"
                    .format(es_status=res.get('status', 'unknown'), es_resp=json.dumps(res, indent=2))
                ) + extra_trace_info
            )
        return True

    @classmethod
    def handle_es_raw_response(cls, res, wrap, extra_trace_info=''):
        """
        Handles the JSON returned by ES, raising errors as needed. If no problems are detected it returns the
        extracted records.

        :param res: The full ES raw response to a query in a Python dict (this method does not handle the raw JSON ES
        outputs). Usually this parameter is the return value of the .query or .send_query methods.
        :param wrap: Did the caller request wrapping of each ES record inside a model object? This matters for handling
        records that have no '_source' or 'fields' keys, but do have an '_id' key. Such records should raise an error
        if wrapping was requested, since there is nothing to wrap. If no wrapping was requested, perhaps the caller
        simply needed the object IDs and nothing else, so we do not need to raise an error.
        :param extra_trace_info: A string with additional diagnostic information to be put into exceptions.
        """
        cls.check_es_raw_response(res, extra_trace_info=extra_trace_info)

        rs = []
        for i, each in enumerate(res['hits']['hits']):
            if '_source' in each:
                rs.append(each['_source'])
            elif 'fields' in each:
                rs.append(each['fields'])
            elif '_id' in each and not wrap:
                # "_id" is a sibling (not child) of "_source" so it can only be used with unwrapped raw responses.
                # wrap = True only makes sense if "_source" or "fields" were returned.
                # So, if "_id" is the only one present, extract it into an object that's shaped the same as the item
                # in the raw response.
                rs.append({"_id": each['_id']})
            else:
                msg1 = "Can't find any useful data in the ES response.\n" + extra_trace_info
                msg2 = "\nItem {i}.\nItem data:\n{each}".format(i=i, each=json.dumps(each, indent=2))
                raise ESResponseCannotBeInterpreted(msg1 + msg2)

        return rs
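
    # Illustrative sketch: given a raw response from .query() or .send_query(),
    # wrap=True extracts the "_source" documents ready for the model constructor:
    #
    #   res = SomeModel.query(q="journal")           # SomeModel is hypothetical
    #   records = SomeModel.handle_es_raw_response(res, wrap=True)
    #   objects = [SomeModel(**r) for r in records]  # this is what q2obj() does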

    @classmethod
    def iterate(cls, q: dict = None, page_size: int = 1000, limit: int = None, wrap: bool = True,
                keepalive: str = '1m'):
        """ Provide an iterable of all items in a model
        :param q: The query to scroll results on
        :param page_size: limited by ElasticSearch, check settings to override
        :param limit: Limit the number of results returned (e.g. to take a slice)
        :param wrap: Whether to return the results in raw json or wrapped as an object
        :param keepalive: scroll timeout

        TODO: this is the old method, we should evaluate and aim to make iterate_pit the default scroll method.
        """
        theq = {"query": {"match_all": {}}} if q is None else deepcopy(q)
        theq["size"] = page_size
        theq["from"] = 0
        if "sort" not in theq:
            # sorting by _doc (index order) gives the same performance enhancement as scan, so use it by default
            theq["sort"] = ["_doc"]

        # Initialise the scroll
        try:
            res = cls.send_query(theq, scroll=keepalive)
        except Exception as e:
            raise ScrollInitialiseException("Unable to initialise scroll - could be your mappings are broken", e)

        # unpack scroll response
        scroll_id = res.get('_scroll_id')
        total_results = res.get('hits', {}).get('total', {}).get('value')

        # Supply the first set of results
        counter = 0
        for r in cls.handle_es_raw_response(
                res,
                wrap=wrap,
                extra_trace_info=
                "\nScroll initialised:\n{q}\n"
                "\n\nPage #{counter} of the ES response with size {page_size}."
                .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

            # apply the limit
            if limit is not None and counter >= int(limit):
                break
            counter += 1
            if wrap:
                yield cls(**r)
            else:
                yield r

        # Continue to scroll through the rest of the results
        while True:
            # apply the limit
            if limit is not None and counter >= int(limit):
                break

            # if we consumed all the results we were expecting, we can just stop here
            if counter >= total_results:
                break

            # get the next page and check that we haven't timed out
            try:
                res = ES.scroll(scroll_id=scroll_id, scroll=keepalive)
            except elasticsearch.exceptions.NotFoundError as e:
                raise ScrollTimeoutException(
                    "Scroll timed out; {status} - {message}".format(status=e.status_code, message=e.info))
            except Exception as e:
                # if any other exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in scroll method of DAO")
                raise ScrollException(e)

            # if we didn't get any results back, this means we're at the end
            if len(res.get('hits', {}).get('hits')) == 0:
                break

            for r in cls.handle_es_raw_response(
                    res,
                    wrap=wrap,
                    extra_trace_info=
                    "\nScroll:\n{q}\n"
                    "\n\nPage #{counter} of the ES response with size {page_size}."
                    .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

                # apply the limit
                if limit is not None and counter >= int(limit):
                    break
                counter += 1
                if wrap:
                    yield cls(**r)
                else:
                    yield r
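
    # Example usage (a sketch; `Article` is a hypothetical DomainObject subclass):
    #
    #   for article in Article.iterate(q={"query": {"term": {"es_type": "article"}}},
    #                                  page_size=500, limit=2000):
    #       ...  # each item is a wrapped Article instance unless wrap=False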

    @classmethod
    def iterate_pit(cls, q: dict = None, page_size: int = 1000, limit: int = None, wrap: bool = True,
                    keepalive: str = '1m'):
        """ Provide an iterable of all items in a model, reimplemented using point-in-time queries
        :param q: The query to scroll results on
        :param page_size: limited by ElasticSearch, check settings to override
        :param limit: Limit the number of results returned (e.g. to take a slice)
        :param wrap: Whether to return the results in raw json or wrapped as an object
        :param keepalive: point-in-time keep-alive timeout
        """
        theq = {"query": {"match_all": {}}} if q is None else deepcopy(q)
        theq["size"] = page_size
        theq["from"] = 0
        if "sort" not in theq:
            # sorting by _doc (index order) gives the same performance enhancement as scan, so use it by default
            theq["sort"] = ["_doc"]

        # Open a point in time query context
        res = ES.open_point_in_time(index=cls.index_name(), keep_alive=keepalive)
        pit_id = res.get("id")

        theq["pit"] = {
            "id": pit_id,
            "keep_alive": keepalive
        }
        theq["track_total_hits"] = True

        first_resp = cls.send_query(theq, pit_query=True)
        if len(first_resp.get('hits', {}).get('hits', [])) == 0:
            return

        search_after = first_resp.get('hits', {}).get('hits', [])[-1].get('sort', [])
        total_results = first_resp.get('hits', {}).get('total', {}).get('value')

        # Supply the first set of results
        counter = 0
        for r in cls.handle_es_raw_response(
                first_resp,
                wrap=wrap,
                extra_trace_info=
                "\nPIT Initialised:\n{q}\n"
                "\n\nPage #{counter} of the ES response with size {page_size}."
                .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

            # apply the limit
            if limit is not None and counter >= int(limit):
                break
            counter += 1
            if wrap:
                yield cls(**r)
            else:
                yield r

        del theq["track_total_hits"]

        # Continue to page through the rest of the results
        while True:
            # apply the limit
            if limit is not None and counter >= int(limit):
                break

            # if we consumed all the results we were expecting, we can just stop here
            if counter >= total_results:
                break

            theq["search_after"] = search_after

            # get the next page and check that we haven't timed out
            try:
                res = cls.send_query(theq, pit_query=True)
                if len(res.get('hits', {}).get('hits', [])) == 0:
                    break
                # advance the cursor from the page we just fetched (was first_resp, which never advanced)
                search_after = res.get('hits', {}).get('hits', [])[-1].get('sort', [])
            except elasticsearch.exceptions.NotFoundError as e:
                raise ScrollTimeoutException(
                    "PIT timed out; {status} - {message}".format(status=e.status_code, message=e.info))
            except Exception as e:
                # if any other exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in iterate_pit method of DAO")
                try:
                    ES.close_point_in_time({"id": pit_id})
                except Exception:
                    pass
                raise ScrollException(e)

            for r in cls.handle_es_raw_response(
                    res,
                    wrap=wrap,
                    extra_trace_info=
                    "\nPIT:\n{q}\n"
                    "\n\nPage #{counter} of the ES response with size {page_size}."
                    .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

                # apply the limit
                if limit is not None and counter >= int(limit):
                    break
                counter += 1
                if wrap:
                    yield cls(**r)
                else:
                    yield r

        ES.close_point_in_time({"id": pit_id})

    @classmethod
    def iterate_unstable(cls, q: dict = None, page_size: int = 1000, limit: int = None, wrap: bool = True,
                         logger=None):
        """ Provide an iterable of all items in a model, using search_after but with no scroll context or
        PIT. This means that if the index changes as the iteration is happening, there may be repeated or
        missed elements. This is useful for cases where the index is not changing during the iteration, or the
        exact export is not important (e.g. for anon_export for testing purposes)

        :param q: The query to scroll results on
        :param page_size: limited by ElasticSearch, check settings to override
        :param limit: Limit the number of results returned (e.g. to take a slice)
        :param wrap: Whether to return the results in raw json or wrapped as an object
        """
        theq = {"query": {"match_all": {}}} if q is None else deepcopy(q)
        theq["size"] = page_size
        if "from" in theq:
            del theq["from"]

        if "sort" not in theq:
            # search_after needs a deterministic sort order; sort on _id by default
            theq["sort"] = [{"_id": "desc"}]

        theq["track_total_hits"] = True

        # if logger: logger(json.dumps(theq))

        first_resp = cls.send_query(theq)
        if len(first_resp.get('hits', {}).get('hits', [])) == 0:
            # if logger: logger("No results found")
            return

        search_after = first_resp.get('hits', {}).get('hits', [])[-1].get('sort', [])
        total_results = first_resp.get('hits', {}).get('total', {}).get('value')
        if logger: logger(f"Expecting total {total_results}")

        # Supply the first set of results
        counter = 0
        for r in cls.handle_es_raw_response(
                first_resp,
                wrap=wrap,
                extra_trace_info=
                "\nUnstable Iterate Initialised:\n{q}\n"
                "\n\nPage #{counter} of the ES response with size {page_size}."
                .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

            # apply the limit
            if limit is not None and counter >= int(limit):
                break
            counter += 1
            if wrap:
                yield cls(**r)
            else:
                yield r
        # if logger: logger(f"Iterated {counter} records")

        del theq["track_total_hits"]

        # Continue to page through the rest of the results
        while True:
            # apply the limit
            if limit is not None and counter >= int(limit):
                break

            # if we consumed all the results we were expecting, we can just stop here
            if counter >= total_results:
                break

            theq["search_after"] = search_after
            # if logger: logger(json.dumps(theq))
            # if logger: logger("search_after: " + str(search_after))
            try:
                res = cls.send_query(theq)
                if len(res.get('hits', {}).get('hits', [])) == 0:
                    break
                search_after = res.get('hits', {}).get('hits', [])[-1].get('sort', [])
            except Exception as e:
                # if any exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in iterate_unstable method of DAO")
                if logger: logger(f"Iterate failed on {json.dumps(theq)}")
                raise ScrollException(e)

            for r in cls.handle_es_raw_response(
                    res,
                    wrap=wrap,
                    extra_trace_info=
                    "\nUnstable Iterate:\n{q}\n"
                    "\n\nPage #{counter} of the ES response with size {page_size}."
                    .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

                # apply the limit
                if limit is not None and counter >= int(limit):
                    break
                counter += 1
                if wrap:
                    yield cls(**r)
                else:
                    yield r

        # if logger: logger(f"Iterated {counter} records")

    @classmethod
    def iterall(cls, page_size=1000, limit=None, **kwargs):
        # TODO: Another candidate for swapping to iterate_pit (or rename iterate_pit to iterate when we're happy)
        return cls.iterate(MatchAllQuery().query(), page_size, limit, **kwargs)

    @classmethod
    def iterall_unstable(cls, page_size=1000,
                         stripe_field="id",
                         striped=False,
                         prefix_generator=None,
                         prefix_size=4,
                         limit=None,
                         logger=None,
                         must=None,
                         **kwargs):
        def hex_prefixes(n=4):
            """ Generate a list of hex prefixes of length n """
            return [str(hex(i))[2:].zfill(n) for i in range(0, 16 ** n)]
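
        # Example (illustrative): hex_prefixes(1) -> ['0', '1', ..., 'e', 'f'];
        # hex_prefixes(2) -> ['00', '01', ..., 'fe', 'ff'] (256 prefixes). Striping the
        # export by ID prefix keeps each search_after run short, which narrows the
        # window in which index changes can cause skipped or repeated records.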

        if striped:
            q = None
            if must is not None:
                q = {
                    "query": {
                        "bool": {
                            "must": must
                        }
                    }
                }
            total = cls.count(q)
            if total == 0:
                return

            count = 0
            prefixes = prefix_generator(prefix_size) if prefix_generator is not None else hex_prefixes(prefix_size)
            empty_prefixes = []
            for prefix in prefixes:
                if must is None:
                    q = {
                        "query": {
                            "prefix": {stripe_field: prefix}
                        }
                    }
                else:
                    q = {
                        "query": {
                            "bool": {
                                "must": must + [
                                    {
                                        "prefix": {stripe_field: prefix}
                                    }
                                ]
                            }
                        }
                    }

                first = True
                for record in cls.iterate_unstable(q, page_size, limit=limit, logger=logger, **kwargs):
                    count += 1
                    if logger and first:
                        if len(empty_prefixes) > 0:
                            logger(f"Skipped empty prefixes: {empty_prefixes}")
                            empty_prefixes = []
                        logger(f"Exporting prefix: {prefix}")
                    first = False
                    if limit is not None:
                        if count > limit:
                            if logger: logger(f"Limit reached: {count} / {limit}")
                            return
                    yield record

                if first:
                    empty_prefixes.append(prefix)
                else:
                    if logger: logger(f"Finished prefix: {prefix}; {count} total records")
            if len(empty_prefixes) > 0:
                if logger: logger(f"Skipped empty prefixes: {empty_prefixes}")
        else:
            if logger: logger("Exporting without prefix striping")
            for record in cls.iterate_unstable(q=None, page_size=page_size, limit=limit, logger=logger, **kwargs):
                yield record

    # Aliases for the iterate functions
    scroll = iterate
    scroll_pit = iterate_pit

    @classmethod
    def dump(cls, q=None, page_size=1000, limit=None, out=None, out_template=None, out_batch_sizes=100000,
             out_rollover_callback=None, transform=None, es_bulk_format=True, idkey='id', es_bulk_fields=None,
             stripe_field="id", striped=False, prefix_generator=None, prefix_size=3, logger=None):
        """ Export to file, in bulk format or just as a json dump of each record """

        filenames = []
        n = 1
        current_file = None
        if out_template is not None:
            current_file = out_template + "." + str(n)
            filenames.append(current_file)
        if out is None:
            # only fall back to stdout when the caller did not supply an output stream
            out = open(current_file, "w") if current_file is not None else sys.stdout

        count = 0
        if q is None:
            iterator = cls.iterall_unstable(page_size=page_size, stripe_field=stripe_field,
                                            striped=striped, prefix_generator=prefix_generator,
                                            prefix_size=prefix_size,
                                            limit=limit, wrap=False, logger=logger)
        else:
            iterator = cls.iterate_unstable(q, page_size=page_size, limit=limit, wrap=False)

        for record in iterator:
            if transform is not None:
                record = transform(record)

            if es_bulk_format:
                kwargs = {}
                if es_bulk_fields is None:
                    es_bulk_fields = ["_id", "_index", "_type"]
                for key in es_bulk_fields:
                    if key == "_id":
                        kwargs["idkey"] = idkey
                    if key == "_index":
                        kwargs["index"] = cls.index_name()
                    if key == "_type":
                        kwargs["type_"] = cls.doc_type()  # previously passed the `type` builtin by mistake
                data = cls.to_bulk_single_rec(record, **kwargs)
            else:
                data = json.dumps(record) + "\n"

            out.write(data)
            if out_template is not None:
                count += 1
                if count > out_batch_sizes:
                    out.close()
                    if out_rollover_callback is not None:
                        out_rollover_callback(current_file)

                    count = 0
                    n += 1
                    current_file = out_template + "." + str(n)
                    filenames.append(current_file)
                    out = open(current_file, "w")

        if out_template is not None:
            out.close()
            if out_rollover_callback is not None:
                out_rollover_callback(current_file)

        return filenames
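
    # Example usage (a sketch; `Article` is a hypothetical DomainObject subclass):
    #
    #   # write everything as ES bulk-format NDJSON, rolling over to a new file
    #   # (article_export.1, article_export.2, ...) every out_batch_sizes records
    #   files = Article.dump(out_template="/tmp/article_export", es_bulk_format=True)
    #
    #   # or dump matching records as plain JSON lines to stdout
    #   Article.dump(q={"query": {"match_all": {}}}, es_bulk_format=False)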

    @classmethod
    def bulk_load_from_file(cls, source_file, index=None, limit=None, max_content_length=100000000):
        """ ported from esprit.tasks - bulk load to index from file
        :param source_file: path to the bulk-format source file
        :param index: index name for target
        :param limit: number of records to load (integer)
        :param max_content_length: Upload chunk size in bytes
        """
        index = index or cls.index_name()

        source_size = os.path.getsize(source_file)
        with open(source_file, "r") as f:
            if limit is None and source_size < max_content_length:
                # if we aren't selecting a portion of the file, and the file is below the max content length, then
                # we can just serve it directly
                ES.bulk(body=f.read(), index=index, doc_type=cls.doc_type(), request_timeout=120)
                return -1
            else:
                count = 0
                while True:
                    chunk = DomainObject._make_next_chunk(f, max_content_length)
                    if chunk == "":
                        break

                    finished = False
                    if limit is not None:
                        newlines = chunk.count("\n")
                        records = newlines // 2
                        if count + records > limit:
                            # truncate the chunk to exactly the remaining records (2 lines per record)
                            max_lines = (limit - count) * 2
                            lines = chunk.split("\n")
                            allowed = lines[:max_lines]
                            chunk = "\n".join(allowed) + "\n"
                            count = limit
                            finished = True
                        else:
                            count += records
                    ES.bulk(body=chunk, index=index, doc_type=cls.doc_type(), request_timeout=120)
                    if finished:
                        break
                if limit is not None:
                    return count
                else:
                    return -1

    @staticmethod
    def make_bulk_chunk_files(source_file, out_file_prefix, max_content_length=100000000):
        """ ported from esprit.tasks - break out a bulk file into smaller chunks """
        source_size = os.path.getsize(source_file)
        with open(source_file, "r") as f:
            if source_size < max_content_length:
                return [source_file]
            else:
                filenames = []
                count = 0
                while True:
                    count += 1
                    chunk = DomainObject._make_next_chunk(f, max_content_length)
                    if chunk == "":
                        break

                    filename = out_file_prefix + "." + str(count)
                    with open(filename, "w") as g:
                        g.write(chunk)
                    filenames.append(filename)

                return filenames

    @staticmethod
    def _make_next_chunk(f, max_content_length):
        """ ported from esprit.tasks - create a bulk chunk, ensuring it's not a partial instruction """

        def is_command(line):
            try:
                command = json.loads(line)
            except (json.JSONDecodeError, TypeError):
                return False
            keys = list(command.keys())
            if len(keys) > 1:
                return False
            if "index" not in keys:
                return False
            subkeys = list(command["index"].keys())
            for sk in subkeys:
                if sk not in ["_id"]:
                    return False

            return True

        offset = f.tell()
        chunk = f.read(max_content_length)
        while True:
            last_newline = chunk.rfind("\n")
            tail = chunk[last_newline + 1:]
            chunk = chunk[:last_newline]

            if is_command(tail):
                f.seek(offset + last_newline)
                if chunk.startswith("\n"):
                    chunk = chunk[1:]
                return chunk
            else:
                continue

    @classmethod
    def prefix_query(cls, field, prefix, filter_condition=None, size=5, facet_field=None, analyzed_field=True):
        # example of a prefix query
        # {
        #     "query": {"prefix" : { "bibjson.publisher" : "ope" } },
        #     "size": 0,
        #     "facets" : {
        #         "publisher" : { "terms" : {"field" : "bibjson.publisher.exact", "size": 5} }
        #     }
        # }

        suffix = app.config['FACET_FIELD']
        query_field = field
        if analyzed_field:
            if field.endswith(suffix):
                # strip .exact (or whatever it's configured as) off the end
                query_field = field[:field.rfind(suffix)]
        else:
            if not field.endswith(suffix):
                query_field = field + suffix

        # the actual terms should come from the .exact version of the
        # field - we are suggesting whole values, not fragments
        if facet_field is None:
            facet_field = query_field + suffix
        if not facet_field.endswith(suffix):
            facet_field = facet_field + suffix

        q = PrefixAutocompleteQuery(query_field, prefix, field, facet_field, size, filter_condition)
        return cls.send_query(q.query())

    @classmethod
    def wildcard_autocomplete_query(cls, field, substring, before=True, after=True, facet_size=5, facet_field=None):
        """
        Example of a wildcard query
        Works only on .exact fields

        {
            "query" : {
                "wildcard" : {"bibjson.publisher.exact" : "De *"}
            },
            "size" : 0,
            "facets" : {
                "bibjson.publisher.exact" : {
                    "terms" : {"field" : "bibjson.publisher.exact", "size" : 5}
                }
            }
        }
        :param field: the field to query against
        :param substring: the substring to match
        :param before: prepend a wildcard to the substring
        :param after: append a wildcard to the substring
        :param facet_size: number of terms to return in the aggregation
        :param facet_field: field to aggregate on; defaults to the .exact version of `field`
        :return:
        """
        # wildcard queries need to be on unanalyzed fields
        suffix = app.config['FACET_FIELD']
        filter_field = field
        if not filter_field.endswith(suffix):
            filter_field = filter_field + suffix

        # add the wildcard before/after
        if before:
            substring = "*" + substring
        if after:
            substring = substring + "*"

        # sort out the facet field
        if facet_field is None:
            facet_field = filter_field
        if not facet_field.endswith(suffix):
            facet_field = facet_field + suffix

        # build the query
        q = WildcardAutocompleteQuery(filter_field, substring, field, facet_field, facet_size)
        return cls.send_query(q.query())

    @classmethod
    def advanced_autocomplete(cls, filter_field, facet_field, substring, size=5, prefix_only=True):
        analyzed = True
        if " " in substring:
            analyzed = False

        substring = substring.lower()

        if " " in substring and not prefix_only:
            res = cls.wildcard_autocomplete_query(filter_field, substring, before=True, after=True, facet_size=size,
                                                  facet_field=facet_field)
        else:
            res = cls.prefix_query(filter_field, substring, size=size, facet_field=facet_field,
                                   analyzed_field=analyzed)

        result = []
        for term in res['aggregations'][filter_field]['buckets']:
            # keep ordering - it's by count by default, so most frequent
            # terms will now go to the front of the result list
            result.append({"id": term['key'], "text": term['key']})
        return result

    @classmethod
    def autocomplete(cls, field, prefix, filter_condition=None, size=5):
        res = None
        # if there is a space in the prefix, the prefix query won't work, so we fall back to a wildcard
        # we only do this if we have to, because the wildcard query is a little expensive
        if " " in prefix:
            res = cls.wildcard_autocomplete_query(field, prefix, before=False, after=True, facet_size=size)
        else:
            prefix = prefix.lower()
            res = cls.prefix_query(field, prefix, filter_condition, size=size)

        result = []
        for term in res['aggregations'][field]['buckets']:
            # keep ordering - it's by count by default, so most frequent
            # terms will now go to the front of the result list
            result.append({"id": term['key'], "text": term['key']})
        return result
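
    # Example (illustrative; assumes a model with a "bibjson.publisher" field and
    # FACET_FIELD configured as ".exact"):
    #
    #   Article.autocomplete("bibjson.publisher", "ope")
    #   # -> [{"id": "Open Books", "text": "Open Books"}, ...] ordered by frequency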

    @classmethod
    def q2obj(cls, **kwargs):
        extra_trace_info = ''
        if 'q' in kwargs:
            extra_trace_info = "\nQuery sent to ES (before manipulation in DomainObject.query):\n{}\n".format(
                json.dumps(kwargs['q'], indent=2))

        res = cls.query(**kwargs)
        rs = cls.handle_es_raw_response(res, wrap=True, extra_trace_info=extra_trace_info)
        results = [cls(**r) for r in rs]
        return results

    @classmethod
    def all(cls, size=10000, **kwargs):
        """ This is a shortcut to a match_all query with a large size, to return all records """
        # FIXME: is this only used in tests? ES now limits size so we can't guarantee ALL without using scroll / scan
        return cls.q2obj(size=size, **kwargs)

    @classmethod
    def count(cls, query=None):
        res = ES.count(index=cls.index_name(), doc_type=cls.doc_type(), body=query)
        return res.get("count")
        # return requests.get(cls.target() + '_count').json()['count']

    @classmethod
    def hit_count(cls, query, **kwargs) -> int:
        countable_query = deepcopy(query)
        if "track_total_hits" not in countable_query:
            countable_query["track_total_hits"] = True

        res = cls.query(q=countable_query, **kwargs)
        return res.get("hits", {}).get("total", {}).get("value", 0)
1277 @classmethod
1278 def count_updated_since(cls, last_update):
1279 """
1280 Count the number of records updated since a given date
1281 :param last_update: The date to count from
1282 :return: The number of records updated since the given date
1283 """
1284 q = {
1285 "query": {
1286 "range": {
1287 "last_updated": {
1288 "gte": last_update
1289 }
1290 }
1291 }
1292 }
1293 return cls.count(q)

    @classmethod
    def block(cls, id, last_updated=None, sleep=0.5, max_retry_seconds=30):
        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            sleep = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        q = BlockQuery(id)
        start_time = dates.now()
        while True:
            res = cls.query(q=q.query())
            hits = res.get("hits", {}).get("hits", [])
            if len(hits) > 0:
                if len(hits) > 1:
                    raise Exception("More than one record with id {x}".format(x=id))
                obj = hits[0].get("_source", {})
                if last_updated is not None:
                    lu = obj.get("last_updated")
                    if lu:
                        threshold = dates.parse(last_updated)
                        lud = dates.parse(lu)
                        if lud >= threshold:
                            return
                else:
                    return

            if (dates.now() - start_time).total_seconds() >= max_retry_seconds:
                raise BlockTimeOutException(
                    "Attempting to block until record with id {id} appears in the index, "
                    "but this has not happened after {limit} seconds".format(id=id, limit=max_retry_seconds))

            time.sleep(sleep)

    @classmethod
    def blockall(cls, ids_and_last_updateds, sleep=0.05, individual_max_retry_seconds=30):
        for id, lu in ids_and_last_updateds:
            cls.block(id, lu, sleep=sleep, max_retry_seconds=individual_max_retry_seconds)

    @classmethod
    def blockdeleted(cls, id, sleep=0.5, max_retry_seconds=30):
        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            sleep = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        q = BlockQuery(id)
        start_time = dates.now()
        while True:
            res = cls.query(q=q.query())
            hits = res.get("hits", {}).get("hits", [])
            if len(hits) == 0:
                return
            else:
                if (dates.now() - start_time).total_seconds() >= max_retry_seconds:
                    raise BlockTimeOutException(
                        "Attempting to block until record with id {id} is deleted from Elasticsearch, "
                        "but this has not happened after {limit} seconds".format(id=id, limit=max_retry_seconds))

            time.sleep(sleep)

    @classmethod
    def blockalldeleted(cls, ids, sleep=0.05, individual_max_retry_seconds=30):
        for id in ids:
            cls.blockdeleted(id, sleep, individual_max_retry_seconds)

    @classmethod
    def save_all(cls, models, blocking=False):
        for m in models:
            m.save()
        if blocking:
            cls.blockall((m.id, getattr(m, "last_updated", None)) for m in models)

    @classmethod
    def create_and_seed_index_and_rollover_alias(cls, documents: List[dict], mapping=None, keep_history=0):
        """
        Create a new index based on this object's type, put the mapping if provided, and seed it with the documents.
        The index's alias is then repointed to the new index.
        If a keep_history value is provided, older indexes with the same prefix as the alias will be deleted,
        keeping only the most recent keep_history of them.

        :param documents:
        :param mapping:
        :param keep_history: supply -1 to keep all history, 0 to keep no history, or a positive integer to keep that many previous indexes.
        :return:
        """
        full_alias = cls.index_name()
        new_name = cls.__type__ + "-" + dates.now_str(dates.FMT_DATETIME_LONG)
        if mapping is not None:
            cls.put_mapping(mapping, override_index_name=new_name)
        resp = cls.bulk(documents, refresh=True, override_index_name=new_name)
        if resp.get('errors', False):
            raise ESError("Error creating index {}: {}".format(new_name, resp))
        cls.move_alias(full_alias, new_name)

        if keep_history > -1:
            # Find all indexes with the same prefix as full_alias
            all_indexes = find_indexes_by_prefix(full_alias)
            all_indexes.sort()
            keep_history += 1
            for idx in all_indexes[:-keep_history]:
                try:
                    ES.indices.delete(index=idx)
                except elasticsearch.exceptions.NotFoundError:
                    pass
                except elasticsearch.exceptions.RequestError as e:
                    raise ESError(e)
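
    # Example usage (a sketch; `Provenance` is a hypothetical DomainObject subclass):
    #
    #   docs = [{"id": "1", "action": "create"}, {"id": "2", "action": "delete"}]
    #   # builds a timestamped index (e.g. "provenance-<datetime>"), seeds it,
    #   # repoints the alias, and keeps the new index plus the two previous ones
    #   Provenance.create_and_seed_index_and_rollover_alias(docs, mapping=None, keep_history=2)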

    @classmethod
    def put_mapping(cls, mapping: dict, override_index_name: str = None):
        """
        Create the index with the supplied mapping
        :param mapping: The mapping to put
        :param override_index_name: create this index instead of the model's default
        """
        try:
            if "mappings" not in mapping:
                mapping = {"mappings": mapping}
            return ES.indices.create(index=cls.index_name(override_index_name),
                                     body=mapping,
                                     request_timeout=app.config.get("ES_SOCKET_TIMEOUT", None))
        except elasticsearch.exceptions.RequestError as e:
            raise ESError(e)

    @classmethod
    def move_alias(cls, alias_name: str, target_index: str):
        """
        Create or update an alias for the index
        :param alias_name: The name of the alias to create or update
        :param target_index: The name of the index to point the alias to
        """
        try:
            alias_info = ES.indices.get_alias(name=alias_name)
            old_index_name = list(alias_info.keys())[0]
            full_index_name = cls.index_name(target_index)
            body = {
                "actions": [
                    {
                        "add": {
                            "index": full_index_name,
                            "alias": alias_name
                        }
                    },
                    {
                        "remove": {
                            "index": old_index_name,
                            "alias": alias_name
                        }
                    }
                ]
            }
            ES.indices.update_aliases(body=body)
        except elasticsearch.exceptions.RequestError as e:
            raise ESError(e)


def any_pending_tasks():
    """ Check if there are any pending tasks in the elasticsearch task queue """
    results = ES.cluster.pending_tasks()
    return len(results["tasks"]) > 0


def query_data_tasks(timeout='30s'):
    """ List data-related tasks (actions matching 'indices:data*') in the elasticsearch task queue,
    waiting up to `timeout` for them to complete """
    results = ES.tasks.list(params={
        "actions": 'indices:data*',
        "timeout": timeout,
        "wait_for_completion": 'true',
    })
    tasks = []
    for node in results['nodes'].values():
        tasks.extend(node['tasks'].values())
    return tasks


def refresh():
    """
    refresh all indexes to make newly added or deleted documents immediately searchable
    """
    return ES.indices.refresh()


def find_indexes_by_prefix(index_prefix) -> list[str]:
    data = ES.indices.get(f'{index_prefix}*')
    return list(data.keys())


def find_index_aliases(alias_prefixes=None) -> Iterable[Tuple[str, str]]:
    def _yield_index_alias():
        data = ES.indices.get_alias()
        for index, d in data.items():
            for alias in d['aliases'].keys():
                yield index, alias

    index_aliases = _yield_index_alias()
    if alias_prefixes:
        index_aliases = ((index, alias) for index, alias in index_aliases
                         if any(alias.startswith(p) for p in alias_prefixes))
    return index_aliases


def is_exist(query: dict, index):
    query['size'] = 1
    query['_source'] = False
    res = ES.search(body=query, index=index, size=1, ignore=[404])
    return res.get('hits', {}).get('total', {}).get('value', 0) > 0


class BlockTimeOutException(Exception):
    pass


class DAOSaveExceptionMaxRetriesReached(Exception):
    pass


class ESResponseCannotBeInterpreted(Exception):
    pass


class ESMappingMissingError(Exception):
    pass


class ESError(Exception):
    pass


########################################################################
# Some useful ES queries
########################################################################


class MatchAllQuery(object):
    def query(self):
        return {
            "track_total_hits": True,
            "query": {
                "match_all": {}
            }
        }


class BlockQuery(object):
    def __init__(self, id):
        self._id = id

    def query(self):
        return {
            "query": {
                "ids": {
                    "values": [self._id]
                }
            },
            "_source": ["last_updated"],
            "size": 2
        }


class PrefixAutocompleteQuery(object):
    def __init__(self, query_field, prefix, agg_name, agg_field, agg_size, filter_condition=None):
        self._query_field = query_field
        self._prefix = prefix
        self._agg_name = agg_name
        self._agg_field = agg_field
        self._agg_size = agg_size
        self._filter_condition = filter_condition

    def query(self):
        query_body = {
            "track_total_hits": True,
            "query": {
                "bool": {
                    "must": [
                        {"prefix": {self._query_field: self._prefix.lower()}}  # keep the prefix query
                    ]
                }
            },
            "size": 0,
            "aggs": {
                self._agg_name: {"terms": {"field": self._agg_field, "size": self._agg_size}}
            }
        }

        if self._filter_condition:
            query_body["query"]["bool"]["filter"] = [
                {"term": self._filter_condition}
            ]

        return query_body


class WildcardAutocompleteQuery(object):
    def __init__(self, wildcard_field, wildcard_query, agg_name, agg_field, agg_size):
        self._wildcard_field = wildcard_field
        self._wildcard_query = wildcard_query
        self._agg_name = agg_name
        self._agg_field = agg_field
        self._agg_size = agg_size

    def query(self):
        return {
            "track_total_hits": True,
            "query": {
                "wildcard": {self._wildcard_field: self._wildcard_query}
            },
            "size": 0,
            "aggs": {
                self._agg_name: {
                    "terms": {"field": self._agg_field, "size": self._agg_size}
                }
            }
        }


#########################################################################
# A query handler that knows how to speak facetview2
#########################################################################


class Facetview2(object):
    """
    ~~SearchURLGenerator:Feature->Elasticsearch:Technology~~

    # Examples of queries
    # {"query":{"filtered":{"filter":{"bool":{"must":[{"term":{"_type":"article"}}]}},"query":{"query_string":{"query":"richard","default_operator":"OR"}}}},"from":0,"size":10}
    # {"query":{"query_string":{"query":"richard","default_operator":"OR"}},"from":0,"size":10}
    """

    @staticmethod
    def make_term_filter(term, value):
        return {"term": {term: value}}

    @staticmethod
    def make_query(query_string=None, filters=None, default_operator="OR", sort_parameter=None, sort_order="asc",
                   default_field=None):
        query_part = {"match_all": {}}
        if query_string is not None:
            query_part = {"query_string": {"query": query_string, "default_operator": default_operator}}
            if default_field is not None:
                query_part["query_string"]["default_field"] = default_field
        query = {"query": query_part}

        if filters is not None:
            if not isinstance(filters, list):
                filters = [filters]
            filters.append(query_part)
            bool_part = {"bool": {"must": filters}}
            query = {"query": bool_part}

        if sort_parameter is not None:
            # For facetview we can only have one sort parameter, but ES actually supports lists
            sort_part = [{sort_parameter: {"order": sort_order}}]
            query["sort"] = sort_part

        return query

    @staticmethod
    def url_encode_query(query):
        return urllib.parse.quote(json.dumps(query).replace(' ', ''))
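
    # Example (illustrative): building and encoding a facetview-style source parameter:
    #
    #   q = Facetview2.make_query("quantum",
    #                             filters=Facetview2.make_term_filter("es_type", "article"),
    #                             sort_parameter="created_date", sort_order="desc")
    #   url = "/search?source=" + Facetview2.url_encode_query(q)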


def patch_model_for_bulk(obj: DomainObject):
    obj.data['es_type'] = obj.__type__
    obj.data['id'] = obj.makeid()
    return obj