Coverage for portality/tasks/preservation.py: 64%
442 statements
import csv
import hashlib
import json
import os
import requests
import shutil
import tarfile
from typing import List
from bagit import make_bag, BagError
from copy import deepcopy
from datetime import datetime
from zipfile import ZipFile
from pathlib import Path

from portality.background import BackgroundTask, BackgroundApi
from portality.core import app
from portality.decorators import write_required
from portality.lib import dates
from portality.models import Account, Article, BackgroundJob, PreservationState
from portality.regex import DOI_COMPILED, HTTP_URL_COMPILED
from portality.tasks.redis_huey import main_queue, configure
from portality.bll import DOAJ

class PreservationException(Exception):
    """~~PreservationException:Exception~~"""
    pass


class PreservationStorageException(Exception):
    pass


class ValidationError(Exception):
    pass


class ArticlePackage:
    """~~ArticlePackage:Feature~~"""

    def __init__(self, article_dir, files):
        self.issn = None
        self.article_id = None
        self.metadata = None
        self.article_dir = article_dir
        self.article_files = files
        self.package_dir = None
        self.has_error = False
        self.error_details = None
    def create_article_bagit_structure(self):
        """ ~~-> BagIt:Library~~
        Create the directory structure for packaging,
        create the required additional files,
        and create the bagit files.
        """
        # Validate that the required data is available
        self.validate()

        journal_dir = os.path.join(self.package_dir, self.issn)
        if not os.path.exists(journal_dir):
            os.mkdir(journal_dir)

        dest_article_dir = os.path.join(journal_dir, self.article_id)
        if not os.path.exists(dest_article_dir):
            # Create article directory
            os.mkdir(dest_article_dir)
            # Create metadata directory
            metadata_dir = os.path.join(dest_article_dir, "metadata")
            if not os.path.exists(metadata_dir):
                os.mkdir(metadata_dir)

            # Copy the files from the user-uploaded directory to the package
            for file in self.article_files:
                if not file == Preservation.IDENTIFIER_FILE:
                    src = os.path.join(self.article_dir, file)
                    dest = os.path.join(dest_article_dir, file)
                    shutil.copy(src, dest)

            # Create a metadata file with the article information
            with open(os.path.join(metadata_dir, "metadata.json"), 'w+') as metadata_file:
                metadata_file.write(json.dumps(self.metadata, indent=4))

            # Create an identifier file with the uuid of the article
            with open(os.path.join(metadata_dir, "identifier.txt"), 'w+') as identifier_file:
                identifier_file.write(self.article_id)

            try:
                # Bag the article
                make_bag(dest_article_dir, checksums=["sha256"])
            except BagError:
                app.logger.exception(f"Error while creating Bag for article {self.article_id}")
                raise PreservationException("Error while creating Bag")
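
    # Illustrative result (an assumption based on bagit's make_bag behaviour,
    # not part of the original module): after create_article_bagit_structure()
    # the package contains roughly
    #
    #   <package_dir>/<issn>/<article_id>/
    #       bagit.txt
    #       bag-info.txt
    #       manifest-sha256.txt
    #       tagmanifest-sha256.txt
    #       data/
    #           <article files>
    #           metadata/metadata.json
    #           metadata/identifier.txt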
    def validate(self):
        variables_list = []

        if not self.package_dir:
            variables_list.append("package_dir")
        if not self.metadata:
            variables_list.append("metadata")
        if not self.article_dir:
            variables_list.append("article_dir")
        if not self.article_files:
            variables_list.append("article_files")
        if not self.article_id:
            variables_list.append("article_id")
        if not self.issn:
            variables_list.append("issn")

        if variables_list:
            app.logger.debug(f"Validation Values : package_dir {self.package_dir} "
                             f"metadata {self.metadata} article_dir {self.article_dir} "
                             f"article_files {self.article_files} article_id {self.article_id} issn {self.issn}")
            raise ValidationError(f"Required fields cannot be empty: {variables_list}")

class ArticlesList:
    """This class contains different types of lists depending on the article state"""

    def __init__(self):
        self.__successful_articles = []
        self.__unowned_articles = []
        self.__no_identifier_articles = []
        self.__unbagged_articles = []
        self.__not_found_articles = []
        self.__no_files_articles = []
        self.has_errors = False

    def add_successful_article(self, article: ArticlePackage):
        self.__successful_articles.append(os.path.basename(article.article_dir))

    def add_unowned_articles(self, article: ArticlePackage):
        self.has_errors = True
        self.__unowned_articles.append(os.path.basename(article.article_dir))

    def add_no_identifier_articles(self, article: ArticlePackage):
        self.has_errors = True
        self.__no_identifier_articles.append(os.path.basename(article.article_dir))

    def add_unbagged_articles(self, article: ArticlePackage):
        self.has_errors = True
        self.__unbagged_articles.append(os.path.basename(article.article_dir))

    def add_not_found_articles(self, article: ArticlePackage):
        self.has_errors = True
        self.__not_found_articles.append(os.path.basename(article.article_dir))

    def add_no_files_articles(self, article: ArticlePackage):
        self.__no_files_articles.append(os.path.basename(article.article_dir))

    def successful_articles(self):
        return self.__successful_articles

    def unowned_articles(self):
        return self.__unowned_articles

    def no_identifier_articles(self):
        return self.__no_identifier_articles

    def unbagged_articles(self):
        return self.__unbagged_articles

    def not_found_articles(self):
        return self.__not_found_articles

    def no_files_articles(self):
        return self.__no_files_articles

    def get_count(self):
        return len(self.__successful_articles) + \
               len(self.__unowned_articles) + \
               len(self.__no_identifier_articles) + \
               len(self.__unbagged_articles) + \
               len(self.__not_found_articles) + \
               len(self.__no_files_articles)

    def is_partial_success(self):
        if len(self.__successful_articles) > 0 and \
                (len(self.__unbagged_articles) > 0 or
                 len(self.__unowned_articles) > 0 or
                 len(self.__not_found_articles) > 0 or
                 len(self.__no_identifier_articles) > 0 or
                 len(self.__no_files_articles) > 0):
            return True

        return False
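
# Illustrative usage sketch (an assumption, not part of the original module):
# callers record each ArticlePackage outcome and then inspect the aggregate.
#
#   articles_list = ArticlesList()
#   articles_list.add_successful_article(package)    # package: ArticlePackage
#   articles_list.add_unbagged_articles(bad_package)
#   if articles_list.is_partial_success():
#       ...  # some articles were preserved, some failed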

class PreservationBackgroundTask(BackgroundTask):
    """~~PreservationBackground:Feature~~"""

    __action__ = "preserve"

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Create the necessary directories, save the uploaded file
        and create the background job.
        :param username:
        :param kwargs:
        :return: background job
        """
        created_time = dates.format(datetime.utcnow(), "%Y-%m-%d-%H-%M-%S")
        dir_name = username + "-" + created_time
        local_dir = os.path.join(Preservation.UPLOAD_DIR, dir_name)
        file = kwargs.get("upload_file")

        preservation = Preservation(local_dir, username)
        preservation.save_file(file)

        # prepare a job record
        job = BackgroundJob()
        job.user = username
        job.action = cls.__action__

        params = {}
        cls.set_param(params, "local_dir", local_dir)
        job.params = params

        return job
    def run(self):
        job = self.background_job

        params = job.params
        local_dir = self.get_param(params, "local_dir")
        model_id = self.get_param(params, "model_id")
        app.logger.debug(f"Local dir {local_dir}")
        app.logger.debug(f"model_id {model_id}")

        preserve_model = PreservationState.pull(model_id)
        preserve_model.background_task_id = job.id
        preserve_model.pending()
        preserve_model.save()

        # ~~-> Preservation:Feature~~
        preserv = Preservation(local_dir, job.user)
        preserv.upload_filename = preserve_model.filename
        try:
            job.add_audit_message("Extract zip file")
            preserv.extract_zip_file()
            app.logger.debug("Extracted zip file")

            job.add_audit_message("Create Package structure")
            articles_list = preserv.create_package_structure()
            self.save_articles_list(articles_list, preserve_model)
            app.logger.debug("Created package structure")

            if len(articles_list.successful_articles()) > 0:
                package = PreservationPackage(preserv.preservation_dir, job.user)
                job.add_audit_message("Create preservation package")
                tar_file = package.create_package()
                app.logger.debug(f"Created tar file {tar_file}")

                job.add_audit_message("Create shasum")
                sha256 = package.sha256()

                job.add_audit_message("Upload package")
                response = package.upload_package(sha256)
                app.logger.debug(f"Uploaded. Response: {response.text}")

                job.add_audit_message("Validate response")
                self.validate_response(response, tar_file, sha256, preserve_model)

                # Check whether only some of the articles were successful
                if articles_list.is_partial_success():
                    preserve_model.partial()
                    preserve_model.save()
            else:
                # If no previous errors were found, check the other failure reasons
                if not preserve_model.error:
                    # Check whether any articles are available at all
                    if articles_list.get_count() == 0:
                        preserve_model.failed(FailedReasons.no_article_found)
                        preserve_model.save()
                    # All the articles available are invalid
                    else:
                        preserve_model.failed(FailedReasons.no_valid_article_available)
                        preserve_model.save()

        except Exception as exp:
            # ~~-> PreservationException:Exception~~
            preserve_model.failed(str(exp))
            preserve_model.save()
            app.logger.exception("Error at background task")
            raise
    def save_articles_list(self, articles_list: ArticlesList, model: PreservationState):
        """
        Saves the articles info to the model
        :param articles_list: articles list
        :param model: model object
        """
        if len(articles_list.successful_articles()) > 0:
            model.successful_articles(articles_list.successful_articles())
        if len(articles_list.not_found_articles()) > 0:
            model.not_found_articles(articles_list.not_found_articles())
        if len(articles_list.no_identifier_articles()) > 0:
            model.no_identifier_articles(articles_list.no_identifier_articles())
            if len(articles_list.no_identifier_articles()) == articles_list.get_count():
                model.failed(FailedReasons.no_identifier)
        if len(articles_list.unowned_articles()) > 0:
            model.unowned_articles(articles_list.unowned_articles())
        if len(articles_list.unbagged_articles()) > 0:
            model.unbagged_articles(articles_list.unbagged_articles())
        if len(articles_list.no_files_articles()) > 0:
            model.no_files_articles(articles_list.no_files_articles())
        model.save()

    def cleanup(self):
        """
        Clean up any resources
        :return:
        """
        job = self.background_job
        params = job.params
        local_dir = self.get_param(params, "local_dir")
        Preservation.delete_local_directory(local_dir)
    def validate_response(self, response, tar_file, sha256, model):
        """
        Validate the response from the server
        :param response: response object
        :param tar_file: tar file name
        :param sha256: sha256sum value
        :param model: model object to update status
        """
        if response.status_code == 200:
            res_json = json.loads(response.text)
            files = res_json["files"]
            # Success response
            # {"files": [{"name": "name_of_tarball.tar.gz",
            #             "sha256": "decafbad"}]}
            if files:
                # The response may be of type dict or list
                res_filename = None
                res_shasum = None
                if isinstance(files, dict):
                    res_filename = files["name"]
                    res_shasum = files["sha256"]
                elif isinstance(files, list):
                    if len(files) > 0:
                        res_filename = files[0]["name"]
                        res_shasum = files[0]["sha256"]

                if res_filename and res_filename == tar_file:
                    if res_shasum and res_shasum == sha256:
                        app.logger.info("successfully uploaded")
                        model.uploaded_to_ia()
                    else:
                        model.failed(FailedReasons.checksum_doesnot_match)
                else:
                    model.failed(FailedReasons.tar_filename_doesnot_match)

            else:
                # Error response
                # {"result": "ERROR", "manifest_type": "BagIt",
                #  "manifests": [
                #      {"id": "033168cd016a49eb8c3097d800f1b85f",
                #       "result": "SUCCESS"},
                #      {"id": "00003741594643f4996e2555a01e03c7",
                #       "result": "ERROR",
                #       "errors": {
                #           "missing_files": [],
                #           "mismatch_hashes": [{
                #               "file": "path/to/file",
                #               "expected": "decafbad",
                #               "actual": "deadbeaf"}],
                #           "manifest_parsing_errors": [
                #               "some weird error"]}}]}
                result = res_json.get("result")
                if result and result == "ERROR":
                    error_str = FailedReasons.error_response
                else:
                    error_str = FailedReasons.unknown_error_response

                app.logger.error(error_str)
                model.failed(error_str)

            model.save()
        else:
            app.logger.error(f"Upload failed {response.text}")
            model.failed(response.text)
            model.save()
    @classmethod
    def submit(cls, background_job):
        """
        Submit the background job
        """
        background_job.save(blocking=True)
        preserve.schedule(args=(background_job.id,), delay=10)


@main_queue.task(**configure("preserve"))
@write_required(script=True)
def preserve(job_id):
    """~~-> PreservationBackgroundTask:Queue~~"""
    job = BackgroundJob.pull(job_id)
    task = PreservationBackgroundTask(job)
    BackgroundApi.execute(task)
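
# Illustrative end-to-end flow (an assumption; the actual call sites live
# elsewhere in DOAJ): a request handler would typically do something like
#
#   job = PreservationBackgroundTask.prepare(username, upload_file=file)
#   PreservationBackgroundTask.submit(job)
#
# submit() saves the job and schedules the huey task `preserve`, which pulls
# the job back out of storage and runs it via BackgroundApi.execute().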

class CSVReader:
    """~~CSVReader:Feature~~"""

    # Column names for the csv file.
    # Extra identifier columns are defined to tolerate user mistakes such as
    # empty identifiers. At most 2 identifiers (Full Text URL, DOI) are
    # expected, in any order.
    FIELD_DIR = "dir_name"
    FIELDS = (FIELD_DIR, "id_1", "id_2", "id_3", "id_4")

    def __init__(self, csv_file):
        self.__csv_file = csv_file

    def articles_info(self):
        """
        Reads the csv file and returns a dictionary with the first column
        (directory name) as keys and the remaining columns as array elements.

        Ex: {'article_1': ['http://link.springer.com/article/10.1186/s40478-018-0619-9',
        '10.1136/bmjophth-2021-000774'], 'article_2': ['10.1136/bmjophth-2021-000775']}

        :return: Dictionary with articles info
        """
        data = {}

        with open(self.__csv_file, mode='r', encoding='utf-8-sig') as file:
            reader = csv.DictReader(file, CSVReader.FIELDS)
            for row in reader:
                dir_name = row[CSVReader.FIELD_DIR]
                # Remove the first column so it will not be part of the iteration below
                row.pop(CSVReader.FIELD_DIR)
                data[dir_name] = []
                for key in row.keys():
                    if row[key]:
                        data[dir_name].append(row[key])
        return data
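
# Illustrative identifiers.csv content (a hedged example matching the
# docstring above; there is no header row, and columns map onto CSVReader.FIELDS):
#
#   article_1,http://link.springer.com/article/10.1186/s40478-018-0619-9,10.1136/bmjophth-2021-000774
#   article_2,10.1136/bmjophth-2021-000775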

class Preservation:
    """~~Preservation:Feature~~"""

    # Name used to save the uploaded zip file in the temp directory
    ARTICLES_ZIP_NAME = "articles.zip"
    # Identifier file name
    IDENTIFIER_FILE = "identifier.txt"
    # CSV file for identifiers
    IDENTIFIERS_CSV = "identifiers.csv"
    # Temp directory
    UPLOAD_DIR = app.config.get("UPLOAD_DIR", ".")

    def __init__(self, local_dir, owner):
        self.__dir_name = os.path.basename(local_dir)
        self.__local_dir = os.path.join(local_dir, "tmp")
        self.__preservation_dir = os.path.join(local_dir, self.__dir_name)
        self.__csv_articles_dict = None
        self.__owner = owner
        self.upload_filename = None

    @property
    def dir_name(self):
        return self.__dir_name

    @property
    def preservation_dir(self):
        return self.__preservation_dir
    def create_local_directories(self):
        """
        Create local directories to download the files and
        to create the preservation package
        """
        try:
            os.makedirs(self.__local_dir, exist_ok=True)
            os.makedirs(self.__preservation_dir, exist_ok=True)
        except OSError:
            raise PreservationStorageException("Could not create temp directory")

    @classmethod
    def delete_local_directory(cls, local_dir):
        """
        Deletes the directory
        """
        if os.path.exists(local_dir):
            try:
                shutil.rmtree(local_dir)
            except Exception:
                raise PreservationStorageException("Could not delete temp directory")

    def save_file(self, file):
        """
        Save the file to the local directory
        :param file: File object
        """
        self.create_local_directories()
        file_path = os.path.join(self.__local_dir, Preservation.ARTICLES_ZIP_NAME)
        try:
            file.save(file_path)
        except Exception:
            raise PreservationStorageException("Could not save file in temp directory")

    def extract_zip_file(self):
        """
        Extracts the zip file in the temp directory
        """
        file_path = os.path.join(self.__local_dir, Preservation.ARTICLES_ZIP_NAME)

        if os.path.exists(file_path):
            with ZipFile(file_path, 'r') as zFile:
                zFile.extractall(self.__local_dir)
        else:
            raise PreservationException(f"Could not find zip file at temp directory {file_path}")
    def create_package_structure(self) -> ArticlesList:
        """
        Create the preservation package.

        Iterates through the sub-directories,
        retrieves the article info for each article
        and creates the preservation directories.
        """
        articles_list = ArticlesList()

        for dir, subdirs, files in os.walk(self.__local_dir):

            if dir == self.__local_dir:
                continue

            app.logger.debug("Directory : " + dir)
            app.logger.debug("Sub Directories : " + str(subdirs))
            app.logger.debug("Files : " + str(files))

            # Fetch identifiers at the root directory
            if os.path.dirname(dir) == self.__local_dir:
                if Preservation.IDENTIFIERS_CSV in files:
                    # Get articles info from the csv file
                    # ~~-> CSVReader:Feature~~
                    csv_reader = CSVReader(os.path.join(dir, Preservation.IDENTIFIERS_CSV))
                    self.__csv_articles_dict = csv_reader.articles_info()
            # process only the directories that have articles
            else:
                self.__process_article(dir, files, articles_list)

        return articles_list
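
    # Illustrative layout of an extracted upload (an assumption based on the
    # walk logic above; directory names are only examples):
    #
    #   tmp/
    #       upload_root/
    #           identifiers.csv      (optional, read by CSVReader)
    #           article_1/
    #               identifier.txt   (optional if identifiers.csv lists article_1)
    #               fulltext.pdf
    #           article_2/
    #               ...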
    def __process_article(self, dir_path, files, articles_list):

        identifiers = None
        dir_name = os.path.basename(dir_path)
        package = ArticlePackage(dir_path, files)

        if not os.path.dirname(dir_path) == self.__local_dir:
            if not self.__has_article_files(files):
                articles_list.add_no_files_articles(package)
                return

        # check if an identifier file exists
        if Preservation.IDENTIFIER_FILE in files:
            with open(os.path.join(dir_path, Preservation.IDENTIFIER_FILE)) as file:
                identifiers = file.read().splitlines()
        elif self.__csv_articles_dict:
            if dir_name in self.__csv_articles_dict:
                identifiers = self.__csv_articles_dict[dir_name]

        if identifiers:
            article = self.get_article(identifiers)

            if article:
                article_data = article.data

                if not self.owner_of_article(article):
                    articles_list.add_unowned_articles(package)
                else:
                    issn, article_id, metadata_json = self.get_article_info(article_data)
                    try:
                        package = ArticlePackage(dir_path, files)
                        package.issn = issn
                        package.article_id = article_id
                        package.metadata = metadata_json
                        package.package_dir = self.__preservation_dir

                        package.create_article_bagit_structure()

                        articles_list.add_successful_article(package)
                    except Exception:
                        articles_list.add_unbagged_articles(package)
                        app.logger.exception(f"Error while creating package for article {article_id}")
            else:
                # skip the article if not found
                app.logger.error(f"Could not retrieve article for identifier(s) {identifiers}")
                articles_list.add_not_found_articles(package)
        else:
            # did not find any identifier for the article
            articles_list.add_no_identifier_articles(package)
    def __has_article_files(self, files):
        """
        Checks whether any article files are available
        :param files: file names in the article directory
        :return: True if article files are available, otherwise False
        """
        no_of_files = len(files)
        if Preservation.IDENTIFIER_FILE in files:
            return no_of_files > 1
        return no_of_files > 0
    def owner_of_article(self, article):
        """
        Checks if the article is owned by the user
        :param article:
        :return:
        """
        articleService = DOAJ.articleService()
        account = Account.pull(self.__owner)
        is_owner = articleService.has_permissions(account, article, True)
        return is_owner
    def get_article(self, identifiers):
        """
        Checks whether each identifier is a DOI or a full text URL
        and pulls the article related to the identifier
        :param identifiers:
        :return: article, or None if no identifier matches
        """
        article = None
        for identifier in identifiers:
            if DOI_COMPILED.match(identifier):
                article = Article.pull_by_key("bibjson.identifier.id", identifier)
            elif HTTP_URL_COMPILED.match(identifier):
                article = Article.pull_by_key("bibjson.link.url", identifier)
            if article:
                return article
        return None
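
    # Example identifiers this method handles (taken from the CSVReader
    # docstring above): a DOI such as "10.1136/bmjophth-2021-000774" matches
    # DOI_COMPILED, and a full text URL such as
    # "http://link.springer.com/article/10.1186/s40478-018-0619-9" matches
    # HTTP_URL_COMPILED.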
    def get_article_info(self, article_json):
        """
        Returns the article info
        :param article_json:
        :return: issn, article id, metadata json
        """
        metadata_json = self.get_metadata_json(article_json)
        issn = article_json["bibjson"]["journal"]["issns"][0]
        article_id = article_json["id"]

        return issn, article_id, metadata_json
    def get_metadata_json(self, article_json):
        """
        Returns the metadata of the article which is required for preservation
        :return: metadata
        """
        # Remove unnecessary data; default to None so a missing key does not raise
        metadata = deepcopy(article_json)
        metadata.pop("index", None)
        metadata.pop("admin", None)
        metadata.pop("es_type", None)

        return metadata

class PreservationPackage:
    """~~PreservationPackage:Feature~~
    Creates the preservation package and uploads it to the Internet Archive server
    """

    def __init__(self, directory, owner):
        self.package_dir = directory
        self.tar_file = self.package_dir + ".tar.gz"
        self.tar_file_name = os.path.basename(self.tar_file)
        self.__owner = owner

    def create_package(self):
        """
        Creates the tar file for the package directory
        :return: tar file name
        """
        try:
            with tarfile.open(self.tar_file, "w:gz") as tar:
                tar.add(self.package_dir, arcname=os.path.basename(self.package_dir))
        except Exception:
            app.logger.exception("Error creating tar file")
            raise PreservationException("Error while creating the tar file")

        return self.tar_file_name
    def upload_package(self, sha256sum):
        url = app.config.get("PRESERVATION_URL")
        username = app.config.get("PRESERVATION_USERNAME")
        password = app.config.get("PRESERVATION_PASSWD")
        collection_dict = app.config.get("PRESERVATION_COLLECTION")
        params = collection_dict[self.__owner]
        collection = params[0]
        collection_id = params[1]

        file_name = os.path.basename(self.tar_file)

        # payload for the upload request
        payload = {
            'directories': file_name,
            'org': 'DOAJ',
            'client': 'DOAJ_CLI',
            'username': 'doaj_uploader',
            'size': '',
            'organization': '1',
            'orgname': 'DOAJ',
            'collection': collection_id,
            'collname': collection,
            'sha256sum': sha256sum
        }
        app.logger.info(payload)

        headers = {}
        # open the file to upload
        try:
            with open(self.tar_file, "rb") as f:
                files = {'file_field': (file_name, f)}
                response = requests.post(url, headers=headers, auth=(username, password), files=files, data=payload)
        except Exception:
            app.logger.exception("Error opening the tar file")
            raise PreservationException("Error uploading tar file to IA server")

        return response
    def sha256(self):
        """
        Creates the sha256 hash for the tar file
        """
        sha256_hash = hashlib.sha256()

        with open(self.tar_file, "rb") as f:
            # Read and update the hash value in blocks of 64K
            for byte_block in iter(lambda: f.read(65536), b""):
                sha256_hash.update(byte_block)

        return sha256_hash.hexdigest()
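
    # Sanity check (illustrative): the digest returned here should match the
    # standard command-line tool run against the same file, e.g.
    #
    #   $ sha256sum <package>.tar.gz
    #
    # The server compares this value against the `sha256sum` field sent in
    # upload_package().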

class FailedReasons:
    no_identifier = "no_identifier"
    unknown = "unknown"
    checksum_doesnot_match = "checksum_doesnot_match"
    no_article_found = "no_article_found"
    no_valid_article_available = "no_valid_article_available"
    tar_filename_doesnot_match = "response_tar_filename_doesnot_match"
    error_response = "error_response"
    unknown_error_response = "unknown_error_response"
    collection_not_available = "collection_not_available"