Coverage for portality / tasks / ingestarticles.py: 78%
342 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1import ftplib
2import os
3import re
4import traceback
5from urllib.parse import urlparse
7import requests
9from portality import models
10from portality.background import BackgroundTask, BackgroundApi, BackgroundException
11from huey.exceptions import RetryTask
12from portality.bll.exceptions import IngestException
13from portality.core import app
14from portality.lib import plugin
15from portality.tasks.helpers import background_helper, articles_upload_helper
16from portality.tasks.redis_huey import events_queue as queue
17from portality.ui.messages import Messages
# Default cap on the size of a remotely-fetched file (250 MB) applied when
# MAX_REMOTE_SIZE is not set in app config.
DEFAULT_MAX_REMOTE_SIZE = 262144000
# Transfer chunk size in bytes (1 MB) used for both FTP and HTTP downloads.
CHUNK_SIZE = 1048576
def load_xwalk(schema):
    """Instantiate the article crosswalk configured for the given schema.

    :param schema: key into the ARTICLE_CROSSWALKS config mapping
    :return: a new crosswalk instance
    :raises RetryTask: if the configured class cannot be loaded
    """
    crosswalks = app.config.get("ARTICLE_CROSSWALKS", {})
    class_path = crosswalks.get(schema)
    try:
        return plugin.load_class(class_path)()
    except IngestException:
        raise RetryTask("Unable to load schema {}".format(class_path))
def ftp_upload(job, path, parsed_url, file_upload):
    """Download the remote FTP file named in ``file_upload`` to ``path``.

    First checks the remote file's size via the SIZE command (which also
    verifies existence), refusing anything over MAX_REMOTE_SIZE, then
    reconnects and streams the file to disk in CHUNK_SIZE pieces.

    :param job: background job, used only for audit messages
    :param path: local filesystem path to write the downloaded file to
    :param parsed_url: urlparse result for the ftp:// URL
    :param file_upload: FileUpload record; marked downloaded/failed to
        record the outcome
    :return: True on successful download, False on any failure
    """
    # 1. find out the file size
    # (in the meantime the size command will return an error if
    # the file does not exist)
    # 2. if not too big, download it
    size_limit = app.config.get("MAX_REMOTE_SIZE", DEFAULT_MAX_REMOTE_SIZE)

    """
    you CANNOT override (rebind) vars inside nested funcs in python 2
    and we can't pass too_large and downloaded as args to
    ftp_callback either since we don't control what args it gets
    hence, widely adopted ugly hack - since you can READ nonlocal
    vars from nested funcs, we use a mutable container!
    """
    too_large = [False]   # [0] flips to True once the size limit is exceeded
    downloaded = [0]      # [0] accumulates the number of bytes received

    def ftp_callback(chunk):
        """Callback for processing downloaded chunks of the FTP file"""
        # once over the limit, ignore the remaining chunks of the transfer
        if too_large[0]:
            return
        if type(chunk) == bytes:
            lc = len(chunk)
        else:
            lc = len(bytes(chunk, "utf-8"))
        downloaded[0] += lc
        if downloaded[0] > size_limit:
            too_large[0] = True
            return

        if chunk:
            # append mode: this callback fires once per chunk of the transfer
            with open(path, 'a') as o:
                if type(chunk) == bytes:  # TODO: why chunk are different types? check test #20
                    o.write(chunk.decode("utf-8"))
                else:
                    o.write(chunk)
                o.flush()

    try:
        f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)

        r = f.sendcmd('TYPE I')  # SIZE is not usually allowed in ASCII mode, so set to binary mode
        if not r.startswith('2'):
            job.add_audit_message('could not set binary mode in target FTP server while checking file exists')
            file_upload.failed("Unable to download file from FTP site")
            return False

        file_size = f.size(parsed_url.path)
        if file_size < 0:
            # this will either raise an error which will get caught below
            # or, very rarely, will return an invalid size
            job.add_audit_message('invalid file size: ' + str(file_size))
            file_upload.failed("Unable to download file from FTP site")
            return False

        if file_size > size_limit:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            return False

        f.close()

        # fresh connection for the actual transfer; the first session was only for the size check
        f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)
        c = f.retrbinary('RETR ' + parsed_url.path, ftp_callback, CHUNK_SIZE)
        if too_large[0]:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            try:
                os.remove(path)  # don't keep this file around
            except:
                pass
            return False

        # 226 is the FTP "transfer complete" response code
        if c.startswith('226'):
            file_upload.downloaded()
            return True

        msg = 'Bad code returned by FTP server for the file transfer: "{0}"'.format(c)
        job.add_audit_message(msg)
        file_upload.failed(msg)
        return False

    except Exception as e:
        job.add_audit_message('error during FTP file download: ' + str(e.args))
        file_upload.failed("Unable to download file from FTP site")
        return False
def http_upload(job, path, file_upload):
    """Download the remote HTTP(s) file named in ``file_upload`` to ``path``.

    Streams the response body to disk in CHUNK_SIZE pieces, enforcing the
    MAX_REMOTE_SIZE limit both via the Content-Length header (when present)
    and against the bytes actually received.

    :param job: background job, used only for audit messages
    :param path: local filesystem path to write the downloaded file to
    :param file_upload: FileUpload record whose filename is the URL; marked
        downloaded/failed to record the outcome
    :return: True on successful download, False on any failure
    """
    too_large = False
    try:
        r = requests.get(file_upload.filename, stream=True)
        if r.status_code != requests.codes.ok:
            job.add_audit_message("The URL could not be accessed. Status: {0}, Content: {1}"
                                  .format(r.status_code, r.text))
            file_upload.failed("The URL could not be accessed")
            return False

        # check the content length header first, so obviously oversized files
        # are refused before any bytes are downloaded
        size_limit = app.config.get("MAX_REMOTE_SIZE", DEFAULT_MAX_REMOTE_SIZE)
        cl = r.headers.get("content-length")
        try:
            cl = int(cl)
        except (TypeError, ValueError):
            # header absent or malformed - fall back to counting received bytes
            cl = 0

        if cl > size_limit:
            file_upload.failed("The file at the URL was too large")
            job.add_audit_message("too large")
            return False

        # Write in binary mode. Decoding each chunk separately (as the old
        # text-mode version did) could raise UnicodeDecodeError whenever a
        # chunk boundary split a multi-byte UTF-8 character.
        with open(path, 'wb') as f:
            downloaded = 0
            for chunk in r.iter_content(chunk_size=CHUNK_SIZE):  # 1Mb chunks
                if not isinstance(chunk, bytes):
                    chunk = bytes(chunk, "utf-8")
                downloaded += len(chunk)
                # re-check the size limit against the bytes actually received,
                # since content-length may have been missing or wrong
                if downloaded > size_limit:
                    file_upload.failed("The file at the URL was too large")
                    job.add_audit_message("too large")
                    too_large = True
                    break
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)
                    f.flush()
    except Exception:
        job.add_audit_message("The URL could not be accessed")
        file_upload.failed("The URL could not be accessed")
        return False

    if too_large:
        try:
            os.remove(path)  # don't keep this partial file around
        except OSError:
            pass
        return False

    file_upload.downloaded()
    return True
class IngestArticlesBackgroundTask(BackgroundTask):
    """Background task that downloads (when remote), validates, and imports
    an article file recorded by a FileUpload object."""

    __action__ = "ingest_articles"

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params

        if params is None:
            raise BackgroundException("IngestArticleBackgroundTask.run run without sufficient parameters")

        file_upload_id = self.get_param(params, "file_upload_id")
        if file_upload_id is None:
            raise BackgroundException("IngestArticleBackgroundTask.run run without sufficient parameters")

        file_upload = models.FileUpload.pull(file_upload_id)
        if file_upload is None:
            raise BackgroundException("IngestArticleBackgroundTask.run unable to find file upload with id {x}"
                                      .format(x=file_upload_id))

        try:
            # if the file "exists", this means its a remote file that needs to be downloaded, so do that
            # if it's in "failed" state already but filename has a scheme, this is a retry of a download
            if file_upload.status == "exists" or (file_upload.status == "failed"
                                                  and re.match(r'^(ht|f)tps?://', file_upload.filename)):
                job.add_audit_message("Downloading file for file upload {x}, job {y}"
                                      .format(x=file_upload_id, y=job.id))
                if self._download(file_upload) is False:
                    # TODO: add 'outcome' error here
                    job.add_audit_message("File download failed".format(x=file_upload_id, y=job.id))
                    job.outcome_fail()

            # if the file is validated, which will happen if it has been uploaded, or downloaded successfully, process it.
            if file_upload.status == "validated":
                job.add_audit_message("Importing file for file upload {x}, job {y}".format(x=file_upload_id, y=job.id))
                self._process(file_upload)
        finally:
            # always persist whatever status _download/_process left on the record
            file_upload.save()

    def _download(self, file_upload: models.FileUpload) -> bool:
        """Fetch the remote file referenced by ``file_upload`` and validate it.

        Dispatches to ftp_upload or http_upload based on the URL scheme, then
        validates the downloaded file against its schema's crosswalk.  The
        FileUpload record is marked failed/validated accordingly.

        :param file_upload: FileUpload record whose filename is a URL
        :return: True if downloaded and validated, False otherwise
        """
        job = self.background_job
        upload_dir = app.config.get("UPLOAD_DIR")
        path = os.path.join(upload_dir, file_upload.local_filename)

        # first, determine if ftp or http
        parsed_url = urlparse(file_upload.filename)
        if parsed_url.scheme == 'ftp':
            if not ftp_upload(job, path, parsed_url, file_upload):
                return False
        elif parsed_url.scheme in ['http', "https"]:
            if not http_upload(job, path, file_upload):
                return False
        else:
            msg = "We only support HTTP(s) and FTP uploads by URL. This is a: {x}".format(x=parsed_url.scheme)
            job.add_audit_message(msg)
            file_upload.failed(msg)
            return False

        job.add_audit_message("Downloaded {x} as {y}".format(x=file_upload.filename, y=file_upload.local_filename))

        xwalk = load_xwalk(file_upload.schema)

        # now we have the record in the index and on disk, we can attempt to
        # validate it
        try:
            with open(path) as handle:
                xwalk.validate_file(handle)
        except IngestException as e:
            job.add_audit_message("IngestException: {x}".format(x=e.trace()))
            file_upload.failed(e.message, e.inner_message)
            try:
                # remove the invalid file from the upload directory
                articles_upload_helper.file_failed(path)
            except:
                job.add_audit_message("Error cleaning up file which caused IngestException: {x}"
                                      .format(x=traceback.format_exc()))
            return False
        except Exception as e:
            job.add_audit_message("File system error while downloading file: {x}"
                                  .format(x=traceback.format_exc()))
            file_upload.failed("File system error when downloading file")
            try:
                articles_upload_helper.file_failed(path)
            except:
                job.add_audit_message("Error cleaning up file which caused Exception: {x}"
                                      .format(x=traceback.format_exc()))
            return False

        # if we get to here then we have a successfully downloaded and validated
        # document, so we can write it to the index
        job.add_audit_message("Validated file as schema {x}".format(x=file_upload.schema))
        file_upload.validated(file_upload.schema)
        return True

    def _process(self, file_upload: models.FileUpload):
        """Crosswalk the file on disk into Article objects and import them.

        If the file is not yet on disk (presumably shared-filesystem lag -
        confirm against deployment), the job is retried until the configured
        retry limit, after which it is failed.

        :param file_upload: FileUpload record to import from
        :raises RetryTask: if the file is absent and retries remain
        """
        job = self.background_job
        upload_dir = app.config.get("UPLOAD_DIR")
        path = os.path.join(upload_dir, file_upload.local_filename)

        if not os.path.exists(path):
            job.add_audit_message("File not found at path {} . Retrying job later.".format(path))
            count = self.get_param(job.params, "attempts")
            retry_limit = app.config.get("HUEY_TASKS", {}).get("ingest_articles", {}).get("retries", 0)
            self.set_param(job.params, "attempts", count + 1)

            if retry_limit <= count:
                job.add_audit_message("File still not found at path {} . Giving up.".format(path))
                job.fail()
                # NOTE(review): execution falls through to the import below even
                # after job.fail(); open() in _articles_factory will then raise.
                # Confirm whether an early return is intended here.
            else:
                raise RetryTask()

        job.add_audit_message("Importing from {x}".format(x=path))

        xwalk = load_xwalk(file_upload.schema)

        def _articles_factory(p):
            # Crosswalk the file at p into Article objects tagged with this upload's id
            with open(p) as f:
                # don't import the journal info, as we haven't validated ownership of the ISSNs in the article yet
                articles = xwalk.crosswalk_file(f, add_journal_info=False)
                for article in articles:
                    article.set_upload_id(file_upload.id)
                return articles

        articles_upload_helper.upload_process(file_upload, job, path, _articles_factory)

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        # NOTE(review): no cleanup work is currently performed; the locals
        # below are unused placeholders
        job = self.background_job
        params = job.params

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """
        upload_dir = app.config.get("UPLOAD_DIR")
        if upload_dir is None:
            raise BackgroundException("UPLOAD_DIR is not set in configuration")

        f = kwargs.get("upload_file")
        schema = kwargs.get("schema")
        url = kwargs.get("url")
        previous = kwargs.get("previous", [])

        # exactly one of a direct file upload or a URL must be supplied
        if f is None and url is None:
            raise BackgroundException("You must specify one of 'upload_file' or 'url' as keyword arguments")
        if schema is None:
            raise BackgroundException("You must specify 'schema' in the keyword arguments")

        file_upload_id = None
        if f is not None and f.filename != "":
            file_upload_id = cls._file_upload(username, f, schema, previous)
        elif url is not None and url != "":
            file_upload_id = cls._url_upload(username, url, schema, previous)

        if file_upload_id is None:
            raise BackgroundException(Messages.NO_FILE_UPLOAD_ID)

        # first prepare a job record
        params = {}
        cls.set_param(params, "file_upload_id", file_upload_id)
        cls.set_param(params, "attempts", 0)
        return background_helper.create_job(username, cls.__action__, params=params,
                                            queue_id=huey_helper.queue_id)

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save(blocking=True)
        ingest_articles.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10), retries=app.config.get("HUEY_TASKS", {}).get("ingest_articles", {}).get("retries", 10), retry_delay=app.config.get("HUEY_TASKS", {}).get("ingest_articles", {}).get("retry_delay", 15))

    @classmethod
    def _file_upload(cls, username, f, schema, previous):
        """Store a directly-uploaded file on disk, validate it, and index a
        FileUpload record for it.

        :param username: owner of the upload
        :param f: werkzeug-style file object with .filename and .save() - confirm
        :param schema: schema name used to pick the validation crosswalk
        :param previous: list the new record is prepended to, for UI display
        :return: the id of the saved FileUpload record
        :raises BackgroundException: if saving or validation fails
        """
        # prep a record to go into the index, to record this upload
        record = models.FileUpload()
        record.upload(username, f.filename)
        record.set_id()

        # the file path that we are going to write to
        xml = os.path.join(app.config.get("UPLOAD_DIR", "."), record.local_filename)

        # it's critical here that no errors cause files to get left behind unrecorded
        try:
            # write the incoming file out to the XML file
            f.save(xml)

            # save the index entry
            record.save()
        except:
            # if we can't record either of these things, we need to back right off
            try:
                articles_upload_helper.file_failed(xml)
            except:
                pass
            try:
                record.delete()
            except:
                pass

            raise BackgroundException("Failed to upload file - please contact an administrator")

        xwalk = load_xwalk(schema)

        # now we have the record in the index and on disk, we can attempt to
        # validate it
        try:
            with open(xml) as handle:
                xwalk.validate_file(handle)
            record.validated(schema)
            record.save()
            previous.insert(0, record)
            return record.id

        except IngestException as e:
            record.failed(e.message, e.inner_message)
            try:
                articles_upload_helper.file_failed(xml)
            except:
                pass
            record.save()
            previous.insert(0, record)
            raise BackgroundException("Failed to upload file: " + e.message + "; " + str(e.inner_message))
        except Exception as e:
            record.failed("File system error when reading file")
            try:
                articles_upload_helper.file_failed(xml)
            except:
                pass
            record.save()
            previous.insert(0, record)
            raise BackgroundException("Failed to upload file - please contact an administrator")

    @classmethod
    def _url_upload(cls, username, url, schema, previous):
        """Record a URL-based upload after verifying the file is retrievable.

        Performs a lightweight existence check (HEAD/GET for HTTP, SIZE for
        FTP) without downloading the content; the actual download happens
        later in _download.

        :param username: owner of the upload
        :param url: http(s):// or ftp:// location of the file
        :param schema: schema name stored on the record (checked later)
        :param previous: list the new record is prepended to, for UI display
        :return: the id of the saved FileUpload record
        :raises BackgroundException: if the URL cannot be accessed
        """
        # first define a few functions
        def __http_upload(record, previous, url):
            # first thing to try is a head request, supporting redirects
            head = requests.head(url, allow_redirects=True)
            if head.status_code == requests.codes.ok:
                return __ok(record, previous)

            # if we get to here, the head request failed. This might be because the file
            # isn't there, but it might also be that the server doesn't support HEAD (a lot
            # of webapps [including this one] don't implement it)
            #
            # so we do an interruptable get request instead, so we don't download too much
            # unnecessary content
            get = requests.get(url, stream=True)
            get.close()
            if get.status_code == requests.codes.ok:
                return __ok(record, previous)
            return __fail(record, previous,
                          error='error while checking submitted file reference: {0}'.format(get.status_code))

        def __ftp_upload(record, previous, parsed_url):
            # 1. find out whether the file exists
            # 2. that's it, return OK

            # We might as well check if the file exists using the SIZE command.
            # If the FTP server does not support SIZE, our article ingestion
            # script is going to refuse to process the file anyway, so might as
            # well get a failure now.
            # Also it's more of a faff to check file existence using LIST commands.
            try:
                f = ftplib.FTP(parsed_url.hostname, parsed_url.username, parsed_url.password)
                r = f.sendcmd('TYPE I')  # SIZE is not usually allowed in ASCII mode, so set to binary mode
                if not r.startswith('2'):
                    return __fail(record, previous, error='could not set binary '
                                  'mode in target FTP server while checking file exists')
                if f.size(parsed_url.path) < 0:
                    # this will either raise an error which will get caught below
                    # or, very rarely, will return an invalid size
                    return __fail(record, previous, error='file does not seem to exist on FTP server')
            except Exception as e:
                return __fail(record, previous, error='error during FTP file existence check: ' + str(e.args))

            return __ok(record, previous)

        def __ok(record, previous):
            # mark the record as an existing remote file awaiting download
            record.exists()
            record.save()
            previous.insert(0, record)
            return record.id

        def __fail(record, previous, error):
            # save the failed record for audit purposes, then abort
            message = 'The URL could not be accessed; ' + error
            record.failed(message)
            record.save()
            previous.insert(0, record)
            raise BackgroundException(message)

        # prep a record to go into the index, to record this upload. The filename is the url
        record = models.FileUpload()
        record.upload(username, url)
        record.set_id()
        record.set_schema(schema)  # although it could be wrong, this will get checked later

        # now we attempt to verify that the file is retrievable
        try:
            # first, determine if ftp or http
            parsed_url = urlparse(url)
            if parsed_url.scheme in ['http', "https"]:
                return __http_upload(record, previous, url)
            elif parsed_url.scheme == 'ftp':
                return __ftp_upload(record, previous, parsed_url)
            else:
                return __fail(record, previous,
                              error='unsupported URL scheme "{0}". Only HTTP(s) and FTP are supported.'.format(
                                  parsed_url.scheme))
        except BackgroundException as e:
            raise
        except Exception as e:
            return __fail(record, previous, error="please check it before submitting again; " + str(e))
# Helper binding this task class to the events huey queue; used by the
# register_execute decorator below and by prepare() for its queue_id.
huey_helper = IngestArticlesBackgroundTask.create_huey_helper(queue)
@huey_helper.register_execute(is_load_config=True)
def ingest_articles(job_id):
    """Huey entry point: load the background job by id and execute the task."""
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(IngestArticlesBackgroundTask(background_job))