Coverage for portality/tasks/ingestarticles.py: 77%
408 statements
import re

from portality import models
from portality.core import app
from portality.crosswalks.exceptions import CrosswalkException

from portality.tasks.redis_huey import main_queue, configure
from portality.decorators import write_required

from portality.background import BackgroundTask, BackgroundApi, BackgroundException, RetryException
from portality.bll.exceptions import IngestException, DuplicateArticleException, ArticleNotAcceptable
from portality.bll import DOAJ

from portality.lib import plugin

import ftplib, os, requests, traceback, shutil
from urllib.parse import urlparse
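
# Size limits for remote file downloads: the default cap is 250Mb, fetched in 1Mb chunks
# (both values are in bytes)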
DEFAULT_MAX_REMOTE_SIZE = 262144000
CHUNK_SIZE = 1048576


def file_failed(path):
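    """Move a file that failed to ingest into the configured FAILED_ARTICLE_DIR for later inspection."""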
    filename = os.path.split(path)[1]
    fad = app.config.get("FAILED_ARTICLE_DIR")
    dest = os.path.join(fad, filename)
    shutil.move(path, dest)


def ftp_upload(job, path, parsed_url, file_upload):
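    """Download the file referenced by parsed_url over FTP to the given local path.

    Returns True on success; on failure, records the reason on the file_upload record and returns False.
    """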
    # 1. find out the file size
    #    (the SIZE command will also conveniently raise an error if the file does not exist)
    # 2. if it is not too big, download it
    size_limit = app.config.get("MAX_REMOTE_SIZE", DEFAULT_MAX_REMOTE_SIZE)

    # ftp_callback can't rebind variables in this enclosing scope (a hangover from
    # Python 2, which had no `nonlocal`), and we can't pass too_large and downloaded
    # as arguments either, since we don't control what arguments the callback receives.
    # Hence the widely adopted workaround: nested functions CAN read and mutate
    # enclosing-scope variables, so we use single-element mutable containers.
    too_large = [False]
    downloaded = [0]

    def ftp_callback(chunk):
        """Callback for processing downloaded chunks of the FTP file"""
        if too_large[0]:
            return

        if type(chunk) == bytes:
            lc = len(chunk)
        else:
            lc = len(bytes(chunk, "utf-8"))
        downloaded[0] += lc
        if downloaded[0] > size_limit:
            too_large[0] = True
            return

        if chunk:
            with open(path, 'a') as o:
                if type(chunk) == bytes:  # TODO: why are chunks of different types? check test #20
                    o.write(chunk.decode("utf-8"))
                else:
                    o.write(chunk)
                o.flush()

    try:
        f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)

        r = f.sendcmd('TYPE I')  # SIZE is not usually allowed in ASCII mode, so set to binary mode
        if not r.startswith('2'):
            job.add_audit_message('could not set binary mode in target FTP server while checking file exists')
            file_upload.failed("Unable to download file from FTP site")
            return False

        file_size = f.size(parsed_url.path)
        if file_size < 0:
            # this will either raise an error which will get caught below
            # or, very rarely, will return an invalid size
            job.add_audit_message('invalid file size: ' + str(file_size))
            file_upload.failed("Unable to download file from FTP site")
            return False

        if file_size > size_limit:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            return False

        f.close()

        f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)
        c = f.retrbinary('RETR ' + parsed_url.path, ftp_callback, CHUNK_SIZE)
        if too_large[0]:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            try:
                os.remove(path)  # don't keep this file around
            except:
                pass
            return False

        if c.startswith('226'):
            file_upload.downloaded()
            return True

        msg = 'Bad code returned by FTP server for the file transfer: "{0}"'.format(c)
        job.add_audit_message(msg)
        file_upload.failed(msg)
        return False

    except Exception as e:
        job.add_audit_message('error during FTP file download: ' + str(e.args))
        file_upload.failed("Unable to download file from FTP site")
        return False


def http_upload(job, path, file_upload):
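    """Download the file at file_upload.filename over HTTP(S) to the given local path.

    Returns True on success; on failure, records the reason on the file_upload record and returns False.
    """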
    try:
        r = requests.get(file_upload.filename, stream=True)
        if r.status_code != requests.codes.ok:
            job.add_audit_message("The URL could not be accessed. Status: {0}, Content: {1}".format(r.status_code, r.text))
            file_upload.failed("The URL could not be accessed")
            return False

        # check the content length
        size_limit = app.config.get("MAX_REMOTE_SIZE", DEFAULT_MAX_REMOTE_SIZE)
        cl = r.headers.get("content-length")
        try:
            cl = int(cl)
        except:
            cl = 0

        if cl > size_limit:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            return False

        too_large = False
        with open(path, 'w') as f:
            downloaded = 0
            for chunk in r.iter_content(chunk_size=CHUNK_SIZE):  # 1Mb chunks
                # TODO: chunk should always be the same type here; refer to ingestarticle tests #15 and #17
                if type(chunk) == bytes:
                    downloaded += len(chunk)
                else:
                    downloaded += len(bytes(chunk, "utf-8"))
                # check the size limit again
                if downloaded > size_limit:
                    file_upload.failed("The file at the URL was too large")
                    job.add_audit_message("too large")
                    too_large = True
                    break
                if chunk:  # filter out keep-alive new chunks
                    if type(chunk) == bytes:
                        f.write(chunk.decode("utf-8"))
                    else:
                        f.write(chunk)
                    f.flush()
    except:
        job.add_audit_message("The URL could not be accessed")
        file_upload.failed("The URL could not be accessed")
        return False

    if too_large:
        try:
            os.remove(path)  # don't keep this file around
        except:
            pass
        return False

    file_upload.downloaded()
    return True


class IngestArticlesBackgroundTask(BackgroundTask):

    __action__ = "ingest_articles"
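    # __action__ is used as the BackgroundJob action name, and matches the queue
    # configuration key passed to configure("ingest_articles") at the bottom of this module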

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params

        if params is None:
            raise BackgroundException("IngestArticleBackgroundTask.run run without sufficient parameters")

        file_upload_id = self.get_param(params, "file_upload_id")
        if file_upload_id is None:
            raise BackgroundException("IngestArticleBackgroundTask.run run without sufficient parameters")

        file_upload = models.FileUpload.pull(file_upload_id)
        if file_upload is None:
            raise BackgroundException("IngestArticleBackgroundTask.run unable to find file upload with id {x}".format(x=file_upload_id))

        try:
            # if the file "exists", this means it's a remote file that needs to be downloaded, so do that
            # if it's in "failed" state already but the filename has a scheme, this is a retry of a download
            if file_upload.status == "exists" or (file_upload.status == "failed" and re.match(r'^(ht|f)tps?://', file_upload.filename)):
                job.add_audit_message("Downloading file for file upload {x}, job {y}".format(x=file_upload_id, y=job.id))
                if self._download(file_upload) is False:
                    # TODO: add 'outcome' error here
                    job.add_audit_message("File download failed for file upload {x}, job {y}".format(x=file_upload_id, y=job.id))

            # if the file is validated, which will happen if it has been uploaded or downloaded successfully, process it
            if file_upload.status == "validated":
                job.add_audit_message("Importing file for file upload {x}, job {y}".format(x=file_upload_id, y=job.id))
                self._process(file_upload)
        finally:
            file_upload.save()

    def _download(self, file_upload):
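        """Fetch the remote file recorded on file_upload into the upload directory and validate it against its schema."""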
        job = self.background_job
        upload_dir = app.config.get("UPLOAD_DIR")
        path = os.path.join(upload_dir, file_upload.local_filename)

        # first, determine if ftp or http
        parsed_url = urlparse(file_upload.filename)
        if parsed_url.scheme == 'ftp':
            if not ftp_upload(job, path, parsed_url, file_upload):
                return False
        elif parsed_url.scheme in ['http', "https"]:
            if not http_upload(job, path, file_upload):
                return False
        else:
            msg = "We only support HTTP(s) and FTP uploads by URL. This is a: {x}".format(x=parsed_url.scheme)
            job.add_audit_message(msg)
            file_upload.failed(msg)
            return False

        job.add_audit_message("Downloaded {x} as {y}".format(x=file_upload.filename, y=file_upload.local_filename))

        xwalk_name = app.config.get("ARTICLE_CROSSWALKS", {}).get(file_upload.schema)
        try:
            xwalk = plugin.load_class(xwalk_name)()
        except IngestException:
            raise RetryException("Unable to load schema {}".format(xwalk_name))

        # now we have the record in the index and on disk, we can attempt to
        # validate it
        try:
            with open(path) as handle:
                xwalk.validate_file(handle)
        except IngestException as e:
            job.add_audit_message("IngestException: {x}".format(x=e.trace()))
            file_upload.failed(e.message, e.inner_message)
            try:
                file_failed(path)
            except:
                job.add_audit_message("Error cleaning up file which caused IngestException: {x}".format(x=traceback.format_exc()))
            return False
        except Exception as e:
            job.add_audit_message("File system error while downloading file: {x}".format(x=traceback.format_exc()))
            file_upload.failed("File system error when downloading file")
            try:
                file_failed(path)
            except:
                job.add_audit_message("Error cleaning up file which caused Exception: {x}".format(x=traceback.format_exc()))
            return False

        # if we get to here then we have a successfully downloaded and validated
        # document, so we can write it to the index
        job.add_audit_message("Validated file as schema {x}".format(x=file_upload.schema))
        file_upload.validated(file_upload.schema)
        return True

    def _process(self, file_upload):
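        """Crosswalk the validated file into article objects and batch-import them for the file's owner."""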
        job = self.background_job
        upload_dir = app.config.get("UPLOAD_DIR")
        path = os.path.join(upload_dir, file_upload.local_filename)

        if not os.path.exists(path):
            job.add_audit_message("File not found at path {}. Retrying job later.".format(path))
            count = self.get_param(job.params, "attempts")
            retry_limit = app.config.get("HUEY_TASKS", {}).get("ingest_articles", {}).get("retries", 0)
            self.set_param(job.params, "attempts", count + 1)

            if retry_limit <= count:
                job.add_audit_message("File still not found at path {}. Giving up.".format(path))
                job.fail()

            raise RetryException()

        job.add_audit_message("Importing from {x}".format(x=path))

        articleService = DOAJ.articleService()
        account = models.Account.pull(file_upload.owner)

        xwalk_name = app.config.get("ARTICLE_CROSSWALKS", {}).get(file_upload.schema)
        try:
            xwalk = plugin.load_class(xwalk_name)()
        except IngestException:
            raise RetryException("Unable to load schema {}".format(xwalk_name))

        ingest_exception = False
        result = {}
        articles = None
        try:
            with open(path) as handle:
                # don't import the journal info, as we haven't validated ownership of the ISSNs in the article yet
                articles = xwalk.crosswalk_file(handle, add_journal_info=False)
                for article in articles:
                    article.set_upload_id(file_upload.id)
                result = articleService.batch_create_articles(articles, account, add_journal_info=True)
        except (IngestException, CrosswalkException) as e:
            job.add_audit_message("IngestException: {msg}. Inner message: {inner}. Stack: {x}".format(msg=e.message, inner=e.inner_message, x=e.trace()))
            file_upload.failed(e.message, e.inner_message)
            result = e.result
            try:
                file_failed(path)
                ingest_exception = True
            except:
                job.add_audit_message("Error cleaning up file which caused IngestException: {x}".format(x=traceback.format_exc()))
        except (DuplicateArticleException, ArticleNotAcceptable) as e:
            job.add_audit_message("One or more articles did not contain either a DOI or a Fulltext URL")
            file_upload.failed("One or more articles did not contain either a DOI or a Fulltext URL")
            try:
                file_failed(path)
            except:
                job.add_audit_message("Error cleaning up file which caused Exception: {x}".format(x=traceback.format_exc()))
            return
        except Exception as e:
            job.add_audit_message("Unanticipated error: {x}".format(x=traceback.format_exc()))
            file_upload.failed("Unanticipated error when importing articles")
            try:
                file_failed(path)
            except:
                job.add_audit_message("Error cleaning up file which caused Exception: {x}".format(x=traceback.format_exc()))
            return

        success = result.get("success", 0)
        fail = result.get("fail", 0)
        update = result.get("update", 0)
        new = result.get("new", 0)
        shared = result.get("shared", [])
        unowned = result.get("unowned", [])
        unmatched = result.get("unmatched", [])

        if success == 0 and fail > 0 and not ingest_exception:
            file_upload.failed("All articles in file failed to import")
            job.add_audit_message("All articles in file failed to import")
        if success > 0 and fail == 0:
            file_upload.processed(success, update, new)
        if success > 0 and fail > 0:
            file_upload.partial(success, fail, update, new)
            job.add_audit_message("Some articles in file failed to import correctly, so no articles imported")

        file_upload.set_failure_reasons(list(shared), list(unowned), list(unmatched))
        job.add_audit_message("Shared ISSNs: " + ", ".join(list(shared)))
        job.add_audit_message("Unowned ISSNs: " + ", ".join(list(unowned)))
        job.add_audit_message("Unmatched ISSNs: " + ", ".join(list(unmatched)))

        if new:
            ids = [a.id for a in articles]
            job.add_audit_message("Created/updated articles: " + ", ".join(list(ids)))

        if not ingest_exception:
            try:
                os.remove(path)  # just remove the file, no need to keep it
            except Exception as e:
                job.add_audit_message("Error while deleting file {x}: {y}".format(x=path, y=str(e)))

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        job = self.background_job
        params = job.params
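        # no file or index cleanup is currently performed for this task; the job and its
        # params are read here only for symmetry with other background tasks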

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: the username of the user requesting this task
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """
        upload_dir = app.config.get("UPLOAD_DIR")
        if upload_dir is None:
            raise BackgroundException("UPLOAD_DIR is not set in configuration")

        f = kwargs.get("upload_file")
        schema = kwargs.get("schema")
        url = kwargs.get("url")
        previous = kwargs.get("previous", [])

        if f is None and url is None:
            raise BackgroundException("You must specify one of 'upload_file' or 'url' as keyword arguments")
        if schema is None:
            raise BackgroundException("You must specify 'schema' in the keyword arguments")

        file_upload_id = None
        if f is not None and f.filename != "":
            file_upload_id = cls._file_upload(username, f, schema, previous)
        elif url is not None and url != "":
            file_upload_id = cls._url_upload(username, url, schema, previous)

        if file_upload_id is None:
            raise BackgroundException("No file upload record was created")

        # first prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__

        params = {}
        cls.set_param(params, "file_upload_id", file_upload_id)
        cls.set_param(params, "attempts", 0)
        job.params = params

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save(blocking=True)
        ingest_articles.schedule(args=(background_job.id,), delay=10)

    @classmethod
    def _file_upload(cls, username, f, schema, previous):
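        """Save an uploaded file to disk, record it in the index, and validate it against the named schema."""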
        # prep a record to go into the index, to record this upload
        record = models.FileUpload()
        record.upload(username, f.filename)
        record.set_id()

        # the file path that we are going to write to
        xml = os.path.join(app.config.get("UPLOAD_DIR", "."), record.local_filename)

        # it's critical here that no errors cause files to get left behind unrecorded
        try:
            # write the incoming file out to the XML file
            f.save(xml)

            # save the index entry
            record.save()
        except:
            # if we can't record either of these things, we need to back right off
            try:
                file_failed(xml)
            except:
                pass
            try:
                record.delete()
            except:
                pass

            raise BackgroundException("Failed to upload file - please contact an administrator")

        xwalk_name = app.config.get("ARTICLE_CROSSWALKS", {}).get(schema)
        try:
            xwalk = plugin.load_class(xwalk_name)()
        except IngestException:
            raise RetryException("Unable to load schema {}".format(xwalk_name))

        # now we have the record in the index and on disk, we can attempt to
        # validate it
        try:
            with open(xml) as handle:
                xwalk.validate_file(handle)
            record.validated(schema)
            record.save()
            previous.insert(0, record)
            return record.id

        except IngestException as e:
            record.failed(e.message, e.inner_message)
            try:
                file_failed(xml)
            except:
                pass
            record.save()
            previous.insert(0, record)
            raise BackgroundException("Failed to upload file: " + e.message + "; " + str(e.inner_message))
        except Exception as e:
            record.failed("File system error when reading file")
            try:
                file_failed(xml)
            except:
                pass
            record.save()
            previous.insert(0, record)
            raise BackgroundException("Failed to upload file - please contact an administrator")

    @classmethod
    def _url_upload(cls, username, url, schema, previous):
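        """Record a URL-based upload in the index and verify that the remote file appears to be retrievable."""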
        # first define a few functions
        def __http_upload(record, previous, url):
            # first thing to try is a HEAD request, supporting redirects
            head = requests.head(url, allow_redirects=True)
            if head.status_code == requests.codes.ok:
                return __ok(record, previous)

            # if we get to here, the HEAD request failed. This might be because the file
            # isn't there, but it might also be that the server doesn't support HEAD (a lot
            # of webapps [including this one] don't implement it)
            #
            # so we do an interruptible GET request instead, so we don't download too much
            # unnecessary content
            get = requests.get(url, stream=True)
            get.close()
            if get.status_code == requests.codes.ok:
                return __ok(record, previous)
            return __fail(record, previous, error='error while checking submitted file reference: {0}'.format(get.status_code))

        def __ftp_upload(record, previous, parsed_url):
            # 1. find out whether the file exists
            # 2. that's it, return OK

            # We might as well check if the file exists using the SIZE command.
            # If the FTP server does not support SIZE, our article ingestion
            # script is going to refuse to process the file anyway, so we might as
            # well get a failure now.
            # Also it's more of a faff to check file existence using LIST commands.
            try:
                f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)
                r = f.sendcmd('TYPE I')  # SIZE is not usually allowed in ASCII mode, so set to binary mode
                if not r.startswith('2'):
                    return __fail(record, previous, error='could not set binary '
                                  'mode in target FTP server while checking file exists')
                if f.size(parsed_url.path) < 0:
                    # this will either raise an error which will get caught below
                    # or, very rarely, will return an invalid size
                    return __fail(record, previous, error='file does not seem to exist on FTP server')
            except Exception as e:
                return __fail(record, previous, error='error during FTP file existence check: ' + str(e.args))

            return __ok(record, previous)

        def __ok(record, previous):
            record.exists()
            record.save()
            previous.insert(0, record)
            return record.id

        def __fail(record, previous, error):
            message = 'The URL could not be accessed; ' + error
            record.failed(message)
            record.save()
            previous.insert(0, record)
            raise BackgroundException(message)

        # prep a record to go into the index, to record this upload. The filename is the url
        record = models.FileUpload()
        record.upload(username, url)
        record.set_id()
        record.set_schema(schema)  # although it could be wrong, this will get checked later

        # now we attempt to verify that the file is retrievable
        try:
            # first, determine if ftp or http
            parsed_url = urlparse(url)
            if parsed_url.scheme in ['http', "https"]:
                return __http_upload(record, previous, url)
            elif parsed_url.scheme == 'ftp':
                return __ftp_upload(record, previous, parsed_url)
            else:
                return __fail(record, previous, error='unsupported URL scheme "{0}". Only HTTP(s) and FTP are supported.'.format(parsed_url.scheme))
        except BackgroundException:
            raise
        except Exception as e:
            return __fail(record, previous, error="please check it before submitting again; " + str(e))


@main_queue.task(**configure("ingest_articles"))
@write_required(script=True)
def ingest_articles(job_id):
    job = models.BackgroundJob.pull(job_id)
    task = IngestArticlesBackgroundTask(job)
    BackgroundApi.execute(task)