Coverage for portality/tasks/ingestarticles.py: 77%

408 statements  

coverage.py v6.4.2, created at 2022-11-09 16:22 +0000

import re

from portality import models
from portality.core import app
from portality.crosswalks.exceptions import CrosswalkException

from portality.tasks.redis_huey import main_queue, configure
from portality.decorators import write_required

from portality.background import BackgroundTask, BackgroundApi, BackgroundException, RetryException
from portality.bll.exceptions import IngestException, DuplicateArticleException, ArticleNotAcceptable
from portality.bll import DOAJ

from portality.lib import plugin

import ftplib, os, requests, traceback, shutil
from urllib.parse import urlparse

DEFAULT_MAX_REMOTE_SIZE = 262144000  # 250 MiB
CHUNK_SIZE = 1048576  # 1 MiB


def file_failed(path):
    """Move a file which failed to ingest into the configured FAILED_ARTICLE_DIR"""
    filename = os.path.split(path)[1]
    fad = app.config.get("FAILED_ARTICLE_DIR")
    dest = os.path.join(fad, filename)
    shutil.move(path, dest)

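# Note: this assumes FAILED_ARTICLE_DIR is configured and the directory
# exists; shutil.move will raise if the destination directory is missing.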

def ftp_upload(job, path, parsed_url, file_upload):
    # 1. find out the file size
    #    (in the meantime the size command will return an error if
    #    the file does not exist)
    # 2. if not too big, download it
    size_limit = app.config.get("MAX_REMOTE_SIZE", DEFAULT_MAX_REMOTE_SIZE)

    too_large = [False]  # In Python 2 you cannot rebind variables from enclosing
    # scopes inside nested functions, and we can't pass too_large and downloaded
    # as arguments to ftp_callback either, since we don't control what arguments
    # it is given. Hence the widely adopted workaround: nested functions CAN read
    # (and mutate) enclosing-scope values, so we use mutable containers.
    downloaded = [0]

    def ftp_callback(chunk):
        """Callback for processing downloaded chunks of the FTP file"""
        if too_large[0]:
            return
        if isinstance(chunk, bytes):
            lc = len(chunk)
        else:
            lc = len(bytes(chunk, "utf-8"))
        downloaded[0] += lc
        if downloaded[0] > size_limit:
            too_large[0] = True
            return

        if chunk:
            with open(path, 'a') as o:
                if isinstance(chunk, bytes):  # TODO: why are chunks of different types? check test #20
                    o.write(chunk.decode("utf-8"))
                else:
                    o.write(chunk)
                o.flush()

    try:
        f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)

        r = f.sendcmd('TYPE I')  # SIZE is not usually allowed in ASCII mode, so set to binary mode
        if not r.startswith('2'):
            job.add_audit_message('could not set binary mode in target FTP server while checking file exists')
            file_upload.failed("Unable to download file from FTP site")
            return False

        file_size = f.size(parsed_url.path)
        if file_size < 0:
            # this will either raise an error which will get caught below
            # or, very rarely, will return an invalid size
            job.add_audit_message('invalid file size: ' + str(file_size))
            file_upload.failed("Unable to download file from FTP site")
            return False

        if file_size > size_limit:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            return False

        f.close()

        f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)
        c = f.retrbinary('RETR ' + parsed_url.path, ftp_callback, CHUNK_SIZE)
        if too_large[0]:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            try:
                os.remove(path)  # don't keep this file around
            except:
                pass
            return False

        if c.startswith('226'):
            file_upload.downloaded()
            return True

        msg = 'Bad code returned by FTP server for the file transfer: "{0}"'.format(c)
        job.add_audit_message(msg)
        file_upload.failed(msg)
        return False

    except Exception as e:
        job.add_audit_message('error during FTP file download: ' + str(e.args))
        file_upload.failed("Unable to download file from FTP site")
        return False

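# Since this module runs on Python 3 (see the urllib.parse import), the
# mutable-container workaround above could also be written with ``nonlocal``;
# a minimal sketch of that alternative (not the shipped code):
#
#     def ftp_upload(job, path, parsed_url, file_upload):
#         too_large = False
#         downloaded = 0
#
#         def ftp_callback(chunk):
#             nonlocal too_large, downloaded
#             downloaded += len(chunk)
#             if downloaded > size_limit:
#                 too_large = True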

def http_upload(job, path, file_upload):
    try:
        r = requests.get(file_upload.filename, stream=True)
        if r.status_code != requests.codes.ok:
            job.add_audit_message("The URL could not be accessed. Status: {0}, Content: {1}".format(r.status_code, r.text))
            file_upload.failed("The URL could not be accessed")
            return False

        # check the content length
        size_limit = app.config.get("MAX_REMOTE_SIZE", DEFAULT_MAX_REMOTE_SIZE)
        cl = r.headers.get("content-length")
        try:
            cl = int(cl)
        except:
            cl = 0

        if cl > size_limit:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            return False

        too_large = False
        with open(path, 'w') as f:
            downloaded = 0
            for chunk in r.iter_content(chunk_size=CHUNK_SIZE):  # 1MiB chunks
                if isinstance(chunk, bytes):  # TODO: chunk here should always be the same type, refer to ingestarticle tests #15 and #17
                    downloaded += len(chunk)
                else:
                    downloaded += len(bytes(chunk, "utf-8"))
                # check the size limit again
                if downloaded > size_limit:
                    file_upload.failed("The file at the URL was too large")
                    job.add_audit_message("too large")
                    too_large = True
                    break
                if chunk:  # filter out keep-alive new chunks
                    if isinstance(chunk, bytes):
                        f.write(chunk.decode("utf-8"))
                    else:
                        f.write(chunk)
                    f.flush()
    except:
        job.add_audit_message("The URL could not be accessed")
        file_upload.failed("The URL could not be accessed")
        return False

    if too_large:
        try:
            os.remove(path)  # don't keep this file around
        except:
            pass
        return False

    file_upload.downloaded()
    return True

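# Caveat, applying to both download helpers above: each chunk is decoded with
# chunk.decode("utf-8") independently of its neighbours, so a multi-byte UTF-8
# sequence that straddles a chunk boundary would raise UnicodeDecodeError and
# be swallowed by the generic handlers. The TODOs above about chunk types are
# related; see the ingestarticle tests referenced there.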

class IngestArticlesBackgroundTask(BackgroundTask):

    __action__ = "ingest_articles"

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params

        if params is None:
            raise BackgroundException("IngestArticleBackgroundTask.run run without sufficient parameters")

        file_upload_id = self.get_param(params, "file_upload_id")
        if file_upload_id is None:
            raise BackgroundException("IngestArticleBackgroundTask.run run without sufficient parameters")

        file_upload = models.FileUpload.pull(file_upload_id)
        if file_upload is None:
            raise BackgroundException("IngestArticleBackgroundTask.run unable to find file upload with id {x}".format(x=file_upload_id))

        try:
            # if the file "exists", this means it's a remote file that needs to be downloaded, so do that
            # if it's in "failed" state already but the filename has a scheme, this is a retry of a download
            if file_upload.status == "exists" or (file_upload.status == "failed" and re.match(r'^(ht|f)tps?://', file_upload.filename)):
                job.add_audit_message("Downloading file for file upload {x}, job {y}".format(x=file_upload_id, y=job.id))
                if self._download(file_upload) is False:
                    # TODO: add 'outcome' error here
                    job.add_audit_message("File download failed for file upload {x}, job {y}".format(x=file_upload_id, y=job.id))

            # if the file is validated, which will happen if it has been uploaded or downloaded successfully, process it
            if file_upload.status == "validated":
                job.add_audit_message("Importing file for file upload {x}, job {y}".format(x=file_upload_id, y=job.id))
                self._process(file_upload)
        finally:
            file_upload.save()

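    # FileUpload status lifecycle, as driven by this module (summarised from
    # the calls in run/_download/_process; the full state machine lives in
    # portality.models.FileUpload): "exists" (remote URL recorded) ->
    # "downloaded" -> "validated" (schema check passed) -> processed /
    # partial / failed once the import has been attempted.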

    def _download(self, file_upload):
        job = self.background_job
        upload_dir = app.config.get("UPLOAD_DIR")
        path = os.path.join(upload_dir, file_upload.local_filename)

        # first, determine if ftp or http
        parsed_url = urlparse(file_upload.filename)
        if parsed_url.scheme == 'ftp':
            if not ftp_upload(job, path, parsed_url, file_upload):
                return False
        elif parsed_url.scheme in ['http', "https"]:
            if not http_upload(job, path, file_upload):
                return False
        else:
            msg = "We only support HTTP(s) and FTP uploads by URL. This is a: {x}".format(x=parsed_url.scheme)
            job.add_audit_message(msg)
            file_upload.failed(msg)
            return False

        job.add_audit_message("Downloaded {x} as {y}".format(x=file_upload.filename, y=file_upload.local_filename))

        xwalk_name = app.config.get("ARTICLE_CROSSWALKS", {}).get(file_upload.schema)
        try:
            xwalk = plugin.load_class(xwalk_name)()
        except IngestException:
            raise RetryException("Unable to load schema {}".format(xwalk_name))

        # now we have the record in the index and on disk, we can attempt to
        # validate it
        try:
            with open(path) as handle:
                xwalk.validate_file(handle)
        except IngestException as e:
            job.add_audit_message("IngestException: {x}".format(x=e.trace()))
            file_upload.failed(e.message, e.inner_message)
            try:
                file_failed(path)
            except:
                job.add_audit_message("Error cleaning up file which caused IngestException: {x}".format(x=traceback.format_exc()))
            return False
        except Exception as e:
            job.add_audit_message("File system error while downloading file: {x}".format(x=traceback.format_exc()))
            file_upload.failed("File system error when downloading file")
            try:
                file_failed(path)
            except:
                job.add_audit_message("Error cleaning up file which caused Exception: {x}".format(x=traceback.format_exc()))
            return False

        # if we get to here then we have a successfully downloaded and validated
        # document, so we can write it to the index
        job.add_audit_message("Validated file as schema {x}".format(x=file_upload.schema))
        file_upload.validated(file_upload.schema)
        return True

    def _process(self, file_upload):
        job = self.background_job
        upload_dir = app.config.get("UPLOAD_DIR")
        path = os.path.join(upload_dir, file_upload.local_filename)

        if not os.path.exists(path):
            job.add_audit_message("File not found at path {}. Retrying job later.".format(path))
            count = self.get_param(job.params, "attempts")
            retry_limit = app.config.get("HUEY_TASKS", {}).get("ingest_articles", {}).get("retries", 0)
            self.set_param(job.params, "attempts", count + 1)

            if retry_limit <= count:
                job.add_audit_message("File still not found at path {}. Giving up.".format(path))
                job.fail()

            raise RetryException()

        job.add_audit_message("Importing from {x}".format(x=path))

        articleService = DOAJ.articleService()
        account = models.Account.pull(file_upload.owner)

        xwalk_name = app.config.get("ARTICLE_CROSSWALKS", {}).get(file_upload.schema)
        try:
            xwalk = plugin.load_class(xwalk_name)()
        except IngestException:
            raise RetryException("Unable to load schema {}".format(xwalk_name))

        ingest_exception = False
        result = {}
        articles = None
        try:
            with open(path) as handle:
                articles = xwalk.crosswalk_file(handle, add_journal_info=False)  # don't import the journal info, as we haven't validated ownership of the ISSNs in the article yet
                for article in articles:
                    article.set_upload_id(file_upload.id)
                result = articleService.batch_create_articles(articles, account, add_journal_info=True)
        except (IngestException, CrosswalkException) as e:
            job.add_audit_message("IngestException: {msg}. Inner message: {inner}. Stack: {x}".format(msg=e.message, inner=e.inner_message, x=e.trace()))
            file_upload.failed(e.message, e.inner_message)
            result = e.result
            try:
                file_failed(path)
                ingest_exception = True
            except:
                job.add_audit_message("Error cleaning up file which caused IngestException: {x}".format(x=traceback.format_exc()))
        except (DuplicateArticleException, ArticleNotAcceptable) as e:
            job.add_audit_message("One or more articles did not contain either a DOI or a Fulltext URL")
            file_upload.failed("One or more articles did not contain either a DOI or a Fulltext URL")
            try:
                file_failed(path)
            except:
                job.add_audit_message("Error cleaning up file which caused Exception: {x}".format(x=traceback.format_exc()))
            return
        except Exception as e:
            job.add_audit_message("Unanticipated error: {x}".format(x=traceback.format_exc()))
            file_upload.failed("Unanticipated error when importing articles")
            try:
                file_failed(path)
            except:
                job.add_audit_message("Error cleaning up file which caused Exception: {x}".format(x=traceback.format_exc()))
            return

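        # `result` is the summary dict from articleService.batch_create_articles
        # (or from an IngestException's .result); the keys unpacked below are
        # the ones this task relies on.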

        success = result.get("success", 0)
        fail = result.get("fail", 0)
        update = result.get("update", 0)
        new = result.get("new", 0)
        shared = result.get("shared", [])
        unowned = result.get("unowned", [])
        unmatched = result.get("unmatched", [])

        if success == 0 and fail > 0 and not ingest_exception:
            file_upload.failed("All articles in file failed to import")
            job.add_audit_message("All articles in file failed to import")
        if success > 0 and fail == 0:
            file_upload.processed(success, update, new)
        if success > 0 and fail > 0:
            file_upload.partial(success, fail, update, new)
            job.add_audit_message("Some articles in file failed to import correctly, so no articles imported")

        file_upload.set_failure_reasons(list(shared), list(unowned), list(unmatched))
        job.add_audit_message("Shared ISSNs: " + ", ".join(list(shared)))
        job.add_audit_message("Unowned ISSNs: " + ", ".join(list(unowned)))
        job.add_audit_message("Unmatched ISSNs: " + ", ".join(list(unmatched)))

        if new:
            ids = [a.id for a in articles]
            job.add_audit_message("Created/updated articles: " + ", ".join(list(ids)))

        if not ingest_exception:
            try:
                os.remove(path)  # just remove the file, no need to keep it
            except Exception as e:
                job.add_audit_message("Error while deleting file {x}: {y}".format(x=path, y=str(e)))

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        job = self.background_job
        params = job.params

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: user who owns this job
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """

        upload_dir = app.config.get("UPLOAD_DIR")
        if upload_dir is None:
            raise BackgroundException("UPLOAD_DIR is not set in configuration")

        f = kwargs.get("upload_file")
        schema = kwargs.get("schema")
        url = kwargs.get("url")
        previous = kwargs.get("previous", [])

        if f is None and url is None:
            raise BackgroundException("You must specify one of 'upload_file' or 'url' as keyword arguments")
        if schema is None:
            raise BackgroundException("You must specify 'schema' in the keyword arguments")

        file_upload_id = None
        if f is not None and f.filename != "":
            file_upload_id = cls._file_upload(username, f, schema, previous)
        elif url is not None and url != "":
            file_upload_id = cls._url_upload(username, url, schema, previous)

        if file_upload_id is None:
            raise BackgroundException("No file upload record was created")

        # first prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__

        params = {}
        cls.set_param(params, "file_upload_id", file_upload_id)
        cls.set_param(params, "attempts", 0)
        job.params = params

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save(blocking=True)
        ingest_articles.schedule(args=(background_job.id,), delay=10)

    @classmethod
    def _file_upload(cls, username, f, schema, previous):
        # prep a record to go into the index, to record this upload
        record = models.FileUpload()
        record.upload(username, f.filename)
        record.set_id()

        # the file path that we are going to write to
        xml = os.path.join(app.config.get("UPLOAD_DIR", "."), record.local_filename)

        # it's critical here that no errors cause files to get left behind unrecorded
        try:
            # write the incoming file out to the XML file
            f.save(xml)

            # save the index entry
            record.save()
        except:
            # if we can't record either of these things, we need to back right off
            try:
                file_failed(xml)
            except:
                pass
            try:
                record.delete()
            except:
                pass

            raise BackgroundException("Failed to upload file - please contact an administrator")

        xwalk_name = app.config.get("ARTICLE_CROSSWALKS", {}).get(schema)
        try:
            xwalk = plugin.load_class(xwalk_name)()
        except IngestException:
            raise RetryException("Unable to load schema {}".format(xwalk_name))

        # now we have the record in the index and on disk, we can attempt to
        # validate it
        try:
            with open(xml) as handle:
                xwalk.validate_file(handle)
            record.validated(schema)
            record.save()
            previous.insert(0, record)
            return record.id

        except IngestException as e:
            record.failed(e.message, e.inner_message)
            try:
                file_failed(xml)
            except:
                pass
            record.save()
            previous.insert(0, record)
            raise BackgroundException("Failed to upload file: " + e.message + "; " + str(e.inner_message))
        except Exception as e:
            record.failed("File system error when reading file")
            try:
                file_failed(xml)
            except:
                pass
            record.save()
            previous.insert(0, record)
            raise BackgroundException("Failed to upload file - please contact an administrator")

    @classmethod
    def _url_upload(cls, username, url, schema, previous):
        # first define a few functions
        def __http_upload(record, previous, url):
            # first thing to try is a head request, supporting redirects
            head = requests.head(url, allow_redirects=True)
            if head.status_code == requests.codes.ok:
                return __ok(record, previous)

            # if we get to here, the head request failed. This might be because the file
            # isn't there, but it might also be that the server doesn't support HEAD (a lot
            # of webapps [including this one] don't implement it)
            #
            # so we do an interruptible get request instead, so we don't download too much
            # unnecessary content
            get = requests.get(url, stream=True)
            get.close()
            if get.status_code == requests.codes.ok:
                return __ok(record, previous)
            return __fail(record, previous, error='error while checking submitted file reference: {0}'.format(get.status_code))

        def __ftp_upload(record, previous, parsed_url):
            # 1. find out whether the file exists
            # 2. that's it, return OK

            # We might as well check if the file exists using the SIZE command.
            # If the FTP server does not support SIZE, our article ingestion
            # script is going to refuse to process the file anyway, so might as
            # well get a failure now.
            # Also it's more of a faff to check file existence using LIST commands.
            try:
                f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)
                r = f.sendcmd('TYPE I')  # SIZE is not usually allowed in ASCII mode, so set to binary mode
                if not r.startswith('2'):
                    return __fail(record, previous, error='could not set binary mode in target FTP server while checking file exists')
                if f.size(parsed_url.path) < 0:
                    # this will either raise an error which will get caught below
                    # or, very rarely, will return an invalid size
                    return __fail(record, previous, error='file does not seem to exist on FTP server')

            except Exception as e:
                return __fail(record, previous, error='error during FTP file existence check: ' + str(e.args))

            return __ok(record, previous)

        def __ok(record, previous):
            record.exists()
            record.save()
            previous.insert(0, record)
            return record.id

        def __fail(record, previous, error):
            message = 'The URL could not be accessed; ' + error
            record.failed(message)
            record.save()
            previous.insert(0, record)
            raise BackgroundException(message)

        # prep a record to go into the index, to record this upload. The filename is the url
        record = models.FileUpload()
        record.upload(username, url)
        record.set_id()
        record.set_schema(schema)  # although it could be wrong, this will get checked later

        # now we attempt to verify that the file is retrievable
        try:
            # first, determine if ftp or http
            parsed_url = urlparse(url)
            if parsed_url.scheme in ['http', "https"]:
                return __http_upload(record, previous, url)
            elif parsed_url.scheme == 'ftp':
                return __ftp_upload(record, previous, parsed_url)
            else:
                return __fail(record, previous, error='unsupported URL scheme "{0}". Only HTTP(s) and FTP are supported.'.format(parsed_url.scheme))
        except BackgroundException:
            raise
        except Exception as e:
            return __fail(record, previous, error="please check it before submitting again; " + str(e))


@main_queue.task(**configure("ingest_articles"))
@write_required(script=True)
def ingest_articles(job_id):
    job = models.BackgroundJob.pull(job_id)
    task = IngestArticlesBackgroundTask(job)
    BackgroundApi.execute(task)
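
# A minimal sketch of the intended flow, inferred from the methods above
# (illustrative values; the real call sites live in the upload views/scripts):
#
#     job = IngestArticlesBackgroundTask.prepare("system", schema="doaj",
#                                                url="http://example.com/articles.xml")
#     IngestArticlesBackgroundTask.submit(job)
#
# prepare() creates the FileUpload record (and, for URL uploads, verifies the
# file is retrievable), submit() saves the job and schedules the huey task
# with a 10 second delay, and the worker then executes ingest_articles(job_id).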