Coverage for portality / tasks / ingestarticles.py: 78%

342 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1import ftplib 

2import os 

3import re 

4import traceback 

5from urllib.parse import urlparse 

6 

7import requests 

8 

9from portality import models 

10from portality.background import BackgroundTask, BackgroundApi, BackgroundException 

11from huey.exceptions import RetryTask 

12from portality.bll.exceptions import IngestException 

13from portality.core import app 

14from portality.lib import plugin 

15from portality.tasks.helpers import background_helper, articles_upload_helper 

16from portality.tasks.redis_huey import events_queue as queue 

17from portality.ui.messages import Messages 

18 

# Fallback cap on remote file downloads when MAX_REMOTE_SIZE is not configured (250 MB)
DEFAULT_MAX_REMOTE_SIZE = 262144000
# Size of each streamed download chunk (1 MiB)
CHUNK_SIZE = 1048576

21 

22 

def load_xwalk(schema):
    """Instantiate the article crosswalk registered for *schema*.

    The dotted class path is looked up in the ARTICLE_CROSSWALKS config map.

    :param schema: schema key, e.g. as stored on a FileUpload record
    :return: a new crosswalk instance
    :raises RetryTask: if the plugin cannot be loaded, so huey retries the job
    """
    crosswalk_path = app.config.get("ARTICLE_CROSSWALKS", {}).get(schema)
    try:
        instance = plugin.load_class(crosswalk_path)()
    except IngestException:
        raise RetryTask("Unable to load schema {}".format(crosswalk_path))
    return instance

29 

30 

def ftp_upload(job, path, parsed_url, file_upload):
    """Download a remote file over FTP to *path*, enforcing a size limit.

    First asks the server for the file size with SIZE (in binary mode) and
    rejects files over the limit; then streams the file with RETR, aborting
    if the running byte count exceeds the limit anyway (the advertised size
    may be wrong). Records the outcome on *file_upload* and audit messages
    on *job*.

    :param job: background job used for audit messages
    :param path: local filesystem path to write the download to
    :param parsed_url: urllib.parse result for the ftp:// URL
    :param file_upload: FileUpload record; marked downloaded() or failed()
    :return: True on success, False on any failure
    """
    size_limit = app.config.get("MAX_REMOTE_SIZE", DEFAULT_MAX_REMOTE_SIZE)

    # Mutable one-element containers so the nested callback can update these
    # counters (closure-rebinding workaround kept from the python 2 version;
    # we don't control the callback's signature, retrbinary does).
    too_large = [False]
    downloaded = [0]

    def ftp_callback(chunk):
        """Accumulate the downloaded size and append each chunk to the file."""
        if too_large[0]:
            return  # over the limit already; ignore the remaining chunks
        # ftplib.FTP.retrbinary delivers bytes; tolerate str for test doubles
        data = chunk if isinstance(chunk, bytes) else bytes(chunk, "utf-8")
        downloaded[0] += len(data)
        if downloaded[0] > size_limit:
            too_large[0] = True
            return

        if data:
            # Write raw bytes in binary append mode. Decoding per-chunk (as
            # the old code did) breaks when a multi-byte character is split
            # across a chunk boundary, and text mode mangles newlines.
            with open(path, 'ab') as o:
                o.write(data)
                o.flush()

    try:
        f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)

        r = f.sendcmd('TYPE I')  # SIZE is not usually allowed in ASCII mode, so set to binary mode
        if not r.startswith('2'):
            job.add_audit_message('could not set binary mode in target FTP server while checking file exists')
            file_upload.failed("Unable to download file from FTP site")
            return False

        file_size = f.size(parsed_url.path)
        if file_size < 0:
            # this will either raise an error which will get caught below
            # or, very rarely, will return an invalid size
            job.add_audit_message('invalid file size: ' + str(file_size))
            file_upload.failed("Unable to download file from FTP site")
            return False

        if file_size > size_limit:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            return False

        f.close()

        # Don't append to a stale partial file left behind by a previous
        # failed attempt - the callback opens in append mode.
        if os.path.exists(path):
            os.remove(path)

        f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)
        c = f.retrbinary('RETR ' + parsed_url.path, ftp_callback, CHUNK_SIZE)
        if too_large[0]:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            try:
                os.remove(path)  # don't keep this file around
            except OSError:
                pass
            return False

        if c.startswith('226'):
            file_upload.downloaded()
            return True

        msg = 'Bad code returned by FTP server for the file transfer: "{0}"'.format(c)
        job.add_audit_message(msg)
        file_upload.failed(msg)
        return False

    except Exception as e:
        job.add_audit_message('error during FTP file download: ' + str(e.args))
        file_upload.failed("Unable to download file from FTP site")
        return False

117 

118 

def http_upload(job, path, file_upload):
    """Download a remote file over HTTP(S) to *path*, enforcing a size limit.

    Checks the Content-Length header first (when present), then re-checks the
    running total while streaming, since the header may be absent or wrong.

    :param job: background job used for audit messages
    :param path: local filesystem path to write the download to
    :param file_upload: FileUpload record whose .filename is the URL;
        marked downloaded() or failed()
    :return: True on success, False on any failure
    """
    too_large = False
    try:
        # stream so we can abort mid-download if the file turns out too big
        r = requests.get(file_upload.filename, stream=True)
        if r.status_code != requests.codes.ok:
            job.add_audit_message("The URL could not be accessed. Status: {0}, Content: {1}"
                                  .format(r.status_code, r.text))
            file_upload.failed("The URL could not be accessed")
            return False

        # check the content length
        size_limit = app.config.get("MAX_REMOTE_SIZE", DEFAULT_MAX_REMOTE_SIZE)
        try:
            cl = int(r.headers.get("content-length"))
        except (TypeError, ValueError):
            # header missing or not a number - fall back to the streamed check
            cl = 0

        if cl > size_limit:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            return False

        # Write raw bytes in binary mode: decoding each chunk separately (as
        # the old code did) fails when a multi-byte character straddles a
        # chunk boundary, and text mode mangles newlines on some platforms.
        with open(path, 'wb') as f:
            downloaded = 0
            for chunk in r.iter_content(chunk_size=CHUNK_SIZE):  # 1Mb chunks
                data = chunk if isinstance(chunk, bytes) else bytes(chunk, "utf-8")
                downloaded += len(data)
                # check the size limit again against the actual byte count
                if downloaded > size_limit:
                    file_upload.failed("The file at the URL was too large")
                    job.add_audit_message("too large")
                    too_large = True
                    break
                if data:  # filter out keep-alive new chunks
                    f.write(data)
                    f.flush()
    except Exception:
        job.add_audit_message("The URL could not be accessed")
        file_upload.failed("The URL could not be accessed")
        return False

    if too_large:
        try:
            os.remove(path)  # don't keep this file around
        except OSError:
            pass
        return False

    file_upload.downloaded()
    return True

175 

176 

class IngestArticlesBackgroundTask(BackgroundTask):
    """Ingest article metadata from an uploaded file or a remote URL.

    The task (optionally) downloads the file over HTTP(S) or FTP, validates
    it against the configured schema crosswalk, and then imports the
    articles via the articles upload helper.
    """

    __action__ = "ingest_articles"

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params

        if params is None:
            raise BackgroundException("IngestArticleBackgroundTask.run run without sufficient parameters")

        file_upload_id = self.get_param(params, "file_upload_id")
        if file_upload_id is None:
            raise BackgroundException("IngestArticleBackgroundTask.run run without sufficient parameters")

        file_upload = models.FileUpload.pull(file_upload_id)
        if file_upload is None:
            raise BackgroundException("IngestArticleBackgroundTask.run unable to find file upload with id {x}"
                                      .format(x=file_upload_id))

        try:
            # if the file "exists", this means its a remote file that needs to be downloaded, so do that
            # if it's in "failed" state already but filename has a scheme, this is a retry of a download
            if file_upload.status == "exists" or (file_upload.status == "failed"
                                                  and re.match(r'^(ht|f)tps?://', file_upload.filename)):
                job.add_audit_message("Downloading file for file upload {x}, job {y}"
                                      .format(x=file_upload_id, y=job.id))
                if self._download(file_upload) is False:
                    # fix: the old message called .format() on a string with no
                    # placeholders, so the ids were silently dropped
                    job.add_audit_message("File download failed for file upload {x}, job {y}"
                                          .format(x=file_upload_id, y=job.id))
                    job.outcome_fail()

            # if the file is validated, which will happen if it has been uploaded, or downloaded successfully, process it.
            if file_upload.status == "validated":
                job.add_audit_message("Importing file for file upload {x}, job {y}".format(x=file_upload_id, y=job.id))
                self._process(file_upload)
        finally:
            # persist whatever state the upload ended in, even on error
            file_upload.save()

    def _download(self, file_upload: models.FileUpload) -> bool:
        """Fetch the remote file referenced by *file_upload* and validate it.

        :param file_upload: record whose .filename is an http(s):// or ftp:// URL
        :return: True if downloaded and validated; False otherwise (the
            record is marked failed with a reason)
        """
        job = self.background_job
        upload_dir = app.config.get("UPLOAD_DIR")
        path = os.path.join(upload_dir, file_upload.local_filename)

        # first, determine if ftp or http
        parsed_url = urlparse(file_upload.filename)
        if parsed_url.scheme == 'ftp':
            if not ftp_upload(job, path, parsed_url, file_upload):
                return False
        elif parsed_url.scheme in ['http', "https"]:
            if not http_upload(job, path, file_upload):
                return False
        else:
            msg = "We only support HTTP(s) and FTP uploads by URL. This is a: {x}".format(x=parsed_url.scheme)
            job.add_audit_message(msg)
            file_upload.failed(msg)
            return False

        job.add_audit_message("Downloaded {x} as {y}".format(x=file_upload.filename, y=file_upload.local_filename))

        xwalk = load_xwalk(file_upload.schema)

        # now we have the record in the index and on disk, we can attempt to
        # validate it
        try:
            with open(path) as handle:
                xwalk.validate_file(handle)
        except IngestException as e:
            job.add_audit_message("IngestException: {x}".format(x=e.trace()))
            file_upload.failed(e.message, e.inner_message)
            try:
                articles_upload_helper.file_failed(path)
            except Exception:
                job.add_audit_message("Error cleaning up file which caused IngestException: {x}"
                                      .format(x=traceback.format_exc()))
            return False
        except Exception:
            job.add_audit_message("File system error while downloading file: {x}"
                                  .format(x=traceback.format_exc()))
            file_upload.failed("File system error when downloading file")
            try:
                articles_upload_helper.file_failed(path)
            except Exception:
                job.add_audit_message("Error cleaning up file which caused Exception: {x}"
                                      .format(x=traceback.format_exc()))
            return False

        # if we get to here then we have a successfully downloaded and validated
        # document, so we can write it to the index
        job.add_audit_message("Validated file as schema {x}".format(x=file_upload.schema))
        file_upload.validated(file_upload.schema)
        return True

    def _process(self, file_upload: models.FileUpload):
        """Crosswalk the validated file on disk and import its articles.

        If the file is not (yet) present on disk, the task is retried up to
        the configured limit before giving up and failing the job.
        """
        job = self.background_job
        upload_dir = app.config.get("UPLOAD_DIR")
        path = os.path.join(upload_dir, file_upload.local_filename)

        if not os.path.exists(path):
            job.add_audit_message("File not found at path {} . Retrying job later.".format(path))
            count = self.get_param(job.params, "attempts")
            retry_limit = app.config.get("HUEY_TASKS", {}).get("ingest_articles", {}).get("retries", 0)
            self.set_param(job.params, "attempts", count + 1)

            if retry_limit <= count:
                job.add_audit_message("File still not found at path {} . Giving up.".format(path))
                job.fail()
                # fix: without this return, execution fell through and tried
                # to import the missing file, raising an unhandled error
                return
            else:
                raise RetryTask()

        job.add_audit_message("Importing from {x}".format(x=path))

        xwalk = load_xwalk(file_upload.schema)

        def _articles_factory(p):
            """Parse the file at *p* into article models tagged with this upload's id."""
            with open(p) as f:
                # don't import the journal info, as we haven't validated ownership of the ISSNs in the article yet
                articles = xwalk.crosswalk_file(f, add_journal_info=False)
            for article in articles:
                article.set_upload_id(file_upload.id)
            return articles

        articles_upload_helper.upload_process(file_upload, job, path, _articles_factory)

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        # nothing to clean up for this task
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """

        upload_dir = app.config.get("UPLOAD_DIR")
        if upload_dir is None:
            raise BackgroundException("UPLOAD_DIR is not set in configuration")

        f = kwargs.get("upload_file")
        schema = kwargs.get("schema")
        url = kwargs.get("url")
        previous = kwargs.get("previous", [])

        if f is None and url is None:
            raise BackgroundException("You must specify one of 'upload_file' or 'url' as keyword arguments")
        if schema is None:
            raise BackgroundException("You must specify 'schema' in the keyword arguments")

        file_upload_id = None
        if f is not None and f.filename != "":
            file_upload_id = cls._file_upload(username, f, schema, previous)
        elif url is not None and url != "":
            file_upload_id = cls._url_upload(username, url, schema, previous)

        if file_upload_id is None:
            raise BackgroundException(Messages.NO_FILE_UPLOAD_ID)

        # first prepare a job record
        params = {}
        cls.set_param(params, "file_upload_id", file_upload_id)
        cls.set_param(params, "attempts", 0)
        return background_helper.create_job(username, cls.__action__, params=params,
                                            queue_id=huey_helper.queue_id)

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save(blocking=True)
        task_cfg = app.config.get("HUEY_TASKS", {}).get("ingest_articles", {})
        ingest_articles.schedule(args=(background_job.id,),
                                 delay=app.config.get('HUEY_ASYNC_DELAY', 10),
                                 retries=task_cfg.get("retries", 10),
                                 retry_delay=task_cfg.get("retry_delay", 15))

    @classmethod
    def _file_upload(cls, username, f, schema, previous):
        """Store a directly-uploaded file, record it in the index, and validate it.

        :param username: owner of the upload
        :param f: an uploaded file object exposing .filename and .save()
        :param schema: expected article schema for validation
        :param previous: list of earlier FileUpload records; the new record
            is prepended so the most recent appears first
        :return: the id of the new FileUpload record
        :raises BackgroundException: if the file cannot be saved or validated
        """
        # prep a record to go into the index, to record this upload
        record = models.FileUpload()
        record.upload(username, f.filename)
        record.set_id()

        # the file path that we are going to write to
        xml = os.path.join(app.config.get("UPLOAD_DIR", "."), record.local_filename)

        # it's critical here that no errors cause files to get left behind unrecorded
        try:
            # write the incoming file out to the XML file
            f.save(xml)

            # save the index entry
            record.save()
        except Exception:
            # if we can't record either of these things, we need to back right off
            try:
                articles_upload_helper.file_failed(xml)
            except Exception:
                pass
            try:
                record.delete()
            except Exception:
                pass

            raise BackgroundException("Failed to upload file - please contact an administrator")

        xwalk = load_xwalk(schema)

        # now we have the record in the index and on disk, we can attempt to
        # validate it
        try:
            with open(xml) as handle:
                xwalk.validate_file(handle)
            record.validated(schema)
            record.save()
            previous.insert(0, record)
            return record.id

        except IngestException as e:
            record.failed(e.message, e.inner_message)
            try:
                articles_upload_helper.file_failed(xml)
            except Exception:
                pass
            record.save()
            previous.insert(0, record)
            raise BackgroundException("Failed to upload file: " + e.message + "; " + str(e.inner_message))
        except Exception:
            record.failed("File system error when reading file")
            try:
                articles_upload_helper.file_failed(xml)
            except Exception:
                pass
            record.save()
            previous.insert(0, record)
            raise BackgroundException("Failed to upload file - please contact an administrator")

    @classmethod
    def _url_upload(cls, username, url, schema, previous):
        """Record a URL-based upload and verify the remote file is retrievable.

        The actual download happens later in run(); here we only check
        reachability (HTTP HEAD with GET fallback, or FTP SIZE) and store
        the record in the appropriate state.

        :return: the id of the new FileUpload record
        :raises BackgroundException: if the URL cannot be accessed
        """
        # first define a few functions
        def __http_upload(record, previous, url):
            # first thing to try is a head request, supporting redirects
            head = requests.head(url, allow_redirects=True)
            if head.status_code == requests.codes.ok:
                return __ok(record, previous)

            # if we get to here, the head request failed. This might be because the file
            # isn't there, but it might also be that the server doesn't support HEAD (a lot
            # of webapps [including this one] don't implement it)
            #
            # so we do an interruptable get request instead, so we don't download too much
            # unnecessary content
            get = requests.get(url, stream=True)
            get.close()
            if get.status_code == requests.codes.ok:
                return __ok(record, previous)
            return __fail(record, previous,
                          error='error while checking submitted file reference: {0}'.format(get.status_code))

        def __ftp_upload(record, previous, parsed_url):
            # 1. find out whether the file exists
            # 2. that's it, return OK

            # We might as well check if the file exists using the SIZE command.
            # If the FTP server does not support SIZE, our article ingestion
            # script is going to refuse to process the file anyway, so might as
            # well get a failure now.
            # Also it's more of a faff to check file existence using LIST commands.
            try:
                f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)
                r = f.sendcmd('TYPE I')  # SIZE is not usually allowed in ASCII mode, so set to binary mode
                if not r.startswith('2'):
                    return __fail(record, previous, error='could not set binary '
                                                          'mode in target FTP server while checking file exists')
                if f.size(parsed_url.path) < 0:
                    # this will either raise an error which will get caught below
                    # or, very rarely, will return an invalid size
                    return __fail(record, previous, error='file does not seem to exist on FTP server')

            except Exception as e:
                return __fail(record, previous, error='error during FTP file existence check: ' + str(e.args))

            return __ok(record, previous)

        def __ok(record, previous):
            # mark the remote file as existing; run() will download it later
            record.exists()
            record.save()
            previous.insert(0, record)
            return record.id

        def __fail(record, previous, error):
            message = 'The URL could not be accessed; ' + error
            record.failed(message)
            record.save()
            previous.insert(0, record)
            raise BackgroundException(message)

        # prep a record to go into the index, to record this upload. The filename is the url
        record = models.FileUpload()
        record.upload(username, url)
        record.set_id()
        record.set_schema(schema)  # although it could be wrong, this will get checked later

        # now we attempt to verify that the file is retrievable
        try:
            # first, determine if ftp or http
            parsed_url = urlparse(url)
            if parsed_url.scheme in ['http', "https"]:
                return __http_upload(record, previous, url)
            elif parsed_url.scheme == 'ftp':
                return __ftp_upload(record, previous, parsed_url)
            else:
                return __fail(record, previous,
                              error='unsupported URL scheme "{0}". Only HTTP(s) and FTP are supported.'.format(
                                  parsed_url.scheme))
        except BackgroundException:
            # __fail already recorded the failure; propagate it unchanged
            raise
        except Exception as e:
            return __fail(record, previous, error="please check it before submitting again; " + str(e))

506 

507 

# Helper that binds this task to the events queue; prepare() also uses its queue_id.
huey_helper = IngestArticlesBackgroundTask.create_huey_helper(queue)


@huey_helper.register_execute(is_load_config=True)
def ingest_articles(job_id):
    # Huey entry point: pull the persisted job record and hand it to the
    # standard background-task execution wrapper.
    job = models.BackgroundJob.pull(job_id)
    task = IngestArticlesBackgroundTask(job)
    BackgroundApi.execute(task)