Coverage for portality/tasks/harvester_helpers/workflow.py: 89%
84 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
1from portality.api.current import ArticlesCrudApi
2from portality.core import app
3from portality.lib import plugin, dates
4from portality.models import Journal, Account
5from portality.models.harvester import HarvesterProgressReport as Report
6from portality.models.harvester import HarvestState
7from portality.lib.dataobj import DataObjException
class DefaultLogger(object):
    """Fallback logger that keeps every message in memory.

    Entries are stored on ``self._log`` as dicts with a ``"timestamp"``
    (from ``dates.now_with_microseconds()``) and a ``"message"`` key, in the
    order they were logged.
    """

    def __init__(self):
        # In-memory record of logged entries, oldest first.
        self._log = []

    def log(self, msg):
        """Record ``msg`` together with the current timestamp."""
        entry = dict(timestamp=dates.now_with_microseconds(), message=msg)
        self._log.append(entry)
class HarvesterWorkflow(object):
    """Harvest articles for the journals owned by an account.

    For each ISSN returned by ``get_journals_issns`` the workflow keeps a
    ``HarvestState`` record in sync, runs every harvester plugin listed in
    the ``HARVESTERS`` config setting, and creates the harvested articles via
    ``ArticlesCrudApi.create``. Progress and errors are recorded on the
    ``HarvesterProgressReport`` (imported here as ``Report``).
    """

    def __init__(self, custom_logger=None):
        """
        :param custom_logger: optional object exposing ``log(msg)``. When it is
            omitted a ``DefaultLogger`` is used.
        """
        self.logger = DefaultLogger() if custom_logger is None else custom_logger

    def _write_to_logger(self, msg):
        """Forward ``msg`` to the configured logger's ``log`` method."""
        self.logger.log(msg)

    def process_account(self, account_id):
        """Harvest every ISSN associated with ``account_id``.

        The harvest state records are synchronised first (see
        ``process_issn_states``), then each ISSN is processed in turn with
        ``process_issn``. Exceptions from ``process_issn`` propagate.
        """
        self._write_to_logger("Harvesting for Account:{x}".format(x=account_id))
        issns = self.get_journals_issns(account_id)
        self._write_to_logger("Account:{x} has {y} issns to harvest for: {z}".format(x=account_id, y=len(issns), z=",".join(issns)))

        # now update the issn states
        self.process_issn_states(account_id, issns)

        for issn in issns:
            self.process_issn(account_id, issn)

    def get_journals_issns(self, account_id):
        """Return the ISSNs from ``Journal.issns_by_owner(account_id, in_doaj=True)``."""
        return Journal.issns_by_owner(account_id, in_doaj=True)

    def process_issn_states(self, account_id, issns):
        """Make the account's ``HarvestState`` records match ``issns``.

        Each ISSN in ``issns`` gets a state record: a new one is created if
        missing, and a suspended one is reactivated. Any other state record
        belonging to the account that is not already suspended is suspended.
        Every record that changes is saved with ``blocking=True``.
        """
        # first check that there are state records for all the provided issns,
        # and that if they were deactivated they are now reactivated
        for issn in issns:
            state = HarvestState.find_by_issn(account_id, issn)
            if state is not None:
                if state.suspended:
                    state.reactivate()
                    state.save(blocking=True)
            else:
                state = HarvestState()
                state.issn = issn
                state.account = account_id
                state.save(blocking=True)

        # now check if there are any other issns for this account that we haven't
        # been provided - in that case they need to be deactivated.
        # Build the lookup set once rather than scanning the list per record.
        wanted = set(issns)
        hss = [hs for hs in HarvestState.find_by_account(account_id)]  # read straight away, as the iterator can timeout
        for hs in hss:
            if hs.issn not in wanted and not hs.suspended:
                hs.suspend()
                hs.save(blocking=True)

    def process_issn(self, account_id, issn):
        """Run every configured harvester plugin for a single ISSN.

        The ISSN is skipped if it has no ``HarvestState`` record or if its
        record is suspended. For each article a plugin yields, the article is
        passed to ``process_article``. When it is saved, the plugin's
        last-harvest date on the state is advanced. The state record is always
        saved at the end, even when an exception occurs. The exception is
        logged and re-raised.
        """
        self._write_to_logger("Processing ISSN:{x} for Account:{y}".format(y=account_id, x=issn))

        state = HarvestState.find_by_issn(account_id, issn)
        # Without a state record there is nothing to resume from or record
        # progress to. Skip rather than fail on attribute access of None.
        if state is None:
            self._write_to_logger("No harvest state record for ISSN:{x} for Account:{y} ... skipping".format(y=account_id, x=issn))
            return
        # if this issn is suspended, don't process it
        if state.suspended:
            return
        Report.set_state_by_issn(issn, state)

        try:
            # get all the plugins that we need to run
            harvesters = app.config.get("HARVESTERS", [])
            for h in harvesters:
                p = plugin.load_class(h)(self.logger)
                p_name = p.get_name()
                # resume from the last recorded harvest for this plugin, or
                # from the configured initial date on the first run
                lh = state.get_last_harvest(p_name)
                if lh is None:
                    lh = app.config.get("INITIAL_HARVEST_DATE")
                Report.set_start_by_issn(p_name, issn, lh)

                for article, lhd in p.iterate(issn, lh):
                    saved = self.process_article(account_id, article)
                    Report.increment_articles_processed(p_name)

                    # if the above worked, then we can update the harvest state
                    if saved:
                        state.set_harvested(p_name, lhd)
                        Report.increment_articles_saved_successfully(p_name)
        except Exception:
            self._write_to_logger("Exception Processing ISSN:{x} for Account:{y} ".format(y=account_id, x=issn))
            raise
        finally:
            # once we've finished working with this issn, we should update the state
            # this is especially true if there is an exception, as this will allow us
            # to record where we got to, without having to do a save after each article
            # create
            state.save(blocking=True)
            self._write_to_logger("Saved state record for ISSN:{x} for Account:{y}".format(y=account_id, x=issn))

    def process_article(self, account_id, article):
        """Validate ``article`` and create it in DOAJ on behalf of ``account_id``.

        :return: ``True`` if the article was created. ``False`` if it failed
            API validation (``DataObjException``) or if
            ``ArticlesCrudApi.create`` raised any exception. Failures are logged
            and recorded on the report, and are not re-raised.
        """
        try:
            article.is_api_valid()
        except DataObjException as e:
            self._write_to_logger("Article for Account:{y} was not API valid ... skipping".format(y=account_id))
            Report.record_error((article.get_identifier("doi") or "< DOI MISSING >") + " - " + str(e))
            return False

        acc = Account.pull(account_id)
        try:
            article_id = ArticlesCrudApi.create(article.data, acc).id
        except Exception as e:
            # deliberately broad: one bad article must not abort the whole harvest
            self._write_to_logger("Article caused DOAJException: {m} ... skipping".format(m=e))
            Report.record_error((article.get_identifier("doi") or "< DOI MISSING >"))
            return False
        self._write_to_logger("Created article in DOAJ for Account:{x} with ID: {y}".format(x=account_id, y=article_id))
        return True