# portality/dao.py
import time
import re
import sys
import uuid
import json
import elasticsearch
import urllib.parse

from collections import UserDict
from copy import deepcopy
from datetime import datetime, timedelta
from typing import List

from portality.core import app, es_connection as ES

# All models in models.py should inherit this DomainObject to know how to save themselves in the index and so on.
# You can overwrite and add to the DomainObject functions as required. See models.py for some examples.
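#
# An illustrative sketch of the pattern (Journal here is hypothetical, not a model defined in this file):
#
#     class Journal(DomainObject):
#         __type__ = "journal"    # the index/type this model saves itself under
#
#     j = Journal()
#     j.set_id()
#     j.save()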

ES_MAPPING_MISSING_REGEX = re.compile(r'.*No mapping found for \[[a-zA-Z0-9-_\.]+?\] in order to sort on.*', re.DOTALL)
CONTENT_TYPE_JSON = {'Content-Type': 'application/json'}


class ElasticSearchWriteException(Exception):
    pass


class ScrollException(Exception):
    pass


class ScrollInitialiseException(ScrollException):
    pass


class ScrollTimeoutException(ScrollException):
    pass


class BulkException(Exception):
    pass


class DomainObject(UserDict, object):
    """
    ~~DomainObject:Model->Elasticsearch:Technology~~
    """
    __type__ = None  # set the type on the model that inherits this

    def __init__(self, **kwargs):
        # if self.data is already set, don't do anything here
        try:
            object.__getattribute__(self, "data")
        except AttributeError:
            if '_source' in kwargs:
                self.data = dict(kwargs['_source'])
                self.meta = dict(kwargs)
                del self.meta['_source']
            else:
                self.data = dict(kwargs)
        # FIXME: calling super() breaks everything, even though this is the correct thing to do
        # this is because the DomainObject incorrectly overrides properties of the super class
        # super(DomainObject, self).__init__()

    @classmethod
    def index_name(cls):
        if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE'] and cls.__type__ is not None:
            name = ','.join([app.config['ELASTIC_SEARCH_DB_PREFIX'] + t for t in cls.__type__.split(',')])
        else:
            name = app.config['ELASTIC_SEARCH_DB']
        return name

    @classmethod
    def doc_type(cls):
        if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
            return None
        else:
            return cls.__type__

    @classmethod
    def makeid(cls):
        """Create a new id for a data object; overwrite this in specific model types if required"""
        return str(uuid.uuid4().hex)

    @property
    def id(self):
        rawid = self.data.get("id", None)
        if rawid is not None:
            return str(rawid)
        return None

    def set_id(self, id=None):
        if id is None:
            id = self.makeid()
        self.data["id"] = str(id)

    @property
    def version(self):
        return self.meta.get('_version', None)

    @property
    def json(self):
        return json.dumps(self.data)

    def set_created(self, date=None):
        if date is None:
            self.data['created_date'] = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
        else:
            self.data['created_date'] = date

    @property
    def created_date(self):
        return self.data.get("created_date")

    @property
    def created_timestamp(self):
        return datetime.strptime(self.data.get("created_date"), "%Y-%m-%dT%H:%M:%SZ")

    @property
    def last_updated(self):
        return self.data.get("last_updated")

    @property
    def last_updated_timestamp(self):
        return datetime.strptime(self.last_updated, "%Y-%m-%dT%H:%M:%SZ")

    def save(self, retries=0, back_off_factor=1, differentiate=False, blocking=False, block_wait=0.25):
        """
        ~~->ReadOnlyMode:Feature~~
        :param retries: number of times to retry the save on failure
        :param back_off_factor: multiplier for the exponential back-off between retries
        :param differentiate: ensure the new last_updated time differs from the previous one
        :param blocking: block until the saved record is visible in the index
        :param block_wait: time to wait between polls when blocking
        :return:
        """
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warn("System is in READ-ONLY mode, save command cannot run")
            return

        if retries > app.config.get("ES_RETRY_HARD_LIMIT", 1000):  # an arbitrarily large number
            retries = app.config.get("ES_RETRY_HARD_LIMIT", 1000)

        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            block_wait = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        if 'id' not in self.data:
            self.data['id'] = self.makeid()

        self.data['es_type'] = self.__type__

        now = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
        if (blocking or differentiate) and "last_updated" in self.data:
            diff = datetime.utcnow() - datetime.strptime(self.data["last_updated"], "%Y-%m-%dT%H:%M:%SZ")

            # we need the new last_updated time to be later than the old one
            if diff.total_seconds() < 1:
                soon = datetime.utcnow() + timedelta(seconds=1)
                now = soon.strftime("%Y-%m-%dT%H:%M:%SZ")

        self.data['last_updated'] = now

        if 'created_date' not in self.data:
            self.data['created_date'] = now

        attempt = 0
        d = json.dumps(self.data)
        r = None
        while attempt <= retries:
            try:
                r = ES.index(self.index_name(), d, doc_type=self.doc_type(), id=self.data.get("id"), headers=CONTENT_TYPE_JSON)
                break

            except (elasticsearch.ConnectionError, elasticsearch.ConnectionTimeout):
                app.logger.exception("Failed to connect to ES")
                attempt += 1

            except elasticsearch.TransportError as e:
                # Retries depend on which end the error lies.
                if 400 <= e.status_code < 500:
                    # Bad request, do not retry as it won't work. Fail with ElasticSearchWriteException.
                    app.logger.exception("Bad Request to ES, save failed. Details: {0}".format(e.error))
                    raise ElasticSearchWriteException(e.error)
                elif e.status_code >= 500:
                    # Server error, this could be temporary so we may want to retry
                    app.logger.exception("Server Error from ES, retrying. Details: {0}".format(e.error))
                    attempt += 1

            except Exception as e:
                # if any other exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in save method of DAO")
                raise ElasticSearchWriteException(e)

            # wait before retrying
            time.sleep((2 ** attempt) * back_off_factor)

        if attempt > retries:
            raise DAOSaveExceptionMaxRetriesReached(
                "After {attempts} attempts the record with "
                "id {id} failed to save.".format(
                    attempts=attempt, id=self.data['id']))

        if blocking:
            bq = BlockQuery(self.id)
            while True:
                res = self.query(q=bq.query())
                j = self._unwrap_search_result(res)
                if len(j) == 0:
                    time.sleep(block_wait)
                    continue
                if len(j) > 1:
                    raise Exception("More than one record with id {x}".format(x=self.id))
                if j[0].get("last_updated", [None])[0] == now:
                    break
                else:
                    time.sleep(block_wait)
                    continue

        return r
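
    # A minimal usage sketch for save (Journal is a hypothetical subclass):
    #
    #     obj = Journal(id="abc123")
    #     obj.save(retries=3, back_off_factor=1)   # retry transient ES failures with exponential back-off
    #     obj.save(blocking=True)                  # poll until the write is visible to search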

    def delete(self):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warn("System is in READ-ONLY mode, delete command cannot run")
            return

        # r = requests.delete(self.target() + self.id)
        try:
            ES.delete(self.index_name(), self.id, doc_type=self.doc_type())
        except elasticsearch.NotFoundError:
            pass  # This is to preserve the old behaviour

    @staticmethod
    def make_query(theq=None, should_terms=None, consistent_order=True, **kwargs):
        """
        Generate a query object based on parameters but don't send to
        backend - return it instead. Must always have the same
        parameters as the query method. See query method for explanation
        of parameters.
        """
        if theq is None:
            theq = ""
        q = deepcopy(theq)
        if isinstance(q, dict):
            query = q
            if 'bool' not in query['query']:
                boolean = {'bool': {'must': []}}
                boolean['bool']['must'].append(query['query'])
                query['query'] = boolean
            if 'must' not in query['query']['bool']:
                query['query']['bool']['must'] = []
        elif q:
            query = {
                'query': {
                    'bool': {
                        'must': [
                            {'query_string': {'query': q}}
                        ]
                    }
                }
            }
        else:
            query = {
                'query': {
                    'bool': {
                        'must': [
                            {'match_all': {}}
                        ]
                    }
                }
            }

        if should_terms is not None and len(should_terms) > 0:
            for s in should_terms:
                if not isinstance(should_terms[s], list):
                    should_terms[s] = [should_terms[s]]
                query["query"]["bool"]["must"].append({"terms": {s: should_terms[s]}})

        sort_specified = False
        for k, v in kwargs.items():
            if k == '_from':
                query['from'] = v
            elif k == 'sort':
                sort_specified = True
                query['sort'] = v
            else:
                query[k] = v
        if "sort" in query:
            sort_specified = True

        if not sort_specified and consistent_order:
            # FIXME: review this - where is default sort necessary, and why do we want this in ID order?
            query['sort'] = [{"id.exact": {"order": "asc", "unmapped_type": "keyword"}}]

        return query
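
    # For illustration, make_query("cheese", size=10) returns roughly:
    #
    #     {"query": {"bool": {"must": [{"query_string": {"query": "cheese"}}]}},
    #      "size": 10,
    #      "sort": [{"id.exact": {"order": "asc", "unmapped_type": "keyword"}}]}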

    @classmethod
    def _unwrap_search_result(cls, res):
        return [i.get("_source") if "_source" in i else i.get("fields") for i in
                res.get('hits', {}).get('hits', [])]

    @classmethod
    def bulk_delete(cls, id_list, idkey='id', refresh=False):
        return cls.bulk(documents=[{'id': i} for i in id_list], idkey=idkey, refresh=refresh, action='delete')

    @classmethod
    def bulk(cls, documents: List[dict], idkey='id', refresh=False, action='index', req_timeout=10, **kwargs):
        """
        :param documents: a list of objects to perform bulk actions on (list of dicts)
        :param idkey: The path to extract an ID from the object, e.g. 'id', 'identifiers.id'
        :param refresh: Refresh the index in each operation (make immediately available for search) - expensive!
        :param action: The bulk action to perform on each document: 'index', 'update' or 'delete'
        :param req_timeout: Request timeout for bulk operation
        :param kwargs: kwargs are passed into the bulk instruction for each record
        """
        # ~~->ReadOnlyMode:Feature~~
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warn("System is in READ-ONLY mode, bulk command cannot run")
            return

        if action not in ['index', 'update', 'delete']:
            raise Exception("Unrecognised bulk action '{0}'".format(action))

        data = ''
        for d in documents:
            data += cls.to_bulk_single_rec(d, idkey=idkey, action=action, **kwargs)
        resp = ES.bulk(body=data, index=cls.index_name(), doc_type=cls.doc_type(), refresh=refresh,
                       request_timeout=req_timeout)
        return resp

    @staticmethod
    def to_bulk_single_rec(record, idkey="id", action="index", **kwargs):
        """ Adapted from esprit. Create a bulk instruction from a single record. """
        data = ''
        idpath = idkey.split(".")

        # traverse down the object in search of the supplied ID key
        context = record
        for pathseg in idpath:
            if pathseg in context:
                context = context[pathseg]
            else:
                raise BulkException(
                    "'{0}' not available in record to generate bulk _id: {1}".format(idkey, json.dumps(record)))

        datadict = {action: {'_id': context}}
        datadict[action].update(kwargs)

        data += json.dumps(datadict) + '\n'

        if action == 'delete':
            return data

        # For update, we wrap the document in {doc: document} if not already supplied
        if action == 'update' and not (record.get('doc') and len(record.keys()) == 1):
            data += json.dumps({'doc': record}) + '\n'
        else:
            data += json.dumps(record) + '\n'
        return data
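
    # For illustration, to_bulk_single_rec({"id": "1", "title": "t"}) yields the two
    # NDJSON lines of a bulk 'index' instruction:
    #
    #     {"index": {"_id": "1"}}
    #     {"id": "1", "title": "t"}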

    @classmethod
    def refresh(cls):
        """
        ~~->ReadOnlyMode:Feature~~
        :return:
        """
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warn("System is in READ-ONLY mode, refresh command cannot run")
            return

        # r = requests.post(cls.target() + '_refresh', headers=CONTENT_TYPE_JSON)
        # return r.json()
        return ES.indices.refresh(index=cls.index_name())

    @classmethod
    def pull(cls, id_):
        """Retrieve object by id."""
        if id_ is None or id_ == '':
            return None

        try:
            # out = requests.get(cls.target() + id_)
            out = ES.get(cls.index_name(), id_, doc_type=cls.doc_type())
        except elasticsearch.NotFoundError:
            return None
        except elasticsearch.TransportError as e:
            raise Exception("ES returned an error: {x}".format(x=e.info))
        except Exception as e:
            return None
        if out is None:
            return None

        return cls(**out)

    @classmethod
    def pull_by_key(cls, key, value):
        res = cls.query(q={"query": {"term": {key + app.config['FACET_FIELD']: value}}})
        if res.get('hits', {}).get('total', {}).get('value', 0) == 1:
            return cls.pull(res['hits']['hits'][0]['_source']['id'])
        else:
            return None

    @classmethod
    def object_query(cls, q=None, **kwargs):
        result = cls.query(q, **kwargs)
        return [cls(**r.get("_source")) for r in result.get("hits", {}).get("hits", [])]

    @classmethod
    def query(cls, q=None, **kwargs):
        """Perform a query on backend.

        :param q: maps to query_string parameter if string, or query dict if dict.
        :param kwargs: any keyword args as per
            http://www.elasticsearch.org/guide/reference/api/search/uri-request.html
        """
        query = cls.make_query(q, **kwargs)
        return cls.send_query(query)

    @classmethod
    def send_query(cls, qobj, retry=50, **kwargs):
        """Actually send a query object to the backend.

        :param retry: number of times to retry the query on failure
        :param kwargs: passed directly to the Elasticsearch search() function
        """
        if retry > app.config.get("ES_RETRY_HARD_LIMIT", 1000) + 1:  # an arbitrarily large number
            retry = app.config.get("ES_RETRY_HARD_LIMIT", 1000) + 1

        r = None
        count = 0
        exception = None
        while count < retry:
            count += 1
            try:
                # ES 7.10 updated target to whole index, since specifying type for search is deprecated
                # r = requests.post(cls.target_whole_index() + recid + "_search", data=json.dumps(qobj), headers=CONTENT_TYPE_JSON)
                r = ES.search(body=json.dumps(qobj), index=cls.index_name(), doc_type=cls.doc_type(), headers=CONTENT_TYPE_JSON, **kwargs)
                break
            except Exception as e:
                exception = ESMappingMissingError(e) if ES_MAPPING_MISSING_REGEX.match(json.dumps(e.args[2])) else e
                if isinstance(exception, ESMappingMissingError):
                    raise exception
                time.sleep(0.5)

        if r is not None:
            return r
        if exception is not None:
            raise exception
        raise Exception("Couldn't get the ES query endpoint to respond. Also, you shouldn't be seeing this.")

    @classmethod
    def remove_by_id(cls, id):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warn("System is in READ-ONLY mode, delete_by_id command cannot run")
            return

        # r = requests.delete(cls.target() + id)
        try:
            ES.delete(cls.index_name(), id)
        except elasticsearch.NotFoundError:
            return

    @classmethod
    def delete_by_query(cls, query):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warn("System is in READ-ONLY mode, delete_by_query command cannot run")
            return

        # r = requests.delete(cls.target() + "_query", data=json.dumps(query))
        # return r
        return ES.delete_by_query(cls.index_name(), json.dumps(query), doc_type=cls.doc_type())

    @classmethod
    def destroy_index(cls):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warn("System is in READ-ONLY mode, destroy_index command cannot run")
            return

        # if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
        #     return esprit.raw.delete_index_by_prefix(es_connection, app.config['ELASTIC_SEARCH_DB_PREFIX'])
        # else:
        #     return esprit.raw.delete_index(es_connection)
        print('Destroying indexes with prefix ' + app.config['ELASTIC_SEARCH_DB_PREFIX'] + '*')
        return ES.indices.delete(app.config['ELASTIC_SEARCH_DB_PREFIX'] + '*')

    @classmethod
    def check_es_raw_response(cls, res, extra_trace_info=''):
        if 'error' in res:
            es_resp = json.dumps(res, indent=2)

            error_to_raise = ESMappingMissingError if ES_MAPPING_MISSING_REGEX.match(es_resp) else ESError

            raise error_to_raise(
                (
                    "Elasticsearch returned an error:"
                    "\nES HTTP Response status: {es_status}"
                    "\nES Response:{es_resp}"
                    .format(es_status=res.get('status', 'unknown'), es_resp=es_resp)
                ) + extra_trace_info
            )

        if 'hits' not in res or 'hits' not in res['hits']:  # i.e. if res['hits']['hits'] does not exist
            raise ESResponseCannotBeInterpreted(
                (
                    "Elasticsearch did not return any records. "
                    "It probably returned an error we did not understand instead."
                    "\nES HTTP Response status: {es_status}"
                    "\nES Response:{es_resp}\n"
                    .format(es_status=res.get('status', 'unknown'), es_resp=json.dumps(res, indent=2))
                ) + extra_trace_info
            )
        return True

    @classmethod
    def handle_es_raw_response(cls, res, wrap, extra_trace_info=''):
        """
        Handles the JSON returned by ES, raising errors as needed. If no problems are detected it returns its input
        unchanged.

        :param res: The full ES raw response to a query as a Python dict (this method does not handle the raw JSON ES
            outputs). Usually this parameter is the return value of the .query or .send_query methods.
        :param wrap: Did the caller request wrapping of each ES record inside a model object? This matters for handling
            records that have no '_source' or 'fields' keys, but do have an '_id' key. Such records should raise an
            error if wrapping was requested, since there is nothing to wrap. If no wrapping was requested, perhaps the
            caller simply needed the object IDs and nothing else, so we do not need to raise an error.
        :param extra_trace_info: A string with additional diagnostic information to be put into exceptions.
        """
        cls.check_es_raw_response(res, extra_trace_info=extra_trace_info)

        rs = []
        for i, each in enumerate(res['hits']['hits']):
            if '_source' in each:
                rs.append(each['_source'])
            elif 'fields' in each:
                rs.append(each['fields'])
            elif '_id' in each and not wrap:
                # "_id" is a sibling (not child) of "_source" so it can only be used with unwrapped raw responses.
                # wrap = True only makes sense if "_source" or "fields" were returned.
                # So, if "_id" is the only one present, extract it into an object that's shaped the same as the item
                # in the raw response.
                rs.append({"_id": each['_id']})
            else:
                msg1 = "Can't find any useful data in the ES response.\n" + extra_trace_info
                msg2 = "\nItem {i}.\nItem data:\n{each}".format(i=i, each=json.dumps(each, indent=2))
                raise ESResponseCannotBeInterpreted(msg1 + msg2)

        return rs

    @classmethod
    def iterate(cls, q: dict = None, page_size: int = 1000, limit: int = None, wrap: bool = True, keepalive: str = '1m'):
        """ Provide an iterable over all items in a model, via an ES scroll.

        :param q: The query to scroll results on
        :param page_size: Page size per scroll request; limited by Elasticsearch, check settings to override
        :param limit: Limit the number of results returned (e.g. to take a slice)
        :param wrap: Whether to return the results as raw json or wrapped as an object
        :param keepalive: scroll timeout
        """
        theq = {"query": {"match_all": {}}} if q is None else deepcopy(q)
        theq["size"] = page_size
        theq["from"] = 0
        if "sort" not in theq:
            # Sorting by "_doc" (index order) gives the same performance enhancement as the old scan
            # search type, so use it by default.
            theq["sort"] = ["_doc"]

        # Initialise the scroll
        try:
            res = cls.send_query(theq, scroll=keepalive)
        except Exception as e:
            raise ScrollInitialiseException("Unable to initialise scroll - could be your mappings are broken", e)

        # unpack scroll response
        scroll_id = res.get('_scroll_id')
        total_results = res.get('hits', {}).get('total', {}).get('value')

        # Supply the first set of results
        counter = 0
        for r in cls.handle_es_raw_response(
                res,
                wrap=wrap,
                extra_trace_info=
                "\nScroll initialised:\n{q}\n"
                "\n\nPage #{counter} of the ES response with size {page_size}."
                .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

            # apply the limit
            if limit is not None and counter >= int(limit):
                break
            counter += 1
            if wrap:
                yield cls(**r)
            else:
                yield r

        # Continue to scroll through the rest of the results
        while True:
            # apply the limit
            if limit is not None and counter >= int(limit):
                break

            # if we consumed all the results we were expecting, we can just stop here
            if counter >= total_results:
                break

            # get the next page and check that we haven't timed out
            try:
                res = ES.scroll(scroll_id=scroll_id, scroll=keepalive)
            except elasticsearch.exceptions.NotFoundError as e:
                raise ScrollTimeoutException(
                    "Scroll timed out; {status} - {message}".format(status=e.status_code, message=e.info))
            except Exception as e:
                # if any other exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in scroll method of DAO")
                raise ScrollException(e)

            # if we didn't get any results back, this means we're at the end
            if len(res.get('hits', {}).get('hits', [])) == 0:
                break

            for r in cls.handle_es_raw_response(
                    res,
                    wrap=wrap,
                    extra_trace_info=
                    "\nScroll:\n{q}\n"
                    "\n\nPage #{counter} of the ES response with size {page_size}."
                    .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

                # apply the limit
                if limit is not None and counter >= int(limit):
                    break
                counter += 1
                if wrap:
                    yield cls(**r)
                else:
                    yield r
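
    # A minimal usage sketch for iterate (Journal is a hypothetical subclass):
    #
    #     for j in Journal.iterate(page_size=500, limit=2000):
    #         print(j.id)    # items are wrapped as model instances by default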

    @classmethod
    def iterall(cls, page_size=1000, limit=None):
        return cls.iterate(MatchAllQuery().query(), page_size, limit)

    # an alias for the iterate function
    scroll = iterate

    @classmethod
    def dump(cls, q=None, page_size=1000, limit=None, out=None, out_template=None, out_batch_sizes=100000,
             out_rollover_callback=None, transform=None, es_bulk_format=True, idkey='id', es_bulk_fields=None,
             scroll_keepalive='2m'):
        """ Export to file, in bulk format or just as a json dump of each record """

        q = q if q is not None else {"query": {"match_all": {}}}

        filenames = []
        n = 1
        current_file = None
        if out_template is not None:
            current_file = out_template + "." + str(n)
            filenames.append(current_file)
        if out is None and current_file is not None:
            out = open(current_file, "w")
        else:
            out = sys.stdout

        count = 0
        for record in cls.scroll(q, page_size=page_size, limit=limit, wrap=False, keepalive=scroll_keepalive):
            if transform is not None:
                record = transform(record)

            if es_bulk_format:
                kwargs = {}
                if es_bulk_fields is None:
                    es_bulk_fields = ["_id", "_index", "_type"]
                for key in es_bulk_fields:
                    if key == "_id":
                        kwargs["idkey"] = idkey
                    if key == "_index":
                        kwargs["_index"] = cls.index_name()
                    if key == "_type":
                        kwargs["_type"] = cls.doc_type()
                data = cls.to_bulk_single_rec(record, **kwargs)
            else:
                data = json.dumps(record) + "\n"

            out.write(data)
            if out_template is not None:
                count += 1
                if count > out_batch_sizes:
                    out.close()
                    if out_rollover_callback is not None:
                        out_rollover_callback(current_file)

                    count = 0
                    n += 1
                    current_file = out_template + "." + str(n)
                    filenames.append(current_file)
                    out = open(current_file, "w")

        if out_template is not None:
            out.close()
            if out_rollover_callback is not None:
                out_rollover_callback(current_file)

        return filenames
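
    # A minimal usage sketch for dump (hypothetical subclass and path):
    #
    #     files = Journal.dump(out_template="/tmp/journal_export", es_bulk_format=False)
    #     # writes /tmp/journal_export.1, /tmp/journal_export.2, ... and returns the filenames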

    @classmethod
    def prefix_query(cls, field, prefix, size=5, facet_field=None, analyzed_field=True):
        # example of a prefix query
        # {
        #     "query": {"prefix" : { "bibjson.publisher" : "ope" } },
        #     "size": 0,
        #     "facets" : {
        #         "publisher" : { "terms" : {"field" : "bibjson.publisher.exact", "size": 5} }
        #     }
        # }

        suffix = app.config['FACET_FIELD']
        query_field = field
        if analyzed_field:
            if field.endswith(suffix):
                # strip .exact (or whatever it's configured as) off the end
                query_field = field[:field.rfind(suffix)]
        else:
            if not field.endswith(suffix):
                query_field = field + suffix

        # the actual terms should come from the .exact version of the
        # field - we are suggesting whole values, not fragments
        if facet_field is None:
            facet_field = query_field + suffix
        if not facet_field.endswith(suffix):
            facet_field = facet_field + suffix

        q = PrefixAutocompleteQuery(query_field, prefix, field, facet_field, size)
        return cls.send_query(q.query())

    @classmethod
    def wildcard_autocomplete_query(cls, field, substring, before=True, after=True, facet_size=5, facet_field=None):
        """
        Example of a wildcard query. Works only on .exact fields:

        {
            "query" : {
                "wildcard" : {"bibjson.publisher.exact" : "De *"}
            },
            "size" : 0,
            "facets" : {
                "bibjson.publisher.exact" : {
                    "terms" : {"field" : "bibjson.publisher.exact", "size" : 5}
                }
            }
        }

        :param field:
        :param substring:
        :param facet_size:
        :return:
        """
        # wildcard queries need to be on unanalyzed fields
        suffix = app.config['FACET_FIELD']
        filter_field = field
        if not filter_field.endswith(suffix):
            filter_field = filter_field + suffix

        # add the wildcard before/after
        if before:
            substring = "*" + substring
        if after:
            substring = substring + "*"

        # sort out the facet field
        if facet_field is None:
            facet_field = filter_field
        if not facet_field.endswith(suffix):
            facet_field = facet_field + suffix

        # build the query
        q = WildcardAutocompleteQuery(filter_field, substring, field, facet_field, facet_size)
        return cls.send_query(q.query())

    @classmethod
    def advanced_autocomplete(cls, filter_field, facet_field, substring, size=5, prefix_only=True):
        analyzed = True
        if " " in substring:
            analyzed = False

        substring = substring.lower()

        if " " in substring and not prefix_only:
            res = cls.wildcard_autocomplete_query(filter_field, substring, before=True, after=True, facet_size=size, facet_field=facet_field)
        else:
            res = cls.prefix_query(filter_field, substring, size=size, facet_field=facet_field, analyzed_field=analyzed)

        result = []
        for term in res['aggregations'][filter_field]['buckets']:
            # keep ordering - it's by count by default, so most frequent
            # terms will now go to the front of the result list
            result.append({"id": term['key'], "text": term['key']})
        return result

    @classmethod
    def autocomplete(cls, field, prefix, size=5):
        res = None
        # if there is a space in the prefix, the prefix query won't work, so we fall back to a wildcard
        # we only do this if we have to, because the wildcard query is a little expensive
        if " " in prefix:
            res = cls.wildcard_autocomplete_query(field, prefix, before=False, after=True, facet_size=size)
        else:
            prefix = prefix.lower()
            res = cls.prefix_query(field, prefix, size=size)

        result = []
        for term in res['aggregations'][field]['buckets']:
            # keep ordering - it's by count by default, so most frequent
            # terms will now go to the front of the result list
            result.append({"id": term['key'], "text": term['key']})
        return result
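
    # For illustration (hypothetical subclass and field):
    #
    #     Journal.autocomplete("bibjson.publisher", "ope", size=5)
    #
    # returns a list shaped like [{"id": "Open Books", "text": "Open Books"}, ...],
    # most frequent terms first.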

    @classmethod
    def q2obj(cls, **kwargs):
        extra_trace_info = ''
        if 'q' in kwargs:
            extra_trace_info = "\nQuery sent to ES (before manipulation in DomainObject.query):\n{}\n".format(json.dumps(kwargs['q'], indent=2))

        res = cls.query(**kwargs)
        rs = cls.handle_es_raw_response(res, wrap=True, extra_trace_info=extra_trace_info)
        results = [cls(**r) for r in rs]
        return results

    @classmethod
    def all(cls, size=10000, **kwargs):
        """ This is a shortcut to a match_all query with a large size, to return all records """
        # FIXME: is this only used in tests? ES now limits size so we can't guarantee ALL without using scroll / scan
        return cls.q2obj(size=size, **kwargs)

    @classmethod
    def count(cls):
        res = ES.count(index=cls.index_name(), doc_type=cls.doc_type())
        return res.get("count")
        # return requests.get(cls.target() + '_count').json()['count']

    @classmethod
    def hit_count(cls, query, **kwargs):
        countable_query = deepcopy(query)
        if "track_total_hits" not in countable_query:
            countable_query["track_total_hits"] = True

        res = cls.query(q=countable_query, **kwargs)
        return res.get("hits", {}).get("total", {}).get("value", 0)
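
    # For illustration: track_total_hits makes ES report the exact total rather than
    # capping it at 10,000, so (with a hypothetical subclass)
    #
    #     Journal.hit_count({"query": {"match_all": {}}})
    #
    # returns the exact number of matching documents.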

    @classmethod
    def block(cls, id, last_updated=None, sleep=0.5, max_retry_seconds=30):
        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            sleep = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        q = BlockQuery(id)
        start_time = datetime.now()
        while True:
            res = cls.query(q=q.query())
            hits = res.get("hits", {}).get("hits", [])
            if len(hits) > 0:
                obj = hits[0].get("fields")
                if last_updated is not None:
                    if "last_updated" in obj:
                        lu = obj["last_updated"]
                        if len(lu) > 0:
                            threshold = datetime.strptime(last_updated, "%Y-%m-%dT%H:%M:%SZ")
                            lud = datetime.strptime(lu[0], "%Y-%m-%dT%H:%M:%SZ")
                            if lud >= threshold:
                                return
                else:
                    return
            else:
                if (datetime.now() - start_time).total_seconds() >= max_retry_seconds:
                    raise BlockTimeOutException("Attempting to block until record with id {id} appears in Elasticsearch, but this has not happened after {limit}".format(id=id, limit=max_retry_seconds))

            time.sleep(sleep)
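
    # A minimal usage sketch: wait until a just-saved record is searchable
    # (Journal is a hypothetical subclass):
    #
    #     j = Journal()
    #     j.save()
    #     Journal.block(j.id, j.last_updated, max_retry_seconds=10)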

    @classmethod
    def blockall(cls, ids_and_last_updateds, sleep=0.05, individual_max_retry_seconds=30):
        for id, lu in ids_and_last_updateds:
            cls.block(id, lu, sleep=sleep, max_retry_seconds=individual_max_retry_seconds)

    @classmethod
    def blockdeleted(cls, id, sleep=0.5, max_retry_seconds=30):
        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            sleep = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        q = BlockQuery(id)
        start_time = datetime.now()
        while True:
            res = cls.query(q=q.query())
            hits = res.get("hits", {}).get("hits", [])
            if len(hits) == 0:
                return
            else:
                if (datetime.now() - start_time).total_seconds() >= max_retry_seconds:
                    raise BlockTimeOutException(
                        "Attempting to block until record with id {id} deleted from Elasticsearch, but this has not happened after {limit}".format(
                            id=id, limit=max_retry_seconds))

            time.sleep(sleep)

    @classmethod
    def blockalldeleted(cls, ids, sleep=0.05, individual_max_retry_seconds=30):
        for id in ids:
            cls.blockdeleted(id, sleep, individual_max_retry_seconds)


class BlockTimeOutException(Exception):
    pass


class DAOSaveExceptionMaxRetriesReached(Exception):
    pass


class ESResponseCannotBeInterpreted(Exception):
    pass


class ESMappingMissingError(Exception):
    pass


class ESError(Exception):
    pass


########################################################################
# Some useful ES queries
########################################################################


class MatchAllQuery(object):
    def query(self):
        return {
            "track_total_hits": True,
            "query": {
                "match_all": {}
            }
        }


class BlockQuery(object):
    def __init__(self, id):
        self._id = id

    def query(self):
        return {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"id.exact": self._id}}
                    ]
                }
            },
            "_source": False,
            "docvalue_fields": [
                {"field": "last_updated", "format": "date_time_no_millis"}
            ]
        }


class PrefixAutocompleteQuery(object):
    def __init__(self, query_field, prefix, agg_name, agg_field, agg_size):
        self._query_field = query_field
        self._prefix = prefix
        self._agg_name = agg_name
        self._agg_field = agg_field
        self._agg_size = agg_size

    def query(self):
        return {
            "track_total_hits": True,
            "query": {"prefix": {self._query_field: self._prefix.lower()}},
            "size": 0,
            "aggs": {
                self._agg_name: {"terms": {"field": self._agg_field, "size": self._agg_size}}
            }
        }


class WildcardAutocompleteQuery(object):
    def __init__(self, wildcard_field, wildcard_query, agg_name, agg_field, agg_size):
        self._wildcard_field = wildcard_field
        self._wildcard_query = wildcard_query
        self._agg_name = agg_name
        self._agg_field = agg_field
        self._agg_size = agg_size

    def query(self):
        return {
            "track_total_hits": True,
            "query": {
                "wildcard": {self._wildcard_field: self._wildcard_query}
            },
            "size": 0,
            "aggs": {
                self._agg_name: {
                    "terms": {"field": self._agg_field, "size": self._agg_size}
                }
            }
        }


#########################################################################
# A query handler that knows how to speak facetview2
#########################################################################


class Facetview2(object):
    """
    ~~SearchURLGenerator:Feature->Elasticsearch:Technology~~
    """

    # Examples of queries
    # {"query":{"filtered":{"filter":{"bool":{"must":[{"term":{"_type":"article"}}]}},"query":{"query_string":{"query":"richard","default_operator":"OR"}}}},"from":0,"size":10}
    # {"query":{"query_string":{"query":"richard","default_operator":"OR"}},"from":0,"size":10}

    @staticmethod
    def make_term_filter(term, value):
        return {"term": {term: value}}

    @staticmethod
    def make_query(query_string=None, filters=None, default_operator="OR", sort_parameter=None, sort_order="asc", default_field=None):
        query_part = {"match_all": {}}
        if query_string is not None:
            query_part = {"query_string": {"query": query_string, "default_operator": default_operator}}
            if default_field is not None:
                query_part["query_string"]["default_field"] = default_field
        query = {"query": query_part}

        if filters is not None:
            if not isinstance(filters, list):
                filters = [filters]
            filters.append(query_part)
            bool_part = {"bool": {"must": filters}}
            query = {"query": bool_part}

        if sort_parameter is not None:
            # For facetview we can only have one sort parameter, but ES actually supports lists
            sort_part = [{sort_parameter: {"order": sort_order}}]
            query["sort"] = sort_part

        return query
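
    # For illustration, make_query("richard", sort_parameter="created_date") returns:
    #
    #     {"query": {"query_string": {"query": "richard", "default_operator": "OR"}},
    #      "sort": [{"created_date": {"order": "asc"}}]}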

    @staticmethod
    def url_encode_query(query):
        return urllib.parse.quote(json.dumps(query).replace(' ', ''))
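
    # A usage sketch: the encoded query can be used as the source parameter of a
    # facetview search URL (the base URL here is hypothetical):
    #
    #     q = Facetview2.make_query("richard")
    #     url = "/search?source=" + Facetview2.url_encode_query(q)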