Coverage for portality/dao.py: 80%

596 statements

coverage.py v6.4.2, created at 2022-07-22 15:59 +0100

import time
import re
import sys
import uuid
import json
import elasticsearch
import urllib.parse

from collections import UserDict
from copy import deepcopy
from datetime import datetime, timedelta
from typing import List

from portality.core import app, es_connection as ES


# All models in models.py should inherit this DomainObject to know how to save themselves in the index and so on.
# You can overwrite and add to the DomainObject functions as required. See models.py for some examples.
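#
# For illustration only - a minimal sketch of such a model (the class name and
# extra field are hypothetical; see models.py for the real ones):
#
#     class ExampleRecord(DomainObject):
#         __type__ = "example"
#
#     rec = ExampleRecord(id="abc123", title="An example record")
#     rec.save(blocking=True)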


ES_MAPPING_MISSING_REGEX = re.compile(r'.*No mapping found for \[[a-zA-Z0-9-_\.]+?\] in order to sort on.*', re.DOTALL)
CONTENT_TYPE_JSON = {'Content-Type': 'application/json'}


class ElasticSearchWriteException(Exception):
    pass


class ScrollException(Exception):
    pass


class ScrollInitialiseException(ScrollException):
    pass


class ScrollTimeoutException(ScrollException):
    pass


class BulkException(Exception):
    pass


class DomainObject(UserDict, object):
    """
    ~~DomainObject:Model->Elasticsearch:Technology~~
    """
    __type__ = None  # set the type on the model that inherits this

    def __init__(self, **kwargs):
        # if self.data is already set, don't do anything here
        try:
            object.__getattribute__(self, "data")
        except AttributeError:
            if '_source' in kwargs:
                self.data = dict(kwargs['_source'])
                self.meta = dict(kwargs)
                del self.meta['_source']
            else:
                self.data = dict(kwargs)
        # FIXME: calling super() breaks everything, even though this is the correct thing to do
        # this is because the DomainObject incorrectly overrides properties of the super class
        # super(DomainObject, self).__init__()

    @classmethod
    def index_name(cls):
        if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE'] and cls.__type__ is not None:
            name = ','.join([app.config['ELASTIC_SEARCH_DB_PREFIX'] + t for t in cls.__type__.split(',')])
        else:
            name = app.config['ELASTIC_SEARCH_DB']
        return name

    @classmethod
    def doc_type(cls):
        if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
            return None
        else:
            return cls.__type__

    @classmethod
    def makeid(cls):
        """Create a new id for the data object; overwrite this in specific model types if required."""
        return str(uuid.uuid4().hex)

    @property
    def id(self):
        rawid = self.data.get("id", None)
        if rawid is not None:
            return str(rawid)
        return None

    def set_id(self, id=None):
        if id is None:
            id = self.makeid()
        self.data["id"] = str(id)

    @property
    def version(self):
        return self.meta.get('_version', None)

    @property
    def json(self):
        return json.dumps(self.data)

    def set_created(self, date=None):
        if date is None:
            # use UTC, to match the Z-suffixed timestamps written by save()
            self.data['created_date'] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
        else:
            self.data['created_date'] = date

    @property
    def created_date(self):
        return self.data.get("created_date")

    @property
    def created_timestamp(self):
        return datetime.strptime(self.data.get("created_date"), "%Y-%m-%dT%H:%M:%SZ")

    @property
    def last_updated(self):
        return self.data.get("last_updated")

    @property
    def last_updated_timestamp(self):
        return datetime.strptime(self.last_updated, "%Y-%m-%dT%H:%M:%SZ")

    def save(self, retries=0, back_off_factor=1, differentiate=False, blocking=False, block_wait=0.25):
        """
        ~~->ReadOnlyMode:Feature~~
        :param retries: how many times to retry after a connection or server error
        :param back_off_factor: multiplier for the exponential back-off between retries
        :param differentiate: ensure the new last_updated time differs from the previous one
        :param blocking: wait until the saved record is visible in the index before returning
        :param block_wait: seconds to wait between polls of the index when blocking
        :return: the Elasticsearch response, or None in read-only mode
        """
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, save command cannot run")
            return

        if retries > app.config.get("ES_RETRY_HARD_LIMIT", 1000):  # an arbitrarily large number
            retries = app.config.get("ES_RETRY_HARD_LIMIT", 1000)

        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            block_wait = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        if 'id' not in self.data:
            self.data['id'] = self.makeid()

        self.data['es_type'] = self.__type__

        now = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
        if (blocking or differentiate) and "last_updated" in self.data:
            diff = datetime.utcnow() - datetime.strptime(self.data["last_updated"], "%Y-%m-%dT%H:%M:%SZ")

            # we need the new last_updated time to be later than the old one
            if diff.total_seconds() < 1:
                soon = datetime.utcnow() + timedelta(seconds=1)
                now = soon.strftime("%Y-%m-%dT%H:%M:%SZ")

        self.data['last_updated'] = now

        if 'created_date' not in self.data:
            self.data['created_date'] = now

        attempt = 0
        d = json.dumps(self.data)
        r = None
        while attempt <= retries:
            try:
                r = ES.index(self.index_name(), d, doc_type=self.doc_type(), id=self.data.get("id"), headers=CONTENT_TYPE_JSON)
                break

            except (elasticsearch.ConnectionError, elasticsearch.ConnectionTimeout):
                app.logger.exception("Failed to connect to ES")
                attempt += 1

            except elasticsearch.TransportError as e:
                # Retries depend on which end the error lies.
                if 400 <= e.status_code < 500:
                    # Bad request, do not retry as it won't work. Fail with ElasticSearchWriteException.
                    app.logger.exception("Bad Request to ES, save failed. Details: {0}".format(e.error))
                    raise ElasticSearchWriteException(e.error)
                elif e.status_code >= 500:
                    # Server error, this could be temporary so we may want to retry
                    app.logger.exception("Server Error from ES, retrying. Details: {0}".format(e.error))
                    attempt += 1
            except Exception as e:
                # if any other exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in save method of DAO")
                raise ElasticSearchWriteException(e)

            # wait before retrying
            time.sleep((2 ** attempt) * back_off_factor)

        if attempt > retries:
            raise DAOSaveExceptionMaxRetriesReached(
                "After {attempts} attempts the record with "
                "id {id} failed to save.".format(
                    attempts=attempt, id=self.data['id']))

        if blocking:
            bq = BlockQuery(self.id)
            while True:
                res = self.query(q=bq.query())
                j = self._unwrap_search_result(res)
                if len(j) == 0:
                    time.sleep(block_wait)
                    continue
                if len(j) > 1:
                    raise Exception("More than one record with id {x}".format(x=self.id))
                # guard against a missing last_updated field
                if j[0].get("last_updated", [None])[0] == now:
                    break
                else:
                    time.sleep(block_wait)
                    continue

        return r
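
    # Illustrative usage (record and values hypothetical): retry up to 3 times
    # with exponential back-off, and block until the new version is searchable:
    #
    #     rec.save(retries=3, back_off_factor=1, blocking=True)
    #
    # A save that exhausts its retries raises DAOSaveExceptionMaxRetriesReached.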

    def delete(self):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, delete command cannot run")
            return

        # r = requests.delete(self.target() + self.id)
        try:
            ES.delete(self.index_name(), self.id, doc_type=self.doc_type())
        except elasticsearch.NotFoundError:
            pass  # This is to preserve the old behaviour

    @staticmethod
    def make_query(theq=None, should_terms=None, consistent_order=True, **kwargs):
        """
        Generate a query object based on parameters but don't send to
        backend - return it instead. Must always have the same
        parameters as the query method. See query method for explanation
        of parameters.
        """
        if theq is None:
            theq = ""
        q = deepcopy(theq)
        if isinstance(q, dict):
            query = q
            if 'bool' not in query['query']:
                boolean = {'bool': {'must': []}}
                boolean['bool']['must'].append(query['query'])
                query['query'] = boolean
            if 'must' not in query['query']['bool']:
                query['query']['bool']['must'] = []
        elif q:
            query = {
                'query': {
                    'bool': {
                        'must': [
                            {'query_string': {'query': q}}
                        ]
                    }
                }
            }
        else:
            query = {
                'query': {
                    'bool': {
                        'must': [
                            {'match_all': {}}
                        ]
                    }
                }
            }

        if should_terms is not None and len(should_terms) > 0:
            # NOTE: despite the name, these are appended as "must" terms filters
            for s in should_terms:
                if not isinstance(should_terms[s], list):
                    should_terms[s] = [should_terms[s]]
                query["query"]["bool"]["must"].append({"terms": {s: should_terms[s]}})

        sort_specified = False
        for k, v in kwargs.items():
            if k == '_from':
                query['from'] = v
            elif k == 'sort':
                sort_specified = True
                query['sort'] = v
            else:
                query[k] = v
        if "sort" in query:
            sort_specified = True

        if not sort_specified and consistent_order:
            # FIXME: review this - where is default sort necessary, and why do we want this in ID order?
            query['sort'] = [{"id.exact": {"order": "asc", "unmapped_type": "keyword"}}]

        return query
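
    # Illustrative example (field names hypothetical): a query string plus a
    # terms filter compose into a single bool/must query, with the default sort:
    #
    #     q = DomainObject.make_query("open access", should_terms={"publisher.exact": "OLH"})
    #     # q == {"query": {"bool": {"must": [
    #     #           {"query_string": {"query": "open access"}},
    #     #           {"terms": {"publisher.exact": ["OLH"]}}]}},
    #     #       "sort": [{"id.exact": {"order": "asc", "unmapped_type": "keyword"}}]}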

    @classmethod
    def _unwrap_search_result(cls, res):
        return [i.get("_source") if "_source" in i else i.get("fields") for i in
                res.get('hits', {}).get('hits', [])]

    @classmethod
    def bulk_delete(cls, id_list, idkey='id', refresh=False):
        return cls.bulk(documents=[{'id': i} for i in id_list], idkey=idkey, refresh=refresh, action='delete')

    @classmethod
    def bulk(cls, documents: List[dict], idkey='id', refresh=False, action='index', req_timeout=10, **kwargs):
        """
        :param documents: a list of objects to perform bulk actions on (list of dicts)
        :param idkey: the path to extract an ID from the object, e.g. 'id', 'identifiers.id'
        :param refresh: refresh the index in each operation (make immediately available for search) - expensive!
        :param action: the bulk action to perform: one of 'index', 'update' or 'delete'
        :param req_timeout: request timeout for the bulk operation
        :param kwargs: kwargs are passed into the bulk instruction for each record
        """
        # ~~->ReadOnlyMode:Feature~~
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, bulk command cannot run")
            return

        if action not in ['index', 'update', 'delete']:
            raise BulkException("Unrecognised bulk action '{0}'".format(action))

        data = ''
        for d in documents:
            data += cls.to_bulk_single_rec(d, idkey=idkey, action=action, **kwargs)
        resp = ES.bulk(body=data, index=cls.index_name(), doc_type=cls.doc_type(), refresh=refresh,
                       request_timeout=req_timeout)
        return resp
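
    # Illustrative usage (model and ids hypothetical):
    #
    #     ExampleRecord.bulk([{"id": "a1", "title": "One"}, {"id": "a2", "title": "Two"}])
    #     ExampleRecord.bulk_delete(["a1", "a2"], refresh=True)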

    @staticmethod
    def to_bulk_single_rec(record, idkey="id", action="index", **kwargs):
        """ Adapted from esprit. Create a bulk instruction from a single record. """
        data = ''
        idpath = idkey.split(".")

        # traverse down the object in search of the supplied ID key
        context = record
        for pathseg in idpath:
            if pathseg in context:
                context = context[pathseg]
            else:
                raise BulkException(
                    "'{0}' not available in record to generate bulk _id: {1}".format(idkey, json.dumps(record)))

        datadict = {action: {'_id': context}}
        datadict[action].update(kwargs)

        data += json.dumps(datadict) + '\n'

        if action == 'delete':
            return data

        # For update, we wrap the document in {doc: document} if not already supplied
        if action == 'update' and not (record.get('doc') and len(record.keys()) == 1):
            data += json.dumps({'doc': record}) + '\n'
        else:
            data += json.dumps(record) + '\n'
        return data
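
    # For a record {"id": "a1", "title": "One"} with action="index", the NDJSON
    # produced looks like this (illustrative):
    #
    #     {"index": {"_id": "a1"}}
    #     {"id": "a1", "title": "One"}
    #
    # For action="delete", only the instruction line is emitted.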

    @classmethod
    def refresh(cls):
        """
        ~~->ReadOnlyMode:Feature~~
        :return:
        """
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, refresh command cannot run")
            return

        # r = requests.post(cls.target() + '_refresh', headers=CONTENT_TYPE_JSON)
        # return r.json()

        return ES.indices.refresh(index=cls.index_name())

    @classmethod
    def pull(cls, id_):
        """Retrieve object by id."""
        if id_ is None or id_ == '':
            return None

        try:
            # out = requests.get(cls.target() + id_)
            out = ES.get(cls.index_name(), id_, doc_type=cls.doc_type())
        except elasticsearch.NotFoundError:
            return None
        except elasticsearch.TransportError as e:
            raise Exception("ES returned an error: {x}".format(x=e.info))
        except Exception:
            return None
        if out is None:
            return None

        return cls(**out)

    @classmethod
    def pull_by_key(cls, key, value):
        res = cls.query(q={"query": {"term": {key + app.config['FACET_FIELD']: value}}})
        if res.get('hits', {}).get('total', {}).get('value', 0) == 1:
            return cls.pull(res['hits']['hits'][0]['_source']['id'])
        else:
            return None

    @classmethod
    def object_query(cls, q=None, **kwargs):
        result = cls.query(q, **kwargs)
        return [cls(**r.get("_source")) for r in result.get("hits", {}).get("hits", [])]

    @classmethod
    def query(cls, q=None, **kwargs):
        """Perform a query on backend.

        :param q: maps to query_string parameter if string, or query dict if dict.
        :param kwargs: any keyword args as per
            http://www.elasticsearch.org/guide/reference/api/search/uri-request.html
        """
        query = cls.make_query(q, **kwargs)
        return cls.send_query(query)

    @classmethod
    def send_query(cls, qobj, retry=50, **kwargs):
        """Actually send a query object to the backend.

        :param qobj: the query object to send
        :param retry: the number of attempts to make before giving up
        :param kwargs: passed directly to the Elasticsearch search() function
        """

        if retry > app.config.get("ES_RETRY_HARD_LIMIT", 1000) + 1:  # an arbitrarily large number
            retry = app.config.get("ES_RETRY_HARD_LIMIT", 1000) + 1

        r = None
        count = 0
        exception = None
        while count < retry:
            count += 1
            try:
                # ES 7.10 updated target to whole index, since specifying type for search is deprecated
                # r = requests.post(cls.target_whole_index() + recid + "_search", data=json.dumps(qobj), headers=CONTENT_TYPE_JSON)
                r = ES.search(body=json.dumps(qobj), index=cls.index_name(), doc_type=cls.doc_type(), headers=CONTENT_TYPE_JSON, **kwargs)
                break
            except Exception as e:
                # args[2] carries the ES error detail when present; fall back to the string form
                error_detail = json.dumps(e.args[2]) if len(e.args) > 2 else str(e)
                exception = ESMappingMissingError(e) if ES_MAPPING_MISSING_REGEX.match(error_detail) else e
                if isinstance(exception, ESMappingMissingError):
                    raise exception
                time.sleep(0.5)

        if r is not None:
            return r
        if exception is not None:
            raise exception
        raise Exception("Couldn't get the ES query endpoint to respond. Also, you shouldn't be seeing this.")
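
    # Illustrative usage (model hypothetical): send a raw query object, making
    # up to 3 attempts, with extra kwargs passed straight through to ES.search():
    #
    #     res = ExampleRecord.send_query({"query": {"match_all": {}}}, retry=3, scroll="1m")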

    @classmethod
    def remove_by_id(cls, id):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, delete_by_id command cannot run")
            return

        # r = requests.delete(cls.target() + id)
        try:
            ES.delete(cls.index_name(), id)
        except elasticsearch.NotFoundError:
            return

    @classmethod
    def delete_by_query(cls, query):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, delete_by_query command cannot run")
            return

        # r = requests.delete(cls.target() + "_query", data=json.dumps(query))
        # return r
        return ES.delete_by_query(cls.index_name(), json.dumps(query), doc_type=cls.doc_type())

    @classmethod
    def destroy_index(cls):
        if app.config.get("READ_ONLY_MODE", False) and app.config.get("SCRIPTS_READ_ONLY_MODE", False):
            app.logger.warning("System is in READ-ONLY mode, destroy_index command cannot run")
            return

        # if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
        #     return esprit.raw.delete_index_by_prefix(es_connection, app.config['ELASTIC_SEARCH_DB_PREFIX'])
        # else:
        #     return esprit.raw.delete_index(es_connection)
        print('Destroying indexes with prefix ' + app.config['ELASTIC_SEARCH_DB_PREFIX'] + '*')
        return ES.indices.delete(app.config['ELASTIC_SEARCH_DB_PREFIX'] + '*')

    @classmethod
    def check_es_raw_response(cls, res, extra_trace_info=''):
        if 'error' in res:
            es_resp = json.dumps(res, indent=2)

            error_to_raise = ESMappingMissingError if ES_MAPPING_MISSING_REGEX.match(es_resp) else ESError

            raise error_to_raise(
                (
                    "Elasticsearch returned an error:"
                    "\nES HTTP Response status: {es_status}"
                    "\nES Response: {es_resp}"
                    .format(es_status=res.get('status', 'unknown'), es_resp=es_resp)
                ) + extra_trace_info
            )

        if 'hits' not in res or 'hits' not in res['hits']:  # i.e. if res['hits']['hits'] does not exist
            raise ESResponseCannotBeInterpreted(
                (
                    "Elasticsearch did not return any records. "
                    "It probably returned an error we did not understand instead."
                    "\nES HTTP Response status: {es_status}"
                    "\nES Response: {es_resp}\n"
                    .format(es_status=res.get('status', 'unknown'), es_resp=json.dumps(res, indent=2))
                ) + extra_trace_info
            )
        return True

    @classmethod
    def handle_es_raw_response(cls, res, wrap, extra_trace_info=''):
        """
        Handles the JSON returned by ES, raising errors as needed. If no problems are detected it returns its input
        unchanged.

        :param res: The full ES raw response to a query in a Python dict (this method does not handle the raw JSON ES
            outputs). Usually this parameter is the return value of the .query or .send_query methods.
        :param wrap: Did the caller request wrapping of each ES record inside a model object? This matters for handling
            records that have no '_source' or 'fields' keys, but do have an '_id' key. Such records should raise an
            error if wrapping was requested, since there is nothing to wrap. If no wrapping was requested, perhaps the
            caller simply needed the object IDs and nothing else, so we do not need to raise an error.
        :param extra_trace_info: A string with additional diagnostic information to be put into exceptions.
        """

        cls.check_es_raw_response(res, extra_trace_info=extra_trace_info)

        rs = []
        for i, each in enumerate(res['hits']['hits']):
            if '_source' in each:
                rs.append(each['_source'])
            elif 'fields' in each:
                rs.append(each['fields'])
            elif '_id' in each and not wrap:
                # "_id" is a sibling (not child) of "_source" so it can only be used with unwrapped raw responses.
                # wrap = True only makes sense if "_source" or "fields" were returned.
                # So, if "_id" is the only one present, extract it into an object that's shaped the same as the item
                # in the raw response.
                rs.append({"_id": each['_id']})
            else:
                msg1 = "Can't find any useful data in the ES response.\n" + extra_trace_info
                msg2 = "\nItem {i}.\nItem data:\n{each}".format(i=i, each=json.dumps(each, indent=2))
                raise ESResponseCannotBeInterpreted(msg1 + msg2)

        return rs

    @classmethod
    def iterate(cls, q: dict = None, page_size: int = 1000, limit: int = None, wrap: bool = True, keepalive: str = '1m'):
        """ Provide an iterable of all items in a model matching the query, via an ES scroll.
        :param q: the query to scroll results on
        :param page_size: page size is limited by ElasticSearch, check settings to override
        :param limit: limit the number of results returned (e.g. to take a slice)
        :param wrap: whether to return the results as raw json or wrapped as an object
        :param keepalive: scroll timeout
        """
        theq = {"query": {"match_all": {}}} if q is None else deepcopy(q)
        theq["size"] = page_size
        theq["from"] = 0
        if "sort" not in theq:
            # Sorting by _doc (index order) gives the same performance enhancement as scan, so use it by default
            theq["sort"] = ["_doc"]

        # Initialise the scroll
        try:
            res = cls.send_query(theq, scroll=keepalive)
        except Exception as e:
            raise ScrollInitialiseException("Unable to initialise scroll - could be your mappings are broken", e)

        # unpack scroll response
        scroll_id = res.get('_scroll_id')
        total_results = res.get('hits', {}).get('total', {}).get('value')

        # Supply the first set of results
        counter = 0
        for r in cls.handle_es_raw_response(
                res,
                wrap=wrap,
                extra_trace_info=
                "\nScroll initialised:\n{q}\n"
                "\n\nPage #{counter} of the ES response with size {page_size}."
                .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

            # apply the limit
            if limit is not None and counter >= int(limit):
                break
            counter += 1
            if wrap:
                yield cls(**r)
            else:
                yield r

        # Continue to scroll through the rest of the results
        while True:
            # apply the limit
            if limit is not None and counter >= int(limit):
                break

            # if we consumed all the results we were expecting, we can just stop here
            if counter >= total_results:
                break

            # get the next page and check that we haven't timed out
            try:
                res = ES.scroll(scroll_id=scroll_id, scroll=keepalive)
            except elasticsearch.exceptions.NotFoundError as e:
                raise ScrollTimeoutException(
                    "Scroll timed out; {status} - {message}".format(status=e.status_code, message=e.info))
            except Exception as e:
                # if any other exception occurs, make sure it's at least logged.
                app.logger.exception("Unhandled exception in scroll method of DAO")
                raise ScrollException(e)

            # if we didn't get any results back, this means we're at the end
            if len(res.get('hits', {}).get('hits')) == 0:
                break

            for r in cls.handle_es_raw_response(
                    res,
                    wrap=wrap,
                    extra_trace_info=
                    "\nScroll:\n{q}\n"
                    "\n\nPage #{counter} of the ES response with size {page_size}."
                    .format(q=json.dumps(theq, indent=2), counter=counter, page_size=page_size)):

                # apply the limit
                if limit is not None and counter >= int(limit):
                    break
                counter += 1
                if wrap:
                    yield cls(**r)
                else:
                    yield r
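
    # Illustrative usage (model hypothetical): stream the first 5000 matching
    # records as wrapped objects, without loading everything into memory:
    #
    #     for rec in ExampleRecord.iterate(q={"query": {"match_all": {}}}, page_size=1000, limit=5000):
    #         print(rec.id)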

    @classmethod
    def iterall(cls, page_size=1000, limit=None):
        return cls.iterate(MatchAllQuery().query(), page_size, limit)

    # an alias for the iterate function
    scroll = iterate

    @classmethod
    def dump(cls, q=None, page_size=1000, limit=None, out=None, out_template=None, out_batch_sizes=100000,
             out_rollover_callback=None, transform=None, es_bulk_format=True, idkey='id', es_bulk_fields=None,
             scroll_keepalive='2m'):
        """ Export to file, in bulk format or just a json dump of the records """

        q = q if q is not None else {"query": {"match_all": {}}}

        filenames = []
        n = 1
        current_file = None
        if out_template is not None:
            current_file = out_template + "." + str(n)
            filenames.append(current_file)
        if out is None and current_file is not None:
            out = open(current_file, "w")
        else:
            out = sys.stdout

        count = 0
        for record in cls.scroll(q, page_size=page_size, limit=limit, wrap=False, keepalive=scroll_keepalive):
            if transform is not None:
                record = transform(record)

            if es_bulk_format:
                kwargs = {}
                if es_bulk_fields is None:
                    es_bulk_fields = ["_id", "_index", "_type"]
                for key in es_bulk_fields:
                    if key == "_id":
                        kwargs["idkey"] = idkey
                    if key == "_index":
                        kwargs["index"] = cls.index_name()
                    if key == "_type":
                        kwargs["type_"] = cls.doc_type()
                data = cls.to_bulk_single_rec(record, **kwargs)
            else:
                data = json.dumps(record) + "\n"

            out.write(data)
            if out_template is not None:
                count += 1
                if count > out_batch_sizes:
                    out.close()
                    if out_rollover_callback is not None:
                        out_rollover_callback(current_file)

                    count = 0
                    n += 1
                    current_file = out_template + "." + str(n)
                    filenames.append(current_file)
                    out = open(current_file, "w")

        if out_template is not None:
            out.close()
            if out_rollover_callback is not None:
                out_rollover_callback(current_file)

        return filenames
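
    # Illustrative usage (model and filenames hypothetical): write bulk-format
    # batches of 100,000 records to export.json.1, export.json.2, ... and
    # collect the filenames:
    #
    #     files = ExampleRecord.dump(out_template="export.json", es_bulk_fields=["_id"])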

    @classmethod
    def prefix_query(cls, field, prefix, size=5, facet_field=None, analyzed_field=True):
        # example of a prefix query (the aggregation uses the .exact variant of the field)
        # {
        #     "query": {"prefix": {"bibjson.publisher": "ope"}},
        #     "size": 0,
        #     "aggs": {
        #         "bibjson.publisher": {"terms": {"field": "bibjson.publisher.exact", "size": 5}}
        #     }
        # }

        suffix = app.config['FACET_FIELD']
        query_field = field
        if analyzed_field:
            if field.endswith(suffix):
                # strip .exact (or whatever it's configured as) off the end
                query_field = field[:field.rfind(suffix)]
        else:
            if not field.endswith(suffix):
                query_field = field + suffix

        # the actual terms should come from the .exact version of the
        # field - we are suggesting whole values, not fragments
        if facet_field is None:
            facet_field = query_field + suffix
        if not facet_field.endswith(suffix):
            facet_field = facet_field + suffix

        q = PrefixAutocompleteQuery(query_field, prefix, field, facet_field, size)
        return cls.send_query(q.query())

    @classmethod
    def wildcard_autocomplete_query(cls, field, substring, before=True, after=True, facet_size=5, facet_field=None):
        """
        Example of a wildcard query - works only on .exact fields:

        {
            "query": {
                "wildcard": {"bibjson.publisher.exact": "De *"}
            },
            "size": 0,
            "aggs": {
                "bibjson.publisher.exact": {
                    "terms": {"field": "bibjson.publisher.exact", "size": 5}
                }
            }
        }

        :param field: the field to match the wildcarded substring against
        :param substring: the fragment to match
        :param before: prepend a wildcard to the substring
        :param after: append a wildcard to the substring
        :param facet_size: the number of aggregation buckets to return
        :param facet_field: the field to aggregate on; defaults to the .exact variant of field
        :return: the raw ES response
        """
        # wildcard queries need to be on unanalyzed fields
        suffix = app.config['FACET_FIELD']
        filter_field = field
        if not filter_field.endswith(suffix):
            filter_field = filter_field + suffix

        # add the wildcard before/after
        if before:
            substring = "*" + substring
        if after:
            substring = substring + "*"

        # sort out the facet field
        if facet_field is None:
            facet_field = filter_field
        if not facet_field.endswith(suffix):
            facet_field = facet_field + suffix

        # build the query
        q = WildcardAutocompleteQuery(filter_field, substring, field, facet_field, facet_size)
        return cls.send_query(q.query())

    @classmethod
    def advanced_autocomplete(cls, filter_field, facet_field, substring, size=5, prefix_only=True):
        analyzed = True
        if " " in substring:
            analyzed = False

        substring = substring.lower()

        if " " in substring and not prefix_only:
            res = cls.wildcard_autocomplete_query(filter_field, substring, before=True, after=True, facet_size=size, facet_field=facet_field)
        else:
            res = cls.prefix_query(filter_field, substring, size=size, facet_field=facet_field, analyzed_field=analyzed)

        result = []
        for term in res['aggregations'][filter_field]['buckets']:
            # keep ordering - it's by count by default, so most frequent
            # terms will now go to the front of the result list
            result.append({"id": term['key'], "text": term['key']})
        return result

    @classmethod
    def autocomplete(cls, field, prefix, size=5):
        res = None
        # if there is a space in the prefix, the prefix query won't work, so we fall back to a wildcard
        # we only do this if we have to, because the wildcard query is a little expensive
        if " " in prefix:
            res = cls.wildcard_autocomplete_query(field, prefix, before=False, after=True, facet_size=size)
        else:
            prefix = prefix.lower()
            res = cls.prefix_query(field, prefix, size=size)

        result = []
        for term in res['aggregations'][field]['buckets']:
            # keep ordering - it's by count by default, so most frequent
            # terms will now go to the front of the result list
            result.append({"id": term['key'], "text": term['key']})
        return result

    @classmethod
    def q2obj(cls, **kwargs):
        extra_trace_info = ''
        if 'q' in kwargs:
            extra_trace_info = "\nQuery sent to ES (before manipulation in DomainObject.query):\n{}\n".format(json.dumps(kwargs['q'], indent=2))

        res = cls.query(**kwargs)
        rs = cls.handle_es_raw_response(res, wrap=True, extra_trace_info=extra_trace_info)
        results = [cls(**r) for r in rs]
        return results

    @classmethod
    def all(cls, size=10000, **kwargs):
        """ This is a shortcut to a match_all query with a large size, to return all records """
        # FIXME: is this only used in tests? ES now limits size so we can't guarantee ALL without using scroll / scan
        return cls.q2obj(size=size, **kwargs)

    @classmethod
    def count(cls):
        res = ES.count(index=cls.index_name(), doc_type=cls.doc_type())
        return res.get("count")
        # return requests.get(cls.target() + '_count').json()['count']

    @classmethod
    def hit_count(cls, query, **kwargs):
        countable_query = deepcopy(query)
        if "track_total_hits" not in countable_query:
            countable_query["track_total_hits"] = True

        res = cls.query(q=countable_query, **kwargs)
        return res.get("hits", {}).get("total", {}).get("value", 0)
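
    # Illustrative usage (model hypothetical): track_total_hits is forced on, so
    # the count is exact even beyond Elasticsearch's default 10,000-hit cap:
    #
    #     n = ExampleRecord.hit_count({"query": {"match_all": {}}})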

    @classmethod
    def block(cls, id, last_updated=None, sleep=0.5, max_retry_seconds=30):
        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            sleep = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        q = BlockQuery(id)
        start_time = datetime.now()
        while True:
            res = cls.query(q=q.query())
            hits = res.get("hits", {}).get("hits", [])
            if len(hits) > 0:
                obj = hits[0].get("fields")
                if last_updated is not None:
                    if "last_updated" in obj:
                        lu = obj["last_updated"]
                        if len(lu) > 0:
                            threshold = datetime.strptime(last_updated, "%Y-%m-%dT%H:%M:%SZ")
                            lud = datetime.strptime(lu[0], "%Y-%m-%dT%H:%M:%SZ")
                            if lud >= threshold:
                                return
                else:
                    return
            else:
                if (datetime.now() - start_time).total_seconds() >= max_retry_seconds:
                    raise BlockTimeOutException("Attempting to block until record with id {id} appears in Elasticsearch, but this has not happened after {limit} seconds".format(id=id, limit=max_retry_seconds))

            time.sleep(sleep)

    @classmethod
    def blockall(cls, ids_and_last_updateds, sleep=0.05, individual_max_retry_seconds=30):
        for id, lu in ids_and_last_updateds:
            cls.block(id, lu, sleep=sleep, max_retry_seconds=individual_max_retry_seconds)

    @classmethod
    def blockdeleted(cls, id, sleep=0.5, max_retry_seconds=30):
        if app.config.get("ES_BLOCK_WAIT_OVERRIDE") is not None:
            sleep = app.config["ES_BLOCK_WAIT_OVERRIDE"]

        q = BlockQuery(id)
        start_time = datetime.now()
        while True:
            res = cls.query(q=q.query())
            hits = res.get("hits", {}).get("hits", [])
            if len(hits) == 0:
                return
            else:
                if (datetime.now() - start_time).total_seconds() >= max_retry_seconds:
                    raise BlockTimeOutException(
                        "Attempting to block until record with id {id} is deleted from Elasticsearch, but this has not happened after {limit} seconds".format(
                            id=id, limit=max_retry_seconds))

            time.sleep(sleep)

    @classmethod
    def blockalldeleted(cls, ids, sleep=0.05, individual_max_retry_seconds=30):
        for id in ids:
            cls.blockdeleted(id, sleep, individual_max_retry_seconds)

class BlockTimeOutException(Exception):
    pass


class DAOSaveExceptionMaxRetriesReached(Exception):
    pass


class ESResponseCannotBeInterpreted(Exception):
    pass


class ESMappingMissingError(Exception):
    pass


class ESError(Exception):
    pass


########################################################################
# Some useful ES queries
########################################################################

class MatchAllQuery(object):
    def query(self):
        return {
            "track_total_hits": True,
            "query": {
                "match_all": {}
            }
        }


class BlockQuery(object):
    def __init__(self, id):
        self._id = id

    def query(self):
        return {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"id.exact": self._id}}
                    ]
                }
            },
            "_source": False,
            "docvalue_fields": [
                {"field": "last_updated", "format": "date_time_no_millis"}
            ]
        }

class PrefixAutocompleteQuery(object):
    def __init__(self, query_field, prefix, agg_name, agg_field, agg_size):
        self._query_field = query_field
        self._prefix = prefix
        self._agg_name = agg_name
        self._agg_field = agg_field
        self._agg_size = agg_size

    def query(self):
        return {
            "track_total_hits": True,
            "query": {"prefix": {self._query_field: self._prefix.lower()}},
            "size": 0,
            "aggs": {
                self._agg_name: {"terms": {"field": self._agg_field, "size": self._agg_size}}
            }
        }


class WildcardAutocompleteQuery(object):
    def __init__(self, wildcard_field, wildcard_query, agg_name, agg_field, agg_size):
        self._wildcard_field = wildcard_field
        self._wildcard_query = wildcard_query
        self._agg_name = agg_name
        self._agg_field = agg_field
        self._agg_size = agg_size

    def query(self):
        return {
            "track_total_hits": True,
            "query": {
                "wildcard": {self._wildcard_field: self._wildcard_query}
            },
            "size": 0,
            "aggs": {
                self._agg_name: {
                    "terms": {"field": self._agg_field, "size": self._agg_size}
                }
            }
        }

#########################################################################
# A query handler that knows how to speak facetview2
#########################################################################


class Facetview2(object):
    """
    ~~SearchURLGenerator:Feature->Elasticsearch:Technology~~
    """

    # Examples of queries
    # {"query":{"filtered":{"filter":{"bool":{"must":[{"term":{"_type":"article"}}]}},"query":{"query_string":{"query":"richard","default_operator":"OR"}}}},"from":0,"size":10}
    # {"query":{"query_string":{"query":"richard","default_operator":"OR"}},"from":0,"size":10}

    @staticmethod
    def make_term_filter(term, value):
        return {"term": {term: value}}

    @staticmethod
    def make_query(query_string=None, filters=None, default_operator="OR", sort_parameter=None, sort_order="asc", default_field=None):
        query_part = {"match_all": {}}
        if query_string is not None:
            query_part = {"query_string": {"query": query_string, "default_operator": default_operator}}
            if default_field is not None:
                query_part["query_string"]["default_field"] = default_field
        query = {"query": query_part}

        if filters is not None:
            if not isinstance(filters, list):
                filters = [filters]
            filters.append(query_part)
            bool_part = {"bool": {"must": filters}}
            query = {"query": bool_part}

        if sort_parameter is not None:
            # For facetview we can only have one sort parameter, but ES actually supports lists
            sort_part = [{sort_parameter: {"order": sort_order}}]
            query["sort"] = sort_part

        return query

    @staticmethod
    def url_encode_query(query):
        return urllib.parse.quote(json.dumps(query).replace(' ', ''))
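
# Illustrative usage of Facetview2 (field names and base URL hypothetical):
# build a filtered, sorted query and encode it for use in a facetview URL:
#
#     f = Facetview2.make_term_filter("bibjson.publisher.exact", "OLH")
#     q = Facetview2.make_query("open access", filters=f, sort_parameter="created_date")
#     url = search_page_url + "?source=" + Facetview2.url_encode_query(q)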