Coverage for portality / tasks / harvester_helpers / workflow.py: 89%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1from portality.api.current import ArticlesCrudApi
2from portality.core import app
3from portality.lib import plugin, dates
4from portality.models import Journal, Account
5from portality.models.harvester import HarvesterProgressReport as Report
6from portality.models.harvester import HarvestState
7from portality.lib.dataobj import DataObjException
# Field values for the system identity the harvester acts as. The email is read
# from the application's ADMIN_EMAIL setting; the role list contains 'admin'.
SYSTEM_ACCOUNT = {
    "email": app.config["ADMIN_EMAIL"],
    "name": "harvest_background_job",
    "role": ["admin"],
    "id": "harvest_background_job",
}

# Account object built once at import time from SYSTEM_ACCOUNT; it is the
# account passed to ArticlesCrudApi.create when harvested articles are saved.
sys_acc = Account(**SYSTEM_ACCOUNT)
class DefaultLogger(object):
    """In-memory logger used when the workflow is not given a custom one.

    Messages are accumulated on ``self._log`` as dicts with ``timestamp`` and
    ``message`` keys; nothing is written anywhere else.
    """

    def __init__(self):
        # Entries are kept in the order they were logged.
        self._log = []

    def log(self, msg):
        """Append ``msg`` to the in-memory log, stamped with the current time.

        :param msg: the message to record
        """
        entry = {
            "timestamp": dates.now_str_with_microseconds(),
            "message": msg,
        }
        self._log.append(entry)
class HarvesterWorkflow(object):
    """Run the harvest for an account: sync ISSN states, run plugins, save articles.

    The entry point is ``process_account``, which looks up the account's ISSNs,
    brings the stored ``HarvestState`` records in line with that list, and then
    harvests each ISSN in turn via ``process_issn``.
    """

    def __init__(self, custom_logger=None):
        """
        :param custom_logger: object exposing a ``log(msg)`` method. When omitted
            a ``DefaultLogger`` is created. The same logger is also handed to
            every harvester plugin that gets loaded.
        """
        self.logger = DefaultLogger() if custom_logger is None else custom_logger

    def _write_to_logger(self, msg):
        """Send ``msg`` to the configured logger."""
        self.logger.log(msg)

    def process_account(self, account_id):
        """Harvest every ISSN returned by ``get_journals_issns`` for the account.

        :param account_id: id of the account whose journals are to be harvested
        """
        self._write_to_logger("Harvesting for Account:{x}".format(x=account_id))
        issns = self.get_journals_issns(account_id)
        self._write_to_logger("Account:{x} has {y} issns to harvest for: {z}".format(x=account_id, y=len(issns), z=",".join(issns)))

        # now update the issn states
        self.process_issn_states(account_id, issns)

        for issn in issns:
            self.process_issn(account_id, issn)

    def get_journals_issns(self, account_id):
        """Return the ISSNs from ``Journal.issns_by_owner`` for journals in DOAJ.

        :param account_id: id of the owning account
        """
        return Journal.issns_by_owner(account_id, in_doaj=True)

    def process_issn_states(self, account_id, issns):
        """Bring the account's stored harvest states in line with ``issns``.

        Every ISSN in ``issns`` ends up with a state record (suspended ones are
        reactivated, missing ones are created); any other non-suspended state
        held for the account is suspended.

        :param account_id: id of the account the states belong to
        :param issns: the ISSNs that should currently be harvested
        """
        # first check that there are state records for all the provided issns,
        # and that if they were deactivated they are now reactivated
        for issn in issns:
            state = HarvestState.find_by_issn(account_id, issn)
            if state is not None:
                if state.suspended:
                    state.reactivate()
                    state.save(blocking=True)
            else:
                state = HarvestState()
                state.issn = issn
                state.account = account_id
                state.save(blocking=True)

        # now check if there are any other issns for this account that we haven't
        # been provided - in that case they need to be deactivated
        hss = list(HarvestState.find_by_account(account_id))  # read straight away, as the iterator can timeout
        wanted = set(issns)  # built once so each membership test below is O(1)
        for hs in hss:
            if hs.issn not in wanted and not hs.suspended:
                hs.suspend()
                hs.save(blocking=True)

    def process_issn(self, account_id, issn):
        """Run every configured harvester plugin for one ISSN and save its state.

        Does nothing beyond logging when no state record exists for the ISSN or
        when the state is suspended. Any exception raised while harvesting is
        logged and re-raised, after the state has been saved.

        :param account_id: id of the account that owns the ISSN
        :param issn: the ISSN to harvest
        """
        self._write_to_logger("Processing ISSN:{x} for Account:{y}".format(y=account_id, x=issn))

        state = HarvestState.find_by_issn(account_id, issn)
        # find_by_issn can return None (process_issn_states checks for it), so a
        # direct call for an ISSN with no state record must not crash here
        if state is None:
            self._write_to_logger("No harvest state found for ISSN:{x} for Account:{y} ... skipping".format(y=account_id, x=issn))
            return
        # if this issn is suspended, don't process it
        if state.suspended:
            return
        Report.set_state_by_issn(issn, state)

        try:
            # get all the plugins that we need to run
            harvesters = app.config.get("HARVESTERS", [])
            for h in harvesters:
                p = plugin.load_class(h)(self.logger)
                p_name = p.get_name()
                lh = state.get_last_harvest(p_name)
                if lh is None:
                    # never harvested with this plugin: fall back to the configured start date
                    lh = app.config.get("INITIAL_HARVEST_DATE")
                Report.set_start_by_issn(p_name, issn, lh)

                for article, lhd in p.iterate(issn, lh):
                    saved = self.process_article(account_id, article)
                    Report.increment_articles_processed(p_name)

                    # if the above worked, then we can update the harvest state
                    if saved:
                        state.set_harvested(p_name, lhd)
                        Report.increment_articles_saved_successfully(p_name)
        except Exception:
            self._write_to_logger("Exception Processing ISSN:{x} for Account:{y} ".format(y=account_id, x=issn))
            raise
        finally:
            # once we've finished working with this issn, we should update the state
            # this is especially true if there is an exception, as this will allow us
            # to record where we got to, without having to do a save after each article
            # create
            state.save(blocking=True)
            self._write_to_logger("Saved state record for ISSN:{x} for Account:{y}".format(y=account_id, x=issn))

    def process_article(self, account_id, article):
        """Validate one harvested article and create it through the articles API.

        Failures are logged and recorded on the report rather than raised.

        :param account_id: id of the account being harvested (used in log output)
        :param article: the harvested article; must provide ``is_api_valid()``,
            ``get_identifier()`` and ``data``
        :return: True if the article was created, False if it was skipped
        """
        try:
            article.is_api_valid()
        except DataObjException as e:
            self._write_to_logger("Article for Account:{y} was not API valid ... skipping".format(y=account_id))
            Report.record_error((article.get_identifier("doi") or "< DOI MISSING >") + " - " + str(e))
            return False

        try:
            # Use the admin account to have sufficient rights to update full text or doi.
            # Related github issue #3857
            article_id = ArticlesCrudApi.create(article.data, sys_acc).id
        except Exception as e:
            self._write_to_logger("Article caused DOAJException: {m} ... skipping".format(m=e))
            # NOTE(review): unlike the validation branch, only the DOI is recorded here,
            # the exception text goes to the logger alone - confirm this is intended
            Report.record_error((article.get_identifier("doi") or "< DOI MISSING >"))
            return False
        # NOTE(review): the "Account" in this message is the system account id, not
        # account_id - confirm this is intended
        self._write_to_logger("Created article in DOAJ for Account:{x} with ID: {y}".format(x=sys_acc.id, y=article_id))
        return True
132 return True