Coverage for portality/tasks/harvester_helpers/workflow.py: 89%

84 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-22 15:59 +0100

1from portality.api.current import ArticlesCrudApi 

2from portality.core import app 

3from portality.lib import plugin, dates 

4from portality.models import Journal, Account 

5from portality.models.harvester import HarvesterProgressReport as Report 

6from portality.models.harvester import HarvestState 

7from portality.lib.dataobj import DataObjException 

8 

9 

class DefaultLogger(object):
    """In-memory logger: every message is kept together with the time it was logged."""

    def __init__(self):
        # Entries are dicts with "timestamp" and "message" keys, in call order.
        self._log = []

    def log(self, msg):
        """Record *msg* alongside ``dates.now_with_microseconds()``."""
        entry = dict(timestamp=dates.now_with_microseconds(), message=msg)
        self._log.append(entry)

19 

20 

class HarvesterWorkflow(object):
    """Harvest articles for a publisher account.

    For each ISSN returned by :meth:`get_journals_issns`, every harvester
    plugin listed in ``app.config["HARVESTERS"]`` is run and the articles it
    yields are created via ``ArticlesCrudApi``. Per-ISSN progress is kept on a
    ``HarvestState`` record and reported through ``HarvesterProgressReport``.
    """

    def __init__(self, custom_logger=None):
        """
        :param custom_logger: object with a ``log(msg)`` method; when None a
            new ``DefaultLogger`` is used.
        """
        self.logger = DefaultLogger() if custom_logger is None else custom_logger

    def _write_to_logger(self, msg):
        """Forward *msg* to the configured logger."""
        self.logger.log(msg)

    @staticmethod
    def _error_label(article, error):
        """Build a progress-report error entry: the article's DOI (or a placeholder), then the error text."""
        return (article.get_identifier("doi") or "< DOI MISSING >") + " - " + str(error)

    def process_account(self, account_id):
        """Harvest every ISSN belonging to *account_id*.

        Harvest-state records are first synchronised with the account's ISSNs,
        then each ISSN is processed in turn. An exception raised while
        processing an ISSN propagates, so later ISSNs are not processed.
        """
        self._write_to_logger("Harvesting for Account:{x}".format(x=account_id))
        issns = self.get_journals_issns(account_id)
        self._write_to_logger("Account:{x} has {y} issns to harvest for: {z}".format(x=account_id, y=len(issns), z=",".join(issns)))

        # now update the issn states
        self.process_issn_states(account_id, issns)

        for issn in issns:
            self.process_issn(account_id, issn)

    def get_journals_issns(self, account_id):
        """Return the ISSNs of journals owned by *account_id*, queried with ``in_doaj=True``."""
        return Journal.issns_by_owner(account_id, in_doaj=True)

    def process_issn_states(self, account_id, issns):
        """Make the account's HarvestState records match *issns*.

        Each ISSN in *issns* gets a state record: it is created if missing and
        reactivated if suspended. Every other state record for the account that
        is not already suspended is then suspended.
        """
        # first check that there are state records for all the provided issns,
        # and that if they were deactivated they are now reactivated
        for issn in issns:
            state = HarvestState.find_by_issn(account_id, issn)
            if state is None:
                state = HarvestState()
                state.issn = issn
                state.account = account_id
                state.save(blocking=True)
            elif state.suspended:
                state.reactivate()
                state.save(blocking=True)

        # now suspend any state records for this account whose issn we were not
        # given. A set gives O(1) membership tests instead of scanning the list.
        wanted = set(issns)
        # read the results straight away, as the iterator can time out
        states = list(HarvestState.find_by_account(account_id))
        for hs in states:
            if hs.issn not in wanted and not hs.suspended:
                hs.suspend()
                hs.save(blocking=True)

    def process_issn(self, account_id, issn):
        """Run every configured harvester plugin for one ISSN.

        Returns immediately if the ISSN's state is suspended. Otherwise, each
        plugin iterates from its last recorded harvest date (falling back to
        ``INITIAL_HARVEST_DATE``). The plugin's harvest date on the state is
        advanced only for articles that :meth:`process_article` saved.

        The state record is saved whether or not an exception occurs.
        Exceptions are logged and re-raised.
        """
        self._write_to_logger("Processing ISSN:{x} for Account:{y}".format(y=account_id, x=issn))

        # NOTE(review): assumes a state record exists (process_account creates
        # them via process_issn_states); a None here raises AttributeError.
        state = HarvestState.find_by_issn(account_id, issn)
        # if this issn is suspended, don't process it
        if state.suspended:
            return
        Report.set_state_by_issn(issn, state)

        try:
            # get all the plugins that we need to run
            harvesters = app.config.get("HARVESTERS", [])
            for h in harvesters:
                p = plugin.load_class(h)(self.logger)
                p_name = p.get_name()
                lh = state.get_last_harvest(p_name)
                if lh is None:
                    lh = app.config.get("INITIAL_HARVEST_DATE")
                Report.set_start_by_issn(p_name, issn, lh)

                for article, lhd in p.iterate(issn, lh):
                    saved = self.process_article(account_id, article)
                    Report.increment_articles_processed(p_name)

                    # if the above worked, then we can update the harvest state
                    if saved:
                        state.set_harvested(p_name, lhd)
                        Report.increment_articles_saved_successfully(p_name)
        except Exception:
            self._write_to_logger("Exception Processing ISSN:{x} for Account:{y} ".format(y=account_id, x=issn))
            raise
        finally:
            # Save the state once per ISSN rather than after every article. On an
            # exception this still records how far the harvest got.
            state.save(blocking=True)
            self._write_to_logger("Saved state record for ISSN:{x} for Account:{y}".format(y=account_id, x=issn))

    def process_article(self, account_id, article):
        """Validate *article* and create it in DOAJ on behalf of *account_id*.

        :return: True if the article was created. Returns False if API
            validation fails or creation raises; in both cases the failure is
            logged and an error naming the DOI and the reason is recorded on
            the progress report.
        """
        try:
            article.is_api_valid()
        except DataObjException as e:
            self._write_to_logger("Article for Account:{y} was not API valid ... skipping".format(y=account_id))
            Report.record_error(self._error_label(article, e))
            return False

        acc = Account.pull(account_id)
        try:
            article_id = ArticlesCrudApi.create(article.data, acc).id
        except Exception as e:
            self._write_to_logger("Article caused DOAJException: {m} ... skipping".format(m=e))
            # include the reason, consistent with the validation-failure path
            Report.record_error(self._error_label(article, e))
            return False
        self._write_to_logger("Created article in DOAJ for Account:{x} with ID: {y}".format(x=account_id, y=article_id))
        return True