Coverage for portality/models/harvester.py: 98%
136 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
1from portality.lib import dataobj
2from portality.dao import DomainObject
3from portality.lib import dates, es_data_mapping
4from portality.core import app
class HarvesterPlugin(object):
    """
    Interface for harvester plugins.

    Both methods raise NotImplementedError here, so a concrete plugin must
    override each of them.
    """

    def get_name(self):
        """
        Return the name of this plugin.

        :return: nothing in this base class; it always raises
        :raises NotImplementedError: always, until overridden
        """
        raise NotImplementedError()

    def iterate(self, issn, since, to=None):
        """
        Iterate over the records associated with the issn from "since" until "to".

        An implementation should be a generator (i.e. it should yield), and each
        item it yields should be a tuple of
        (<doaj article>, <harvest date for this record>).

        :param issn: the issn whose records are to be harvested
        :param since: start of the period to harvest
        :param to: end of the period to harvest; optional, defaults to None
        :return: nothing in this base class; it always raises
        :raises NotImplementedError: always, until overridden
        """
        raise NotImplementedError()
class HarvestState(dataobj.DataObj, DomainObject):
    """
    Record of the harvesting state for a single ISSN under a single account.

    The data is validated against HARVEST_STATE_STRUCT: it holds the account,
    the issn, a status ("suspended" or "active") and a list of
    {"plugin", "date"} entries recording the last harvest per plugin.
    """
    __type__ = 'harvester_state'

    def __init__(self, **raw):
        """
        Build the object from raw data.

        :param raw: the record's fields as keyword arguments; if a "_source"
            key is present, its value is used as the raw data instead
        """
        if "_source" in raw:
            raw = raw["_source"]
        super(HarvestState, self).__init__(raw, HARVEST_STATE_STRUCT)

    def mappings(self):
        """
        Build the index mapping for this type from its struct.

        :return: the result of es_data_mapping.create_mapping for this
            object's struct and MAPPING_OPTS
        """
        return es_data_mapping.create_mapping(self.get_struct(), MAPPING_OPTS)

    @classmethod
    def find_by_issn(cls, account, issn):
        """
        Look up the state record for an issn under an account.

        :param account: account identifier to match exactly
        :param issn: issn to match exactly
        :return: the first matching object, or None if there is no match
        """
        q = ISSNQuery(account, issn)
        obs = cls.q2obj(q=q.query())
        # truthiness covers the empty result without a separate len() check
        return obs[0] if obs else None

    @classmethod
    def find_by_account(cls, account):
        """
        Retrieve the state records belonging to an account.

        :param account: account identifier to match exactly
        :return: whatever cls.all returns for the account query
        """
        q = AccountQuery(account)
        # FIXME: in time we need to put scroll on the base DAO
        return cls.all(q=q.query())

    def _coerce_and_kwargs(self, path, dir):
        """
        Work out the coerce function and keyword arguments for a field access.

        :param path: path of the field in the struct
        :param dir: direction of the access, "get" or "set"
        :return: tuple of (coerce function, kwargs for the getter/setter)
        """
        # the middle element of the lookup result is not needed here
        kind, _, instructions = dataobj.construct_lookup(path, self._struct)
        c = self._coerce_map.get(instructions.get("coerce", "unicode"))
        kwargs = dataobj.construct_kwargs(kind, dir, instructions)
        return c, kwargs

    @property
    def account(self):
        """The account this state record belongs to."""
        c, kwargs = self._coerce_and_kwargs("account", "get")
        return self._get_single("account", coerce=c, **kwargs)

    @account.setter
    def account(self, val):
        c, kwargs = self._coerce_and_kwargs("account", "set")
        self._set_single("account", val, coerce=c, **kwargs)

    @property
    def issn(self):
        """The issn this state record tracks."""
        c, kwargs = self._coerce_and_kwargs("issn", "get")
        return self._get_single("issn", coerce=c, **kwargs)

    @issn.setter
    def issn(self, val):
        c, kwargs = self._coerce_and_kwargs("issn", "set")
        self._set_single("issn", val, coerce=c, **kwargs)

    def suspend(self):
        """Set the status to "suspended"."""
        self.status = "suspended"

    @property
    def suspended(self):
        """True if the status is "suspended"."""
        return self.status == "suspended"

    @property
    def status(self):
        """The status of this record; the struct allows "suspended" or "active"."""
        c, kwargs = self._coerce_and_kwargs("status", "get")
        return self._get_single("status", coerce=c, **kwargs)

    @status.setter
    def status(self, val):
        c, kwargs = self._coerce_and_kwargs("status", "set")
        self._set_single("status", val, coerce=c, **kwargs)

    def reactivate(self):
        """Set the status to "active"."""
        self.status = "active"

    def get_last_harvest(self, harvester_name):
        """
        Get the last harvest date recorded for a plugin.

        :param harvester_name: name of the plugin to look up
        :return: the "date" of the first last_harvest entry whose "plugin"
            equals harvester_name, or None if there is no such entry
        """
        for lh in self._get_list("last_harvest"):
            if lh.get("plugin") == harvester_name:
                return lh.get("date")
        return None

    def set_harvested(self, harvester_name, last_harvest_date=None):
        """
        Record the last harvest date for a plugin, replacing any earlier entry.

        :param harvester_name: name of the plugin that harvested
        :param last_harvest_date: date to record; defaults to dates.now()
        """
        # first ensure we have a last harvest date, and that it's in the right format
        if last_harvest_date is None:
            last_harvest_date = dates.now()
        last_harvest_date = dates.reformat(last_harvest_date)

        # remove the existing entry for this plugin before adding the new one,
        # so there is only ever one entry per plugin
        self._delete_from_list("last_harvest", matchsub={"plugin" : harvester_name})
        self._add_to_list("last_harvest", {"plugin" : harvester_name, "date" : last_harvest_date})

    def prep(self):
        """Default the status to "active" if none has been set."""
        if self.status is None:
            self.status = "active"

    def save(self, *args, **kwargs):
        """
        Prepare the record and save it.

        :param args: passed through to the parent save
        :param kwargs: passed through to the parent save
        :return: whatever the parent save returns
        """
        self.prep()
        # hand the parent's result back to the caller rather than dropping it
        return super(HarvestState, self).save(*args, **kwargs)
class HarvesterProgressReport(object):
    """
    Collector for progress information about a harvester run.

    All state lives in class attributes and is only touched through
    classmethods, so the class itself is used as the collector and is not
    meant to be instantiated.
    """
    # issn -> state object (must offer get_last_harvest(plugin_name))
    current_states = {}
    # plugin name -> {issn -> last harvest date before this run touched it}
    last_harvest_dates_at_start_of_harvester = {}
    # plugin name -> number of articles processed
    articles_processed = {}
    # plugin name -> number of articles saved successfully
    articles_saved_successfully = {}
    # NOTE: evaluated once, when this class body is executed
    harvester_started = dates.now()
    # messages recorded via record_error, in order
    error_messages = []

    @classmethod
    def set_start_by_issn(cls, plugin, issn, date):
        """
        Record the date from which a plugin starts harvesting an issn.

        :param plugin: name of the plugin
        :param issn: issn being harvested
        :param date: the starting date to record
        """
        cls.last_harvest_dates_at_start_of_harvester.setdefault(plugin, {})[issn] = date

    @classmethod
    def set_state_by_issn(cls, issn, state):
        """
        Record the current state object for an issn.

        :param issn: issn the state belongs to
        :param state: object offering get_last_harvest(plugin_name)
        """
        cls.current_states[issn] = state

    @classmethod
    def increment_articles_processed(cls, plugin):
        """
        Add one to the count of articles processed by a plugin.

        :param plugin: name of the plugin
        """
        cls.articles_processed[plugin] = cls.articles_processed.get(plugin, 0) + 1

    @classmethod
    def increment_articles_saved_successfully(cls, plugin):
        """
        Add one to the count of articles a plugin saved successfully.

        :param plugin: name of the plugin
        """
        cls.articles_saved_successfully[plugin] = cls.articles_saved_successfully.get(plugin, 0) + 1

    @classmethod
    def record_error(cls, msg):
        """
        Record an error message for inclusion in the report.

        :param msg: the message to record
        """
        cls.error_messages.append(msg)

    @classmethod
    def write_report(cls):
        """
        Render everything recorded so far as a plain text report.

        :return: the report as a single newline-separated string
        """
        report = ["Harvester ran from {d1} to {d2}.".format(d1=cls.harvester_started, d2=dates.now())]
        for p_name, start_dates in cls.last_harvest_dates_at_start_of_harvester.items():
            n_total = cls.articles_processed.get(p_name, 0)
            n_succ = cls.articles_saved_successfully.get(p_name, 0)
            report.append("Plugin {p} harvested {n_total} articles. "
                          "{n_succ} saved successfully to DOAJ; {n_fail} failed.".format(
                p=p_name,
                n_total=n_total,
                n_succ=n_succ,
                n_fail=n_total - n_succ
            ))

            for issn, started in start_dates.items():
                # an issn can have a start date without a state ever having
                # been recorded for it; report None rather than raising a
                # KeyError and losing the whole report
                state = cls.current_states.get(issn)
                finished = state.get_last_harvest(p_name) if state is not None else None
                report.append("ISSN {i} processed period {d1} until {d2}.".format(
                    i=issn,
                    d1=started,
                    d2=finished
                ))
        report.append("Error messages/import failures:")
        # str() so that a non-string message cannot break the join below
        report.extend(str(msg) for msg in cls.error_messages)
        return "\n".join(report)
# Options passed to es_data_mapping.create_mapping by HarvestState.mappings()
MAPPING_OPTS = dict(
    dynamic=None,
    coerces=app.config["DATAOBJ_TO_MAPPING_DEFAULTS"],
)
class ISSNQuery(object):
    """Query matching records by exact issn and exact account."""

    def __init__(self, account, issn):
        """
        :param account: account value to match
        :param issn: issn value to match
        """
        self.account = account
        self.issn = issn

    def query(self):
        """
        Build the query body.

        :return: dict with a bool/must query of two term clauses, on
            "issn.exact" and "account.exact", with track_total_hits enabled
        """
        issn_clause = {"term": {"issn.exact": self.issn}}
        account_clause = {"term": {"account.exact": self.account}}
        return {
            "track_total_hits": True,
            "query": {"bool": {"must": [issn_clause, account_clause]}},
        }
class AccountQuery(object):
    """Query matching records by exact account."""

    def __init__(self, account):
        """
        :param account: account value to match
        """
        self.account = account

    def query(self):
        """
        Build the query body.

        :return: dict with a bool/must query of a single term clause on
            "account.exact", with track_total_hits enabled
        """
        account_clause = {"term": {"account.exact": self.account}}
        return {
            "track_total_hits": True,
            "query": {"bool": {"must": [account_clause]}},
        }
# Struct describing the data held by a HarvestState: its single-valued fields,
# its one list ("last_harvest") and the shape of each entry in that list.
HARVEST_STATE_STRUCT = dict(
    fields=dict(
        id=dict(coerce="unicode"),
        last_updated=dict(coerce="utcdatetime"),
        created_date=dict(coerce="utcdatetime"),
        issn=dict(coerce="unicode"),
        status=dict(coerce="unicode", allowed_values=["suspended", "active"]),
        account=dict(coerce="unicode"),
        es_type=dict(coerce="unicode"),
    ),
    lists=dict(
        last_harvest=dict(contains="object"),
    ),
    structs=dict(
        last_harvest=dict(
            fields=dict(
                plugin=dict(coerce="unicode"),
                date=dict(coerce="utcdatetime"),
            ),
        ),
    ),
)