Coverage for portality / tasks / harvester_helpers / workflow.py: 89%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1from portality.api.current import ArticlesCrudApi 

2from portality.core import app 

3from portality.lib import plugin, dates 

4from portality.models import Journal, Account 

5from portality.models.harvester import HarvesterProgressReport as Report 

6from portality.models.harvester import HarvestState 

7from portality.lib.dataobj import DataObjException 

8 

9 

# Attributes of the synthetic account the harvester acts as. It carries the
# 'admin' role and takes its email from the application's ADMIN_EMAIL setting.
SYSTEM_ACCOUNT = dict(
    email=app.config['ADMIN_EMAIL'],
    name="harvest_background_job",
    role=['admin'],
    id="harvest_background_job",
)

# Built once at import time from the attributes above; process_article passes
# this account to ArticlesCrudApi.create.
sys_acc = Account(**SYSTEM_ACCOUNT)

18 

19 

class DefaultLogger(object):
    """Minimal in-memory logger used when no custom logger is supplied.

    Messages are kept in ``self._log`` as dicts with ``timestamp`` and
    ``message`` keys, in the order they were logged.
    """

    def __init__(self):
        # Accumulates one entry per log() call.
        self._log = []

    def log(self, msg):
        """Record ``msg`` together with the current timestamp string.

        :param msg: the message to store
        """
        entry = {
            "timestamp": dates.now_str_with_microseconds(),
            "message": msg,
        }
        self._log.append(entry)

29 

30 

class HarvesterWorkflow(object):
    """Run the harvest for the journals owned by an account.

    For each ISSN the workflow makes sure a HarvestState record exists and is
    active, runs every configured harvester plugin over it, and creates the
    resulting articles through ArticlesCrudApi using the system account.
    Progress is written to the supplied logger and to the harvester
    progress Report.
    """

    def __init__(self, custom_logger=None):
        """
        :param custom_logger: object exposing ``log(msg)``; when None a
            DefaultLogger is created.
        """
        self.logger = DefaultLogger() if custom_logger is None else custom_logger

    def _write_to_logger(self, msg):
        """Send ``msg`` to the configured logger."""
        self.logger.log(msg)

    def process_account(self, account_id):
        """Harvest every ISSN returned by get_journals_issns for the account.

        :param account_id: id of the account whose journals are harvested
        """
        self._write_to_logger("Harvesting for Account:{x}".format(x=account_id))
        issns = self.get_journals_issns(account_id)
        self._write_to_logger("Account:{x} has {y} issns to harvest for: {z}".format(x=account_id, y=len(issns), z=",".join(issns)))

        # now update the issn states
        self.process_issn_states(account_id, issns)

        for issn in issns:
            self.process_issn(account_id, issn)

    def get_journals_issns(self, account_id):
        """Return the ISSNs given by Journal.issns_by_owner with in_doaj=True.

        :param account_id: id of the owning account
        """
        return Journal.issns_by_owner(account_id, in_doaj=True)

    def process_issn_states(self, account_id, issns):
        """Bring the account's HarvestState records in line with ``issns``.

        Each provided ISSN gets a state record (created if missing,
        reactivated if suspended); state records of the account for ISSNs
        not in ``issns`` are suspended.

        :param account_id: id of the account the states belong to
        :param issns: ISSNs that should currently be harvested
        """
        # first check that there are state records for all the provided issns,
        # and that if they were deactivated they are now reactivated
        for issn in issns:
            state = HarvestState.find_by_issn(account_id, issn)
            if state is not None:
                if state.suspended:
                    state.reactivate()
                    state.save(blocking=True)
            else:
                state = HarvestState()
                state.issn = issn
                state.account = account_id
                state.save(blocking=True)

        # now check if there are are any other issns for this account that we haven't
        # been provided - in that case they need to be deactivated
        wanted = set(issns)  # built once so each membership test below is O(1)
        hss = list(HarvestState.find_by_account(account_id))  # read straight away, as the iterator can timeout
        for hs in hss:
            if hs.issn not in wanted and not hs.suspended:
                hs.suspend()
                hs.save(blocking=True)

    def process_issn(self, account_id, issn):
        """Run every configured harvester plugin for one ISSN.

        The ISSN is skipped when it has no state record or when its state is
        suspended. Otherwise each plugin listed in the HARVESTERS config is
        loaded and iterated from the last harvest value recorded for it (or
        INITIAL_HARVEST_DATE when none is recorded). The state record is
        saved at the end whether or not an exception occurred.

        :param account_id: id of the account that owns the ISSN
        :param issn: the ISSN to harvest
        :raises Exception: anything raised while harvesting is logged and
            re-raised after the state has been saved
        """
        self._write_to_logger("Processing ISSN:{x} for Account:{y}".format(y=account_id, x=issn))

        state = HarvestState.find_by_issn(account_id, issn)
        # find_by_issn can return None (process_issn_states checks for it), so
        # guard here too rather than failing with an AttributeError when this
        # method is called for an ISSN that has no state record yet
        if state is None:
            self._write_to_logger("No state record for ISSN:{x} for Account:{y} ... skipping".format(y=account_id, x=issn))
            return
        # if this issn is suspended, don't process it
        if state.suspended:
            return
        Report.set_state_by_issn(issn, state)

        try:
            # get all the plugins that we need to run
            harvesters = app.config.get("HARVESTERS", [])
            for h in harvesters:
                p = plugin.load_class(h)(self.logger)
                p_name = p.get_name()
                lh = state.get_last_harvest(p_name)
                if lh is None:
                    lh = app.config.get("INITIAL_HARVEST_DATE")
                Report.set_start_by_issn(p_name, issn, lh)

                # the plugin yields (article, lhd) pairs; lhd is what gets
                # recorded on the state once the article has been saved
                for article, lhd in p.iterate(issn, lh):
                    saved = self.process_article(account_id, article)
                    Report.increment_articles_processed(p_name)

                    # if the above worked, then we can update the harvest state
                    if saved:
                        state.set_harvested(p_name, lhd)
                        Report.increment_articles_saved_successfully(p_name)
        except Exception:
            self._write_to_logger("Exception Processing ISSN:{x} for Account:{y} ".format(y=account_id, x=issn))
            raise
        finally:
            # once we've finished working with this issn, we should update the state
            # this is especially true if there is an exception, as this will allow us
            # to record where we got to, without having to do a save after each article
            # create
            state.save(blocking=True)
            self._write_to_logger("Saved state record for ISSN:{x} for Account:{y}".format(y=account_id, x=issn))

    def process_article(self, account_id, article):
        """Validate one harvested article and create it via the articles API.

        :param account_id: id of the account being harvested (used in log
            messages only; the create call runs as the system account)
        :param article: harvested article exposing ``is_api_valid()``,
            ``get_identifier()`` and ``data``
        :return: True when the article was created, False when it was
            skipped because validation or creation failed
        """
        try:
            article.is_api_valid()
        except DataObjException as e:
            self._write_to_logger("Article for Account:{y} was not API valid ... skipping".format(y=account_id))
            Report.record_error((article.get_identifier("doi") or "< DOI MISSING >") + " - " + str(e))
            return False

        try:
            # Use the admin account to have sufficient rights to update full text or doi.
            # Related github issue #3857
            article_id = ArticlesCrudApi.create(article.data, sys_acc).id
        except Exception as e:
            self._write_to_logger("Article caused DOAJException: {m} ... skipping".format(m=e))
            Report.record_error((article.get_identifier("doi") or "< DOI MISSING >"))
            return False
        self._write_to_logger("Created article in DOAJ for Account:{x} with ID: {y}".format(x=sys_acc.id, y=article_id))
        return True

132 return True