Coverage for portality/dao.py: 69% (919 statements)


from __future__ import annotations

import json
import os
import re
import sys
import time
import urllib.parse
import uuid
from collections import UserDict
from copy import deepcopy
from datetime import timedelta
from typing import List, Iterable, Tuple

import elasticsearch
from elasticsearch.exceptions import ConnectionTimeout

from portality.core import app, es_connection as ES
from portality.lib import dates
from portality.lib.dates import FMT_DATETIME_STD

# All models in models.py should inherit this DomainObject to know how to save themselves in the index and so on.
# You can overwrite and add to the DomainObject functions as required. See models.py for some examples.


ES_MAPPING_MISSING_REGEX = re.compile(r'.*No mapping found for \[[a-zA-Z0-9-_\.]+?\] in order to sort on.*', re.DOTALL)
CONTENT_TYPE_JSON = {'Content-Type': 'application/json'}


class ElasticSearchWriteException(Exception):
    pass


class ScrollException(Exception):
    pass


class ScrollInitialiseException(ScrollException):
    pass


class ScrollTimeoutException(ScrollException):
    pass


class BulkException(Exception):
    pass


class DomainObject(UserDict, object):
    """
    ~~DomainObject:Model->Elasticsearch:Technology~~

    Base model class for an Elasticsearch (ES) index type, providing
    interaction with the index (save, delete, query, etc.).
    """

    # set the type on the model that inherits this; it is also the ES index name for the model
    __type__ = None

    def __init__(self, **kwargs):
        # if self.data is already set, don't do anything here
        try:
            object.__getattribute__(self, "data")
        except AttributeError:
            if '_source' in kwargs:
                self.data = dict(kwargs['_source'])
                self.meta = dict(kwargs)
                del self.meta['_source']
            else:
                self.data = dict(kwargs)
        # FIXME: calling super() breaks everything, even though this is the correct thing to do
        # this is because the DomainObject incorrectly overrides properties of the super class
        # super(DomainObject, self).__init__()

    @classmethod
    def index_name(cls, override_index_name=None, **kwargs):
        if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE'] and cls.__type__ is not None:
            if override_index_name is not None:
                name = app.config['ELASTIC_SEARCH_DB_PREFIX'] + override_index_name
            else:
                name = ','.join([app.config['ELASTIC_SEARCH_DB_PREFIX'] + t for t in cls.__type__.split(',')])
        else:
            if override_index_name is not None:
                name = override_index_name
            else:
                name = app.config['ELASTIC_SEARCH_DB']
        return name

    @classmethod
    def doc_type(cls):
        if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
            return None
        else:
            return cls.__type__

    @classmethod
    def makeid(cls):
        """Create a new id for the data object. Overwrite this in specific model types if required."""
        return str(uuid.uuid4().hex)

    @property
    def id(self):
        rawid = self.data.get("id", None)
        if rawid is not None:
            return str(rawid)
        return None

    def set_id(self, id=None):
        if id is None:
            id = self.makeid()
        self.data["id"] = str(id)

    @property
    def version(self):
        return self.meta.get('_version', None)

    @property
    def json(self):
        return json.dumps(self.data)

    def set_created(self, date=None):
        if date is None:
            self.data['created_date'] = dates.now_str()
        else:
            self.data['created_date'] = date

    @property
    def created_date(self):
        return self.data.get("created_date")

    @property
    def created_timestamp(self):
        return dates.parse(self.data.get("created_date"))

    @property
    def last_updated(self):
        return self.data.get("last_updated")

    @property
    def last_updated_timestamp(self):
        lu = self.last_updated
        if lu is None:
            return None
        return dates.parse(self.last_updated)

    def pre_save_prep(self, blocking=False, differentiate=False, update_last_updated=True):
        if 'id' not in self.data:
            self.data['id'] = self.makeid()

        self.data['es_type'] = self.__type__

        now = dates.now_str()
        if (blocking or differentiate) and "last_updated" in self.data:
            diff = dates.now() - dates.parse(self.data["last_updated"])

            # we need the new last_updated time to be later than the old one
            if diff.total_seconds() < 1:
                soon = dates.now() + timedelta(seconds=1)
                now = soon.strftime(FMT_DATETIME_STD)

        if update_last_updated:
            self.data['last_updated'] = now

        if 'created_date' not in self.data:
            self.data['created_date'] = now

    def save(self, retries=0, back_off_factor=1, differentiate=False, blocking=False, block_wait=0.25, update_last_updated=True):
        """
        ~~->ReadOnlyMode:Feature~~
        :param retries:
        :param back_off_factor:
        :param differentiate:
        :param blocking:
        :return:
        """
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, save command cannot run")
            return

        if retries > app.config.get("ES_RETRY_HARD_LIMIT", 1000):  # an arbitrarily large number
            retries = app.config.get("ES_RETRY_HARD_LIMIT", 1000)

        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            block_wait = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        self.pre_save_prep(blocking=blocking, differentiate=differentiate, update_last_updated=update_last_updated)
        now = self.data.get("last_updated", dates.now_str())

        attempt = 0
        d = json.dumps(self.data)
        r = None
        while attempt <= retries:
            try:
                r = ES.index(self.index_name(), d, doc_type=self.doc_type(),
                             id=self.data.get("id"),
                             headers=CONTENT_TYPE_JSON,
                             timeout=app.config.get('ES_READ_TIMEOUT', None))
                break

            except (elasticsearch.ConnectionError, elasticsearch.ConnectionTimeout):
                app.logger.exception("Failed to connect to ES")
                attempt += 1

            except elasticsearch.TransportError as e:
                # Retries depend on which end the error lies.
                if 400 <= e.status_code < 500:
                    # Bad request, do not retry as it won't work. Fail with ElasticSearchWriteException.
                    app.logger.exception("Bad Request to ES, save failed. Details: {0}".format(e.error))
                    raise ElasticSearchWriteException(e.error)
                elif e.status_code >= 500:
                    # Server error, this could be temporary so we may want to retry
                    app.logger.exception("Server Error from ES, retrying. Details: {0}".format(e.error))
                    attempt += 1
            except Exception as e:
                # if any other exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in save method of DAO")
                raise ElasticSearchWriteException(e)

            # wait before retrying
            time.sleep((2 ** attempt) * back_off_factor)

        if attempt > retries:
            raise DAOSaveExceptionMaxRetriesReached(
                "After {attempts} attempts the record with "
                "id {id} failed to save.".format(
                    attempts=attempt, id=self.data['id']))

        if blocking:
            self.__class__.block(self.id, last_updated=now, sleep=block_wait)

        return r
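    # Example (illustrative sketch, not part of this module): saving a record on a
    # hypothetical concrete subclass such as `class Article(DomainObject): __type__ = "article"`,
    # retrying with exponential back-off and blocking until the write is searchable:
    #
    #     article = Article(title="Example")
    #     article.save(retries=3, back_off_factor=0.5, blocking=True)
    #
    # Each retry sleeps (2 ** attempt) * back_off_factor seconds, so the waits above
    # would be roughly 1s, 2s and 4s before DAOSaveExceptionMaxRetriesReached is raised.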

    def delete(self):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, delete command cannot run")
            return

        # r = requests.delete(self.target() + self.id)
        try:
            ES.delete(self.index_name(), self.id, doc_type=self.doc_type())
        except elasticsearch.NotFoundError:
            pass  # This is to preserve the old behaviour

    @staticmethod
    def make_query(theq=None, should_terms=None, consistent_order=True, **kwargs):
        """
        Generate a query object based on parameters but don't send to
        backend - return it instead. Must always have the same
        parameters as the query method. See query method for explanation
        of parameters.
        """
        if theq is None:
            theq = ""
        q = deepcopy(theq)
        if isinstance(q, dict):
            query = q
            if 'bool' not in query['query']:
                boolean = {'bool': {'must': []}}
                boolean['bool']['must'].append(query['query'])
                query['query'] = boolean
            if 'must' not in query['query']['bool']:
                query['query']['bool']['must'] = []
        elif q:
            query = {
                'query': {
                    'bool': {
                        'must': [
                            {'query_string': {'query': q}}
                        ]
                    }
                }
            }
        else:
            query = {
                'query': {
                    'bool': {
                        'must': [
                            {'match_all': {}}
                        ]
                    }
                }
            }

        if should_terms is not None and len(should_terms) > 0:
            for s in should_terms:
                if not isinstance(should_terms[s], list):
                    should_terms[s] = [should_terms[s]]
                query["query"]["bool"]["must"].append({"terms": {s: should_terms[s]}})

        sort_specified = False
        for k, v in kwargs.items():
            if k == '_from':
                query['from'] = v
            elif k == 'sort':
                sort_specified = True
                query['sort'] = v
            else:
                query[k] = v
        if "sort" in query:
            sort_specified = True

        if not sort_specified and consistent_order:
            # FIXME: review this - where is default sort necessary, and why do we want this in ID order?
            query['sort'] = [{"id.exact": {"order": "asc", "unmapped_type": "keyword"}}]

        return query
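    # Example (illustrative sketch) of the shape make_query produces: a plain query
    # string is wrapped in a bool/must and given a consistent default sort, e.g.
    #
    #     q = DomainObject.make_query("open access", size=10)
    #     # {"query": {"bool": {"must": [{"query_string": {"query": "open access"}}]}},
    #     #  "size": 10,
    #     #  "sort": [{"id.exact": {"order": "asc", "unmapped_type": "keyword"}}]}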

    @classmethod
    def _unwrap_search_result(cls, res):
        return [i.get("_source") if "_source" in i else i.get("fields") for i in
                res.get('hits', {}).get('hits', [])]

    @classmethod
    def bulk_delete(cls, id_list, idkey='id', refresh=False):
        return cls.bulk(documents=[{'id': i} for i in id_list], idkey=idkey, refresh=refresh, action='delete')

    @classmethod
    def bulk(cls, documents: List[dict], idkey='id', refresh=False, action='index', req_timeout=10, override_index_name=None, **kwargs):
        """
        :param documents: a list of objects to perform bulk actions on (list of dicts)
        :param idkey: The path to extract an ID from the object, e.g. 'id', 'identifiers.id'
        :param refresh: Refresh the index in each operation (make immediately available for search) - expensive!
        :param req_timeout: Request timeout for bulk operation
        :param kwargs: kwargs are passed into the bulk instruction for each record
        """
        # ~~->ReadOnlyMode:Feature~~
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, bulk command cannot run")
            return

        if action not in ['index', 'update', 'delete']:
            raise Exception("Unrecognised bulk action '{0}'".format(action))

        data = ''
        for d in documents:
            data += cls.to_bulk_single_rec(d, idkey=idkey, action=action, **kwargs)
        resp = ES.bulk(body=data, index=cls.index_name(override_index_name), doc_type=cls.doc_type(), refresh=refresh,
                       request_timeout=req_timeout)
        return resp
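    # Example (illustrative sketch): bulk-indexing documents on a hypothetical
    # concrete subclass, with refresh=True so they are immediately searchable
    # (expensive, so normally left off):
    #
    #     docs = [{"id": "1", "title": "A"}, {"id": "2", "title": "B"}]
    #     Article.bulk(docs, refresh=True)
    #     Article.bulk_delete(["1", "2"])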

    @staticmethod
    def to_bulk_single_rec(record, idkey="id", action="index", **kwargs):
        """ Adapted from esprit. Create a bulk instruction from a single record. """
        data = ''
        idpath = idkey.split(".")

        # traverse down the object in search of the supplied ID key
        context = record
        for pathseg in idpath:
            if pathseg in context:
                context = context[pathseg]
            else:
                raise BulkException(
                    "'{0}' not available in record to generate bulk _id: {1}".format(idkey, json.dumps(record)))

        datadict = {action: {'_id': context}}
        datadict[action].update(kwargs)

        data += json.dumps(datadict) + '\n'

        if action == 'delete':
            return data

        # For update, we wrap the document in {doc: document} if not already supplied
        if action == 'update' and not (record.get('doc') and len(record.keys()) == 1):
            data += json.dumps({'doc': record}) + '\n'
        else:
            data += json.dumps(record) + '\n'
        return data
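    # Example (illustrative sketch) of the NDJSON pair this emits for one record:
    #
    #     DomainObject.to_bulk_single_rec({"id": "abc", "title": "A"})
    #     # '{"index": {"_id": "abc"}}\n{"id": "abc", "title": "A"}\n'
    #
    # For action='delete' only the instruction line is emitted, and for
    # action='update' the record is wrapped as {"doc": record} unless it
    # already has that shape.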

    @classmethod
    def refresh(cls):
        """
        ~~->ReadOnlyMode:Feature~~
        :return:
        """
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, refresh command cannot run")
            return

        # r = requests.post(cls.target() + '_refresh', headers=CONTENT_TYPE_JSON)
        # return r.json()

        return ES.indices.refresh(index=cls.index_name())

    @classmethod
    def pull(cls, id_):
        """Retrieve object by id."""
        if id_ is None or id_ == '':
            return None

        try:
            # out = requests.get(cls.target() + id_)
            out = ES.get(cls.index_name(), id_, doc_type=cls.doc_type())
        except elasticsearch.NotFoundError:
            return None
        except elasticsearch.TransportError as e:
            raise Exception("ES returned an error: {x}".format(x=e.info))
        except Exception as e:
            app.logger.exception(f"Unexpected error in pull(): {e}")
            return None
        if out is None:
            return None

        return cls(**out)

    @classmethod
    def pull_by_key(cls, key, value):
        res = cls.query(q={"query": {"term": {key + app.config['FACET_FIELD']: value}}})
        if res.get('hits', {}).get('total', {}).get('value', 0) == 1:
            return cls.pull(res['hits']['hits'][0]['_source']['id'])
        else:
            return None

    @classmethod
    def object_query(cls, q=None, **kwargs):
        result = cls.query(q, **kwargs)
        return [cls(**r.get("_source")) for r in result.get("hits", {}).get("hits", [])]

    @classmethod
    def query(cls, q=None, **kwargs):
        """Perform a query on backend.

        :param q: maps to query_string parameter if string, or query dict if dict.
        :param kwargs: any keyword args as per
            http://www.elasticsearch.org/guide/reference/api/search/uri-request.html
        """
        query = cls.make_query(q, **kwargs)
        return cls.send_query(query)

    @classmethod
    def send_query(cls, qobj, retry=50, pit_query=False, **kwargs):
        """Actually send a query object to the backend.
        :param kwargs: passed directly to the Elasticsearch search() function
        """

        if retry > app.config.get("ES_RETRY_HARD_LIMIT", 1000) + 1:  # an arbitrarily large number
            retry = app.config.get("ES_RETRY_HARD_LIMIT", 1000) + 1

        r = None
        count = 0
        exception = None
        while count < retry:
            count += 1
            try:
                # ES 7.10 updated target to whole index, since specifying type for search is deprecated
                # r = requests.post(cls.target_whole_index() + recid + "_search", data=json.dumps(qobj), headers=CONTENT_TYPE_JSON)
                if kwargs.get('timeout') is None:
                    kwargs['timeout'] = app.config.get('ES_READ_TIMEOUT', None)
                index = cls.index_name() if pit_query is False else None
                r = ES.search(body=json.dumps(qobj), index=index, doc_type=cls.doc_type(),
                              headers=CONTENT_TYPE_JSON, **kwargs)
                break
            except ConnectionTimeout:
                continue  # retry on timeout
            except Exception as e:
                try:
                    exception = ESMappingMissingError(e) if ES_MAPPING_MISSING_REGEX.match(json.dumps(e.args[2])) else e
                    if isinstance(exception, ESMappingMissingError):
                        raise exception
                except TypeError:
                    raise e

            time.sleep(0.5)

        if r is not None:
            return r
        if exception is not None:
            raise exception
        raise Exception("Couldn't get the ES query endpoint to respond. Also, you shouldn't be seeing this.")

    @classmethod
    def remove_by_id(cls, id):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, delete_by_id command cannot run")
            return

        # r = requests.delete(cls.target() + id)
        try:
            ES.delete(cls.index_name(), id)
        except elasticsearch.NotFoundError:
            return

    @classmethod
    def delete_by_query(cls, query):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, delete_by_query command cannot run")
            return

        # r = requests.delete(cls.target() + "_query", data=json.dumps(query))
        # return r
        return ES.delete_by_query(cls.index_name(), json.dumps(query), doc_type=cls.doc_type())

    @classmethod
    def destroy_index(cls, **es_kwargs):
        default_es_kwargs = {
            'timeout': '5m',
            'request_timeout': 1001,
        }
        default_es_kwargs.update(es_kwargs)
        es_kwargs = default_es_kwargs

        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, destroy_index command cannot run")
            return

        # if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
        #     return esprit.raw.delete_index_by_prefix(es_connection, app.config['ELASTIC_SEARCH_DB_PREFIX'])
        # else:
        #     return esprit.raw.delete_index(es_connection)
        print('Destroying indexes with prefix ' + app.config['ELASTIC_SEARCH_DB_PREFIX'] + '*')
        return ES.indices.delete(app.config['ELASTIC_SEARCH_DB_PREFIX'] + '*', **es_kwargs)

    @classmethod
    def check_es_raw_response(cls, res, extra_trace_info=''):
        if 'error' in res:
            es_resp = json.dumps(res, indent=2)

            error_to_raise = ESMappingMissingError if ES_MAPPING_MISSING_REGEX.match(es_resp) else ESError

            raise error_to_raise(
                (
                    "Elasticsearch returned an error:"
                    "\nES HTTP Response status: {es_status}"
                    "\nES Response:{es_resp}"
                    .format(es_status=res.get('status', 'unknown'), es_resp=es_resp)
                ) + extra_trace_info
            )

        if 'hits' not in res or 'hits' not in res['hits']:  # i.e. if res['hits']['hits'] does not exist
            raise ESResponseCannotBeInterpreted(
                (
                    "Elasticsearch did not return any records. "
                    "It probably returned an error we did not understand instead."
                    "\nES HTTP Response status: {es_status}"
                    "\nES Response:{es_resp}\n"
                    .format(es_status=res.get('status', 'unknown'), es_resp=json.dumps(res, indent=2))
                ) + extra_trace_info
            )
        return True

    @classmethod
    def handle_es_raw_response(cls, res, wrap, extra_trace_info=''):
        """
        Handles the JSON returned by ES, raising errors as needed. If no problems are detected it returns the
        records extracted from the response.

        :param res: The full ES raw response to a query in a Python dict (this method does not handle the raw JSON ES
            outputs). Usually this parameter is the return value of the .query or .send_query methods.
        :param wrap: Did the caller request wrapping of each ES record inside a model object? This matters for handling
            records that have no '_source' or 'fields' keys, but do have an '_id' key. Such records should raise an error
            if wrapping was requested, since there is nothing to wrap. If no wrapping was requested, perhaps the caller
            simply needed the object IDs and nothing else, so we do not need to raise an error.
        :param extra_trace_info: A string with additional diagnostic information to be put into exceptions.
        """

        cls.check_es_raw_response(res)

        rs = []
        for i, each in enumerate(res['hits']['hits']):
            if '_source' in each:
                rs.append(each['_source'])
            elif 'fields' in each:
                rs.append(each['fields'])
            elif '_id' in each and not wrap:
                # "_id" is a sibling (not child) of "_source" so it can only be used with unwrapped raw responses.
                # wrap = True only makes sense if "_source" or "fields" were returned.
                # So, if "_id" is the only one present, extract it into an object that's shaped the same as the item
                # in the raw response.
                rs.append({"_id": each['_id']})
            else:
                msg1 = "Can't find any useful data in the ES response.\n" + extra_trace_info
                msg2 = "\nItem {i}.\nItem data:\n{each}".format(i=i, each=json.dumps(each, indent=2))
                raise ESResponseCannotBeInterpreted(msg1 + msg2)

        return rs

    @classmethod
    def iterate(cls, q: dict = None, page_size: int = 1000, limit: int = None, wrap: bool = True,
                keepalive: str = '1m'):
        """ Provide an iterable of all items in a model, using an ES scroll
        :param q: The query to scroll results on
        :param page_size: limited by ElasticSearch, check settings to override
        :param limit: Limit the number of results returned (e.g. to take a slice)
        :param wrap: Whether to return the results in raw json or wrapped as an object
        :param keepalive: scroll timeout

        TODO: this is the old method, we should evaluate and aim to make iterate_pit the default scroll method.
        """
        theq = {"query": {"match_all": {}}} if q is None else deepcopy(q)
        theq["size"] = page_size
        theq["from"] = 0
        if "sort" not in theq:
            # Sorting by _doc gives the same performance enhancement as scan, so use it by default; results
            # come back in index order rather than by relevance or ID.
            theq["sort"] = ["_doc"]

        # Initialise the scroll
        try:
            res = cls.send_query(theq, scroll=keepalive)
        except Exception as e:
            raise ScrollInitialiseException("Unable to initialise scroll - could be your mappings are broken", e)

        # unpack scroll response
        scroll_id = res.get('_scroll_id')
        total_results = res.get('hits', {}).get('total', {}).get('value')

        # Supply the first set of results
        counter = 0
        for r in cls.handle_es_raw_response(
                res,
                wrap=wrap,
                extra_trace_info=
                "\nScroll initialised:\n{q}\n"
                "\n\nPage #{counter} of the ES response with size {page_size}."
                .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

            # apply the limit
            if limit is not None and counter >= int(limit):
                break
            counter += 1
            if wrap:
                yield cls(**r)
            else:
                yield r

        # Continue to scroll through the rest of the results
        while True:
            # apply the limit
            if limit is not None and counter >= int(limit):
                break

            # if we consumed all the results we were expecting, we can just stop here
            if counter >= total_results:
                break

            # get the next page and check that we haven't timed out
            try:
                res = ES.scroll(scroll_id=scroll_id, scroll=keepalive)
            except elasticsearch.exceptions.NotFoundError as e:
                raise ScrollTimeoutException(
                    "Scroll timed out; {status} - {message}".format(status=e.status_code, message=e.info))
            except Exception as e:
                # if any other exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in scroll method of DAO")
                raise ScrollException(e)

            # if we didn't get any results back, this means we're at the end
            if len(res.get('hits', {}).get('hits')) == 0:
                break

            for r in cls.handle_es_raw_response(
                    res,
                    wrap=wrap,
                    extra_trace_info=
                    "\nScroll:\n{q}\n"
                    "\n\nPage #{counter} of the ES response with size {page_size}."
                    .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

                # apply the limit
                if limit is not None and counter >= int(limit):
                    break
                counter += 1
                if wrap:
                    yield cls(**r)
                else:
                    yield r
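    # Example (illustrative sketch): streaming every record matching a query
    # without loading the whole result set into memory, on a hypothetical
    # concrete subclass:
    #
    #     for article in Article.iterate({"query": {"match_all": {}}}, page_size=500, limit=10000):
    #         do_something(article)  # hypothetical callback; each item is a wrapped model object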

    @classmethod
    def iterate_pit(cls, q: dict = None, page_size: int = 1000, limit: int = None, wrap: bool = True,
                    keepalive: str = '1m'):
        """ Provide an iterable of all items in a model, reimplemented using point-in-time queries
        :param q: The query to scroll results on
        :param page_size: limited by ElasticSearch, check settings to override
        :param limit: Limit the number of results returned (e.g. to take a slice)
        :param wrap: Whether to return the results in raw json or wrapped as an object
        :param keepalive: point-in-time keep-alive timeout
        """
        theq = {"query": {"match_all": {}}} if q is None else deepcopy(q)
        theq["size"] = page_size
        theq["from"] = 0
        if "sort" not in theq:
            # Sorting by _doc gives the same performance enhancement as scan, so use it by default; results
            # come back in index order rather than by relevance or ID.
            theq["sort"] = ["_doc"]

        # Open a point in time query context
        res = ES.open_point_in_time(index=cls.index_name(), keep_alive=keepalive)
        pit_id = res.get("id")

        theq["pit"] = {
            "id": pit_id,
            "keep_alive": keepalive
        }
        theq["track_total_hits"] = True

        first_resp = cls.send_query(theq, pit_query=True)
        if len(first_resp.get('hits', {}).get('hits', [])) == 0:
            return

        search_after = first_resp.get('hits', {}).get('hits', [])[-1].get('sort', [])
        total_results = first_resp.get('hits', {}).get('total', {}).get('value')

        # Supply the first set of results
        counter = 0
        for r in cls.handle_es_raw_response(
                first_resp,
                wrap=wrap,
                extra_trace_info=
                "\nPIT Initialised:\n{q}\n"
                "\n\nPage #{counter} of the ES response with size {page_size}."
                .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

            # apply the limit
            if limit is not None and counter >= int(limit):
                break
            counter += 1
            if wrap:
                yield cls(**r)
            else:
                yield r

        del theq["track_total_hits"]

        # Continue to page through the rest of the results
        while True:
            # apply the limit
            if limit is not None and counter >= int(limit):
                break

            # if we consumed all the results we were expecting, we can just stop here
            if counter >= total_results:
                break

            theq["search_after"] = search_after

            # get the next page and check that we haven't timed out
            try:
                res = cls.send_query(theq, pit_query=True)
                if len(res.get('hits', {}).get('hits', [])) == 0:
                    break
                search_after = res.get('hits', {}).get('hits', [])[-1].get('sort', [])
            except elasticsearch.exceptions.NotFoundError as e:
                raise ScrollTimeoutException(
                    "PIT timed out; {status} - {message}".format(status=e.status_code, message=e.info))
            except Exception as e:
                # if any other exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in iterate_pit method of DAO")
                try:
                    ES.close_point_in_time({"id": pit_id})
                except:
                    pass
                raise ScrollException(e)

            for r in cls.handle_es_raw_response(
                    res,
                    wrap=wrap,
                    extra_trace_info=
                    "\nPIT:\n{q}\n"
                    "\n\nPage #{counter} of the ES response with size {page_size}."
                    .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

                # apply the limit
                if limit is not None and counter >= int(limit):
                    break
                counter += 1
                if wrap:
                    yield cls(**r)
                else:
                    yield r

        ES.close_point_in_time({"id": pit_id})
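    # Example (illustrative sketch): the point-in-time variant looks the same from
    # the caller's side, but pages with search_after inside a PIT context rather
    # than holding a scroll open:
    #
    #     ids = [a.id for a in Article.iterate_pit(page_size=1000, keepalive='2m')]
    #
    # The PIT is closed when the generator is exhausted; an expired keepalive
    # surfaces as ScrollTimeoutException.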

    @classmethod
    def iterate_unstable(cls, q: dict = None, page_size: int = 1000, limit: int = None, wrap: bool = True, logger=None):
        """ Provide an iterable of all items in a model, using search_after but with no scroll context or
        PIT. This means that if the index changes as the iterate is happening, there may be repeated or
        missed elements. This is useful for cases where the index is not changing during the iteration, or the
        exact export is not important (e.g. for anon_export for testing purposes)

        :param q: The query to scroll results on
        :param page_size: limited by ElasticSearch, check settings to override
        :param limit: Limit the number of results returned (e.g. to take a slice)
        :param wrap: Whether to return the results in raw json or wrapped as an object
        """
        theq = {"query": {"match_all": {}}} if q is None else deepcopy(q)
        theq["size"] = page_size
        if "from" in theq:
            del theq["from"]

        if "sort" not in theq:
            # Sorting by _id gives a stable, unambiguous order for search_after to page through
            theq["sort"] = [{"_id": "desc"}]

        theq["track_total_hits"] = True

        # if logger: logger(json.dumps(theq))

        first_resp = cls.send_query(theq)
        if len(first_resp.get('hits', {}).get('hits', [])) == 0:
            # if logger: logger("No results found")
            return

        search_after = first_resp.get('hits', {}).get('hits', [])[-1].get('sort', [])
        total_results = first_resp.get('hits', {}).get('total', {}).get('value')
        if logger: logger(f"Expecting total {total_results}")

        # Supply the first set of results
        counter = 0
        for r in cls.handle_es_raw_response(
                first_resp,
                wrap=wrap,
                extra_trace_info=
                "\nUnstable Iterate Initialised:\n{q}\n"
                "\n\nPage #{counter} of the ES response with size {page_size}."
                .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

            # apply the limit
            if limit is not None and counter >= int(limit):
                break
            counter += 1
            if wrap:
                yield cls(**r)
            else:
                yield r
        # if logger: logger(f"Iterated {counter} records")

        del theq["track_total_hits"]

        # Continue to page through the rest of the results
        while True:
            # apply the limit
            if limit is not None and counter >= int(limit):
                break

            # if we consumed all the results we were expecting, we can just stop here
            if counter >= total_results:
                break

            theq["search_after"] = search_after
            # if logger: logger(json.dumps(theq))
            # if logger: logger("search_after: " + str(search_after))
            try:
                res = cls.send_query(theq)
                if len(res.get('hits', {}).get('hits', [])) == 0:
                    break
                search_after = res.get('hits', {}).get('hits', [])[-1].get('sort', [])
            except Exception as e:
                # if any exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in iterate_unstable method of DAO")
                if logger: logger(f"Iterate failed on {json.dumps(theq)}")
                raise ScrollException(e)

            for r in cls.handle_es_raw_response(
                    res,
                    wrap=wrap,
                    extra_trace_info=
                    "\nUnstable Iterate:\n{q}\n"
                    "\n\nPage #{counter} of the ES response with size {page_size}."
                    .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

                # apply the limit
                if limit is not None and counter >= int(limit):
                    break
                counter += 1
                if wrap:
                    yield cls(**r)
                else:
                    yield r

        # if logger: logger(f"Iterated {counter} records")

    @classmethod
    def iterall(cls, page_size=1000, limit=None, **kwargs):
        # TODO: Another candidate for swapping to iterate_pit (or rename iterate_pit to iterate when we're happy)
        return cls.iterate(MatchAllQuery().query(), page_size, limit, **kwargs)

    @classmethod
    def iterall_unstable(cls, page_size=1000,
                         stripe_field="id",
                         striped=False,
                         prefix_generator=None,
                         prefix_size=4,
                         limit=None,
                         logger=None,
                         must=None,
                         **kwargs):
        def hex_prefixes(n=4):
            """ Generate a list of hex prefixes of length n """
            return [str(hex(i))[2:].zfill(n) for i in range(0, 16 ** n)]

        if striped:
            q = None
            if must is not None:
                q = {
                    "query": {
                        "bool": {
                            "must": must
                        }
                    }
                }
            total = cls.count(q)
            if total == 0:
                return

            count = 0
            prefixes = prefix_generator(prefix_size) if prefix_generator is not None else hex_prefixes(prefix_size)
            empty_prefixes = []
            for prefix in prefixes:
                if must is None:
                    q = {
                        "query": {
                            "prefix": {stripe_field: prefix}
                        }
                    }
                else:
                    q = {
                        "query": {
                            "bool": {
                                "must": must + [
                                    {
                                        "prefix": {stripe_field: prefix}
                                    }
                                ]
                            }
                        }
                    }

                first = True
                for record in cls.iterate_unstable(q, page_size, limit=limit, logger=logger, **kwargs):
                    count += 1
                    if logger and first:
                        if len(empty_prefixes) > 0:
                            logger(f"Skipped empty prefixes: {empty_prefixes}")
                            empty_prefixes = []
                        logger(f"Exporting prefix: {prefix}")
                    first = False
                    if limit is not None:
                        if count > limit:
                            if logger: logger(f"Limit reached: {count} / {limit}")
                            return
                    yield record

                if first:
                    empty_prefixes.append(prefix)
                else:
                    if logger: logger(f"Finished prefix: {prefix}; {count} total records")
            if len(empty_prefixes) > 0:
                if logger: logger(f"Skipped empty prefixes: {empty_prefixes}")
        else:
            if logger: logger("Exporting without prefix striping")
            for record in cls.iterate_unstable(q=None, page_size=page_size, limit=limit, logger=logger, **kwargs):
                yield record

    # Aliases for the iterate functions
    scroll = iterate
    scroll_pit = iterate_pit

    @classmethod
    def dump(cls, q=None, page_size=1000, limit=None, out=None, out_template=None, out_batch_sizes=100000,
             out_rollover_callback=None, transform=None, es_bulk_format=True, idkey='id', es_bulk_fields=None,
             stripe_field="id", striped=False, prefix_generator=None, prefix_size=3, logger=None):
        """ Export to file, bulk format or just a json dump of the record """

        filenames = []
        n = 1
        current_file = None
        if out_template is not None:
            current_file = out_template + "." + str(n)
            filenames.append(current_file)
        if out is None and current_file is not None:
            out = open(current_file, "w")
        else:
            out = sys.stdout

        count = 0
        if q is None:
            iterator = cls.iterall_unstable(page_size=page_size, stripe_field=stripe_field,
                                            striped=striped, prefix_generator=prefix_generator,
                                            prefix_size=prefix_size,
                                            limit=limit, wrap=False, logger=logger)
        else:
            iterator = cls.iterate_unstable(q, page_size=page_size, limit=limit, wrap=False)

        for record in iterator:
            if transform is not None:
                record = transform(record)

            if es_bulk_format:
                kwargs = {}
                if es_bulk_fields is None:
                    es_bulk_fields = ["_id", "_index", "_type"]
                for key in es_bulk_fields:
                    if key == "_id":
                        kwargs["idkey"] = idkey
                    if key == "_index":
                        kwargs["index"] = cls.index_name()
                    if key == "_type":
                        kwargs["type_"] = cls.doc_type()
                data = cls.to_bulk_single_rec(record, **kwargs)
            else:
                data = json.dumps(record) + "\n"

            out.write(data)
            if out_template is not None:
                count += 1
                if count > out_batch_sizes:
                    out.close()
                    if out_rollover_callback is not None:
                        out_rollover_callback(current_file)

                    count = 0
                    n += 1
                    current_file = out_template + "." + str(n)
                    filenames.append(current_file)
                    out = open(current_file, "w")

        if out_template is not None:
            out.close()
            if out_rollover_callback is not None:
                out_rollover_callback(current_file)

        return filenames
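    # Example (illustrative sketch): exporting a whole type to rolling bulk files
    # of up to out_batch_sizes records, handing each finished file to a
    # hypothetical callback:
    #
    #     def compress(path):
    #         ...  # e.g. gzip the finished file
    #
    #     files = Article.dump(out_template="article_export", es_bulk_format=True,
    #                          out_rollover_callback=compress)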

    @classmethod
    def bulk_load_from_file(cls, source_file, index=None, limit=None, max_content_length=100000000):
        """ ported from esprit.tasks - bulk load to index from file
        :param source_file: path to the bulk-format file to load
        :param index: index name for target
        :param limit: number of records to load (integer)
        :param max_content_length: Upload chunk size in bytes
        """
        index = index or cls.index_name()

        source_size = os.path.getsize(source_file)
        with open(source_file, "r") as f:
            if limit is None and source_size < max_content_length:
                # if we aren't selecting a portion of the file, and the file is below the max content length, then
                # we can just serve it directly
                ES.bulk(body=f.read(), index=index, doc_type=cls.doc_type(), request_timeout=120)
                return -1
            else:
                count = 0
                while True:
                    chunk = DomainObject._make_next_chunk(f, max_content_length)
                    if chunk == "":
                        break

                    finished = False
                    if limit is not None:
                        newlines = chunk.count("\n")
                        records = newlines // 2  # each record is an instruction line plus a document line
                        if count + records > limit:
                            max_lines = (limit - count) * 2
                            lines = chunk.split("\n")
                            allowed = lines[:max_lines]
                            chunk = "\n".join(allowed) + "\n"
                            count += max_lines // 2
                            finished = True
                        else:
                            count += records
                    ES.bulk(body=chunk, index=index, doc_type=cls.doc_type(), request_timeout=120)
                    if finished:
                        break
                if limit is not None:
                    return count
                else:
                    return -1

    @staticmethod
    def make_bulk_chunk_files(source_file, out_file_prefix, max_content_length=100000000):
        """ ported from esprit.tasks - break out a bulk file into smaller chunks """

        source_size = os.path.getsize(source_file)
        with open(source_file, "r") as f:
            if source_size < max_content_length:
                return [source_file]
            else:
                filenames = []
                count = 0
                while True:
                    count += 1
                    chunk = DomainObject._make_next_chunk(f, max_content_length)
                    if chunk == "":
                        break

                    filename = out_file_prefix + "." + str(count)
                    with open(filename, "w") as g:
                        g.write(chunk)
                    filenames.append(filename)

                return filenames

    @staticmethod
    def _make_next_chunk(f, max_content_length):
        """ ported from esprit.tasks - create a bulk chunk, ensuring it's not a partial instruction """

        def is_command(line):
            try:
                command = json.loads(line)
            except (json.JSONDecodeError, TypeError):
                return False
            keys = list(command.keys())
            if len(keys) > 1:
                return False
            if "index" not in keys:
                return False
            subkeys = list(command["index"].keys())
            for sk in subkeys:
                if sk not in ["_id"]:
                    return False

            return True

        offset = f.tell()
        chunk = f.read(max_content_length)
        if len(chunk) < max_content_length:
            # we read to the end of the file, so this is the final chunk and needs no trimming
            return chunk
        while True:
            last_newline = chunk.rfind("\n")
            tail = chunk[last_newline + 1:]
            chunk = chunk[:last_newline]

            if is_command(tail):
                f.seek(offset + last_newline)
                if chunk.startswith("\n"):
                    chunk = chunk[1:]
                return chunk
            else:
                continue

    @classmethod
    def prefix_query(cls, field, prefix, filter_condition=None, size=5, facet_field=None, analyzed_field=True):
        # example of a prefix query
        # {
        #     "query": {"prefix": {"bibjson.publisher": "ope"}},
        #     "size": 0,
        #     "facets": {
        #         "publisher": {"terms": {"field": "bibjson.publisher.exact", "size": 5}}
        #     }
        # }

        suffix = app.config['FACET_FIELD']
        query_field = field
        if analyzed_field:
            if field.endswith(suffix):
                # strip .exact (or whatever it's configured as) off the end
                query_field = field[:field.rfind(suffix)]
        else:
            if not field.endswith(suffix):
                query_field = field + suffix

        # the actual terms should come from the .exact version of the
        # field - we are suggesting whole values, not fragments
        if facet_field is None:
            facet_field = query_field + suffix
        if not facet_field.endswith(suffix):
            facet_field = facet_field + suffix

        q = PrefixAutocompleteQuery(query_field, prefix, field, facet_field, size, filter_condition)
        return cls.send_query(q.query())

    @classmethod
    def wildcard_autocomplete_query(cls, field, substring, before=True, after=True, facet_size=5, facet_field=None):
        """
        Example of a wildcard query
        Works only on .exact fields

        {
            "query" : {
                "wildcard" : {"bibjson.publisher.exact" : "De *"}
            },
            "size" : 0,
            "facets" : {
                "bibjson.publisher.exact" : {
                    "terms" : {"field" : "bibjson.publisher.exact", "size" : 5}
                }
            }
        }
        :param field: the field to run the wildcard match against
        :param substring: the substring to wrap in wildcards
        :param before: whether to add a leading wildcard
        :param after: whether to add a trailing wildcard
        :param facet_size: number of suggestions to return
        :param facet_field: field to aggregate suggestions on (defaults to the unanalysed filter field)
        :return:
        """
        # wildcard queries need to be on unanalyzed fields
        suffix = app.config['FACET_FIELD']
        filter_field = field
        if not filter_field.endswith(suffix):
            filter_field = filter_field + suffix

        # add the wildcard before/after
        if before:
            substring = "*" + substring
        if after:
            substring = substring + "*"

        # sort out the facet field
        if facet_field is None:
            facet_field = filter_field
        if not facet_field.endswith(suffix):
            facet_field = facet_field + suffix

        # build the query
        q = WildcardAutocompleteQuery(filter_field, substring, field, facet_field, facet_size)
        return cls.send_query(q.query())

    @classmethod
    def advanced_autocomplete(cls, filter_field, facet_field, substring, size=5, prefix_only=True):
        analyzed = True
        if " " in substring:
            analyzed = False

        substring = substring.lower()

        if " " in substring and not prefix_only:
            res = cls.wildcard_autocomplete_query(filter_field, substring, before=True, after=True, facet_size=size,
                                                  facet_field=facet_field)
        else:
            res = cls.prefix_query(filter_field, substring, size=size, facet_field=facet_field, analyzed_field=analyzed)

        result = []
        for term in res['aggregations'][filter_field]['buckets']:
            # keep ordering - it's by count by default, so most frequent
            # terms will now go to the front of the result list
            result.append({"id": term['key'], "text": term['key']})
        return result

    @classmethod
    def autocomplete(cls, field, prefix, filter_condition=None, size=5):
        res = None
        # if there is a space in the prefix, the prefix query won't work, so we fall back to a wildcard
        # we only do this if we have to, because the wildcard query is a little expensive
        if " " in prefix:
            res = cls.wildcard_autocomplete_query(field, prefix, before=False, after=True, facet_size=size)
        else:
            prefix = prefix.lower()
            res = cls.prefix_query(field, prefix, filter_condition, size=size)

        result = []
        for term in res['aggregations'][field]['buckets']:
            # keep ordering - it's by count by default, so most frequent
            # terms will now go to the front of the result list
            result.append({"id": term['key'], "text": term['key']})
        return result
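    # Example (illustrative sketch), reusing the bibjson.publisher field from the
    # prefix_query comment above, on a hypothetical `Journal` subclass:
    #
    #     suggestions = Journal.autocomplete("bibjson.publisher", "ope")
    #     # e.g. [{"id": "Open Books", "text": "Open Books"}, ...]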

    @classmethod
    def q2obj(cls, **kwargs):
        extra_trace_info = ''
        if 'q' in kwargs:
            extra_trace_info = "\nQuery sent to ES (before manipulation in DomainObject.query):\n{}\n".format(
                json.dumps(kwargs['q'], indent=2))

        res = cls.query(**kwargs)
        rs = cls.handle_es_raw_response(res, wrap=True, extra_trace_info=extra_trace_info)
        results = [cls(**r) for r in rs]
        return results

    @classmethod
    def all(cls, size=10000, **kwargs):
        """ This is a shortcut to a match_all query with a large size, to return all records """
        # FIXME: is this only used in tests? ES now limits size so we can't guarantee ALL without using scroll / scan
        return cls.q2obj(size=size, **kwargs)

    @classmethod
    def count(cls, query=None):
        res = ES.count(index=cls.index_name(), doc_type=cls.doc_type(), body=query)
        return res.get("count")
        # return requests.get(cls.target() + '_count').json()['count']

    @classmethod
    def hit_count(cls, query, **kwargs) -> int:
        countable_query = deepcopy(query)
        if "track_total_hits" not in countable_query:
            countable_query["track_total_hits"] = True

        res = cls.query(q=countable_query, **kwargs)
        return res.get("hits", {}).get("total", {}).get("value", 0)
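    # Example (illustrative sketch): count() asks the ES _count endpoint, while
    # hit_count() runs the search itself with track_total_hits so totals above
    # the 10,000 default cap are still exact:
    #
    #     total = Article.count()
    #     matching = Article.hit_count({"query": {"match_all": {}}})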

    @classmethod
    def count_updated_since(cls, last_update):
        """
        Count the number of records updated since a given date
        :param last_update: The date to count from
        :return: The number of records updated since the given date
        """
        q = {
            "query": {
                "range": {
                    "last_updated": {
                        "gte": last_update
                    }
                }
            }
        }
        return cls.count(q)

    @classmethod
    def block(cls, id, last_updated=None, sleep=0.5, max_retry_seconds=30):
        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            sleep = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        q = BlockQuery(id)
        start_time = dates.now()
        while True:
            res = cls.query(q=q.query())
            hits = res.get("hits", {}).get("hits", [])
            if len(hits) > 0:
                if len(hits) > 1:
                    raise Exception("More than one record with id {x}".format(x=id))
                obj = hits[0].get("_source", {})
                if last_updated is not None:
                    lu = obj.get("last_updated")
                    if lu:
                        threshold = dates.parse(last_updated)
                        lud = dates.parse(lu)
                        if lud >= threshold:
                            return
                else:
                    return

            if (dates.now() - start_time).total_seconds() >= max_retry_seconds:
                raise BlockTimeOutException(
                    "Attempting to block until record with id {id} appears in the index, but this has not happened after {limit} seconds"
                    .format(id=id, limit=max_retry_seconds))

            time.sleep(sleep)

    @classmethod
    def blockall(cls, ids_and_last_updateds, sleep=0.05, individual_max_retry_seconds=30):
        for id, lu in ids_and_last_updateds:
            cls.block(id, lu, sleep=sleep, max_retry_seconds=individual_max_retry_seconds)

    @classmethod
    def blockdeleted(cls, id, sleep=0.5, max_retry_seconds=30):
        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            sleep = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        q = BlockQuery(id)
        start_time = dates.now()
        while True:
            res = cls.query(q=q.query())
            hits = res.get("hits", {}).get("hits", [])
            if len(hits) == 0:
                return
            else:
                if (dates.now() - start_time).total_seconds() >= max_retry_seconds:
                    raise BlockTimeOutException(
                        "Attempting to block until record with id {id} is deleted from Elasticsearch, but this has not happened after {limit} seconds".format(
                            id=id, limit=max_retry_seconds))

            time.sleep(sleep)

    @classmethod
    def blockalldeleted(cls, ids, sleep=0.05, individual_max_retry_seconds=30):
        for id in ids:
            cls.blockdeleted(id, sleep, individual_max_retry_seconds)

    @classmethod
    def save_all(cls, models, blocking=False):
        for m in models:
            m.save()
        if blocking:
            cls.blockall((m.id, getattr(m, "last_updated", None)) for m in models)

    @classmethod
    def create_and_seed_index_and_rollover_alias(cls, documents: List[dict], mapping=None, keep_history=0):
        """
        Create a new index based on this object's type, put the mapping if provided, and seed it with the documents.
        The index alias is then repointed to the new index.
        If a keep_history value is provided, older indexes with the same prefix as the alias are deleted, keeping
        that many previous generations.

        :param documents: the documents to seed into the new index
        :param mapping: the mapping to apply to the new index before seeding
        :param keep_history: supply -1 to keep all history, 0 to keep no history, or a positive integer to keep that many previous indexes.
        :return:
        """
        full_alias = cls.index_name()
        new_name = cls.__type__ + "-" + dates.now_str(dates.FMT_DATETIME_LONG)
        if mapping is not None:
            cls.put_mapping(mapping, override_index_name=new_name)
        resp = cls.bulk(documents, refresh=True, override_index_name=new_name)
        if resp.get('errors', False):
            raise ESError("Error creating index {}: {}".format(new_name, resp))
        cls.move_alias(full_alias, new_name)

        if keep_history > -1:
            # Find all indexes with the same prefix as full_alias
            all_indexes = find_indexes_by_prefix(full_alias)
            all_indexes.sort()
            keep_history += 1
            for idx in all_indexes[:-keep_history]:
                try:
                    ES.indices.delete(index=idx)
                except elasticsearch.exceptions.NotFoundError:
                    pass
                except elasticsearch.exceptions.RequestError as e:
                    raise ESError(e)
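    # Example (illustrative sketch): rebuilding a type's index atomically by seeding
    # a fresh, timestamped index and swinging the alias over to it, keeping the two
    # previous generations around for rollback (MAPPING is a hypothetical mapping dict):
    #
    #     Article.create_and_seed_index_and_rollover_alias(docs, mapping=MAPPING, keep_history=2)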

    @classmethod
    def put_mapping(cls, mapping: dict, override_index_name: str = None):
        """
        Create the index for this type with the given mapping
        :param mapping: The mapping to put (wrapped in {"mappings": ...} if not already)
        :param override_index_name: index name to use instead of the default for this type
        """
        try:
            if "mappings" not in mapping:
                mapping = {"mappings": mapping}
            return ES.indices.create(index=cls.index_name(override_index_name),
                                     body=mapping,
                                     request_timeout=app.config.get("ES_SOCKET_TIMEOUT", None))
        except elasticsearch.exceptions.RequestError as e:
            raise ESError(e)

    @classmethod
    def move_alias(cls, alias_name: str, target_index: str):
        """
        Create or update an alias for the index
        :param alias_name: The name of the alias to create or update
        :param target_index: The name of the index to point the alias to
        """

        try:
            alias_info = ES.indices.get_alias(name=alias_name)
            old_index_name = list(alias_info.keys())[0]
            full_index_name = cls.index_name(target_index)
            body = {
                "actions": [
                    {
                        "add": {
                            "index": full_index_name,
                            "alias": alias_name
                        }
                    },
                    {
                        "remove": {
                            "index": old_index_name,
                            "alias": alias_name
                        }
                    }
                ]
            }
            ES.indices.update_aliases(body=body)
        except elasticsearch.exceptions.RequestError as e:
            raise ESError(e)


def any_pending_tasks():
    """ Check if there are any pending tasks in the elasticsearch task queue """
    results = ES.cluster.pending_tasks()
    return len(results["tasks"]) > 0


def query_data_tasks(timeout='30s'):
    """ List the currently running data tasks (indices:data*) in the elasticsearch task queue """
    results = ES.tasks.list(params={
        "actions": 'indices:data*',
        "timeout": timeout,
        "wait_for_completion": 'true',
    })
    tasks = []
    for node in results['nodes'].values():
        tasks.extend(node['tasks'].values())
    return tasks


def refresh():
    """
    refresh all indexes to make newly added or deleted documents immediately searchable
    """
    return ES.indices.refresh()


def find_indexes_by_prefix(index_prefix) -> list[str]:
    data = ES.indices.get(f'{index_prefix}*')
    return list(data.keys())


def find_index_aliases(alias_prefixes=None) -> Iterable[Tuple[str, str]]:
    def _yield_index_alias():
        data = ES.indices.get_alias()
        for index, d in data.items():
            for alias in d['aliases'].keys():
                yield index, alias

    index_aliases = _yield_index_alias()
    if alias_prefixes:
        index_aliases = ((index, alias) for index, alias in index_aliases
                         if any(alias.startswith(p) for p in alias_prefixes))
    return index_aliases


def is_exist(query: dict, index):
    query['size'] = 1
    query['_source'] = False
    res = ES.search(body=query, index=index, size=1, ignore=[404])

    return res.get('hits', {}).get('total', {}).get('value', 0) > 0
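# Example (illustrative sketch): is_exist() requests a single hit with no _source,
# so it is a cheap existence probe ("some-index" is a hypothetical index name):
#
#     if is_exist({"query": {"ids": {"values": ["abc"]}}}, index="some-index"):
#         ...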


class BlockTimeOutException(Exception):
    pass


class DAOSaveExceptionMaxRetriesReached(Exception):
    pass


class ESResponseCannotBeInterpreted(Exception):
    pass


class ESMappingMissingError(Exception):
    pass


class ESError(Exception):
    pass


########################################################################
# Some useful ES queries
########################################################################


class MatchAllQuery(object):
    def query(self):
        return {
            "track_total_hits": True,
            "query": {
                "match_all": {}
            }
        }


class BlockQuery(object):
    def __init__(self, id):
        self._id = id

    def query(self):
        return {
            "query": {
                "ids": {
                    "values": [self._id]
                }
            },
            "_source": ["last_updated"],
            "size": 2
        }


class PrefixAutocompleteQuery(object):
    def __init__(self, query_field, prefix, agg_name, agg_field, agg_size, filter_condition=None):
        self._query_field = query_field
        self._prefix = prefix
        self._agg_name = agg_name
        self._agg_field = agg_field
        self._agg_size = agg_size
        self._filter_condition = filter_condition

    def query(self):
        query_body = {
            "track_total_hits": True,
            "query": {
                "bool": {
                    "must": [
                        {"prefix": {self._query_field: self._prefix.lower()}}  # Keep the prefix query
                    ]
                }
            },
            "size": 0,
            "aggs": {
                self._agg_name: {"terms": {"field": self._agg_field, "size": self._agg_size}}
            }
        }

        if self._filter_condition:
            query_body["query"]["bool"]["filter"] = [
                {"term": self._filter_condition}
            ]

        return query_body


class WildcardAutocompleteQuery(object):
    def __init__(self, wildcard_field, wildcard_query, agg_name, agg_field, agg_size):
        self._wildcard_field = wildcard_field
        self._wildcard_query = wildcard_query
        self._agg_name = agg_name
        self._agg_field = agg_field
        self._agg_size = agg_size

    def query(self):
        return {
            "track_total_hits": True,
            "query": {
                "wildcard": {self._wildcard_field: self._wildcard_query}
            },
            "size": 0,
            "aggs": {
                self._agg_name: {
                    "terms": {"field": self._agg_field, "size": self._agg_size}
                }
            }
        }


#########################################################################
# A query handler that knows how to speak facetview2
#########################################################################


class Facetview2(object):
    """
    ~~SearchURLGenerator:Feature->Elasticsearch:Technology~~

    # Examples of queries
    # {"query":{"filtered":{"filter":{"bool":{"must":[{"term":{"_type":"article"}}]}},"query":{"query_string":{"query":"richard","default_operator":"OR"}}}},"from":0,"size":10}
    # {"query":{"query_string":{"query":"richard","default_operator":"OR"}},"from":0,"size":10}
    """

    @staticmethod
    def make_term_filter(term, value):
        return {"term": {term: value}}

    @staticmethod
    def make_query(query_string=None, filters=None, default_operator="OR", sort_parameter=None, sort_order="asc",
                   default_field=None):
        query_part = {"match_all": {}}
        if query_string is not None:
            query_part = {"query_string": {"query": query_string, "default_operator": default_operator}}
            if default_field is not None:
                query_part["query_string"]["default_field"] = default_field
        query = {"query": query_part}

        if filters is not None:
            if not isinstance(filters, list):
                filters = [filters]
            filters.append(query_part)
            bool_part = {"bool": {"must": filters}}
            query = {"query": bool_part}

        if sort_parameter is not None:
            # For facetview we can only have one sort parameter, but ES actually supports lists
            sort_part = [{sort_parameter: {"order": sort_order}}]
            query["sort"] = sort_part

        return query

    @staticmethod
    def url_encode_query(query):
        return urllib.parse.quote(json.dumps(query).replace(' ', ''))
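    # Example (illustrative sketch): building a facetview-style search URL fragment,
    # reusing the _type term filter from the class docstring (base_search_url is a
    # hypothetical search page URL):
    #
    #     q = Facetview2.make_query("richard", filters=Facetview2.make_term_filter("_type", "article"),
    #                               sort_parameter="created_date")
    #     url = base_search_url + "?source=" + Facetview2.url_encode_query(q)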


def patch_model_for_bulk(obj: DomainObject):
    obj.data['es_type'] = obj.__type__
    obj.data['id'] = obj.makeid()
    return obj