Coverage for portality/models/harvester.py: 98%

136 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-19 18:38 +0100

1from portality.lib import dataobj 

2from portality.dao import DomainObject 

3from portality.lib import dates, es_data_mapping 

4from portality.core import app 

5 

6 

7class HarvesterPlugin(object): 

8 def get_name(self): 

9 raise NotImplementedError() 

10 

11 def iterate(self, issn, since, to=None): 

12 """ 

13 Iterate over the records associated with the issn from "since" until "to" 

14 

15 This should return a generator (i.e. it should yield), and it needs to yeild a tuple 

16 containing (<doaj article>, <harvest date for this record>) 

17 

18 :param issn: 

19 :param since: 

20 :param to: 

21 :return: 

22 """ 

23 raise NotImplementedError() 

24 

25 

26class HarvestState(dataobj.DataObj, DomainObject): 

27 __type__ = 'harvester_state' 

28 

29 def __init__(self, **raw): 

30 if "_source" in raw: 

31 raw = raw["_source"] 

32 super(HarvestState, self).__init__(raw, HARVEST_STATE_STRUCT) 

33 

34 def mappings(self): 

35 return es_data_mapping.create_mapping(self.get_struct(), MAPPING_OPTS) 

36 

37 @classmethod 

38 def find_by_issn(cls, account, issn): 

39 q = ISSNQuery(account, issn) 

40 obs = cls.q2obj(q=q.query()) 

41 if len(obs) > 0: 

42 return obs[0] 

43 return None 

44 

45 @classmethod 

46 def find_by_account(cls, account): 

47 q = AccountQuery(account) 

48 # FIXME: in time we need to put scroll on the base DAO 

49 return cls.all(q=q.query()) 

50 # return cls.scroll(q=q.query()) 

51 

52 def _coerce_and_kwargs(self, path, dir): 

53 type, struct, instructions = dataobj.construct_lookup(path, self._struct) 

54 c = self._coerce_map.get(instructions.get("coerce", "unicode")) 

55 kwargs = dataobj.construct_kwargs(type, dir, instructions) 

56 return c, kwargs 

57 

58 @property 

59 def account(self): 

60 c, kwargs = self._coerce_and_kwargs("account", "get") 

61 return self._get_single("account", coerce=c, **kwargs) 

62 

63 @account.setter 

64 def account(self, val): 

65 c, kwargs = self._coerce_and_kwargs("account", "set") 

66 self._set_single("account", val, coerce=c, **kwargs) 

67 

68 @property 

69 def issn(self): 

70 c, kwargs = self._coerce_and_kwargs("issn", "get") 

71 return self._get_single("issn", coerce=c, **kwargs) 

72 

73 @issn.setter 

74 def issn(self, val): 

75 c, kwargs = self._coerce_and_kwargs("issn", "set") 

76 self._set_single("issn", val, coerce=c, **kwargs) 

77 

78 def suspend(self): 

79 self.status = "suspended" 

80 

81 @property 

82 def suspended(self): 

83 return self.status == "suspended" 

84 

85 @property 

86 def status(self): 

87 c, kwargs = self._coerce_and_kwargs("status", "get") 

88 return self._get_single("status", coerce=c, **kwargs) 

89 

90 @status.setter 

91 def status(self, val): 

92 c, kwargs = self._coerce_and_kwargs("status", "set") 

93 self._set_single("status", val, coerce=c, **kwargs) 

94 

95 def reactivate(self): 

96 self.status = "active" 

97 

98 def get_last_harvest(self, harvester_name): 

99 lhs = self._get_list("last_harvest") 

100 for lh in lhs: 

101 if lh.get("plugin") == harvester_name: 

102 return lh.get("date") 

103 return None 

104 

105 def set_harvested(self, harvester_name, last_harvest_date=None): 

106 # first ensure we have a last harvest date, and that it's in the right format 

107 if last_harvest_date is None: 

108 last_harvest_date = dates.now() 

109 last_harvest_date = dates.reformat(last_harvest_date) 

110 

111 self._delete_from_list("last_harvest", matchsub={"plugin" : harvester_name}) 

112 self._add_to_list("last_harvest", {"plugin" : harvester_name, "date" : last_harvest_date}) 

113 

114 def prep(self): 

115 if self.status is None: 

116 self.status = "active" 

117 

118 def save(self, *args, **kwargs): 

119 self.prep() 

120 super(HarvestState, self).save(*args, **kwargs) 

121 

122 

123class HarvesterProgressReport(object): 

124 current_states = {} 

125 last_harvest_dates_at_start_of_harvester = {} 

126 articles_processed = {} 

127 articles_saved_successfully = {} 

128 harvester_started = dates.now() 

129 error_messages = [] 

130 

131 @classmethod 

132 def set_start_by_issn(cls, plugin, issn, date): 

133 try: 

134 cls.last_harvest_dates_at_start_of_harvester[plugin][issn] = date 

135 except KeyError: 

136 cls.last_harvest_dates_at_start_of_harvester[plugin] = {issn: date} 

137 

138 @classmethod 

139 def set_state_by_issn(cls, issn, state): 

140 cls.current_states[issn] = state 

141 

142 @classmethod 

143 def increment_articles_processed(cls, plugin): 

144 try: 

145 cls.articles_processed[plugin] += 1 

146 except KeyError: 

147 cls.articles_processed[plugin] = 1 

148 

149 @classmethod 

150 def increment_articles_saved_successfully(cls, plugin): 

151 try: 

152 cls.articles_saved_successfully[plugin] += 1 

153 except KeyError: 

154 cls.articles_saved_successfully[plugin] = 1 

155 

156 @classmethod 

157 def record_error(cls, msg): 

158 cls.error_messages.append(msg) 

159 

160 @classmethod 

161 def write_report(cls): 

162 report = ["Harvester ran from {d1} to {d2}.".format(d1=cls.harvester_started, d2=dates.now())] 

163 for p_name in cls.last_harvest_dates_at_start_of_harvester.keys(): 

164 report.append("Plugin {p} harvested {n_total} articles. " 

165 "{n_succ} saved successfully to DOAJ; {n_fail} failed.".format( 

166 p=p_name, 

167 n_total=cls.articles_processed.get(p_name, 0), 

168 n_succ= cls.articles_saved_successfully.get(p_name, 0), 

169 n_fail=cls.articles_processed.get(p_name, 0) - cls.articles_saved_successfully.get(p_name, 0) 

170 )) 

171 

172 for issn in cls.last_harvest_dates_at_start_of_harvester[p_name].keys(): 

173 report.append("ISSN {i} processed period {d1} until {d2}.".format( 

174 i=issn, 

175 d1=cls.last_harvest_dates_at_start_of_harvester[p_name][issn], 

176 d2=cls.current_states[issn].get_last_harvest(p_name) 

177 )) 

178 report.append("Error messages/import failures:") 

179 report += cls.error_messages 

180 return "\n".join(report) 

181 

182 

183MAPPING_OPTS = { 

184 "dynamic": None, 

185 "coerces": app.config["DATAOBJ_TO_MAPPING_DEFAULTS"] 

186} 

187 

188 

189class ISSNQuery(object): 

190 def __init__(self, account, issn): 

191 self.issn = issn 

192 self.account = account 

193 

194 def query(self): 

195 return { 

196 "track_total_hits" : True, 

197 "query" : { 

198 "bool" : { 

199 "must" : [ 

200 {"term" : {"issn.exact" : self.issn}}, 

201 {"term" : {"account.exact" : self.account}} 

202 ] 

203 } 

204 } 

205 } 

206 

207class AccountQuery(object): 

208 def __init__(self, account): 

209 self.account = account 

210 

211 def query(self): 

212 return { 

213 "track_total_hits" : True, 

214 "query" : { 

215 "bool" : { 

216 "must" : [ 

217 {"term" : {"account.exact" : self.account}} 

218 ] 

219 } 

220 } 

221 } 

222 

223 

224HARVEST_STATE_STRUCT = { 

225 "fields" : { 

226 "id" : {"coerce" : "unicode"}, 

227 "last_updated" : {"coerce" : "utcdatetime"}, 

228 "created_date" : {"coerce" : "utcdatetime"}, 

229 "issn" : {"coerce" : "unicode"}, 

230 "status" : {"coerce" : "unicode", "allowed_values" : [u"suspended", u"active"]}, 

231 "account" : {"coerce" : "unicode"}, 

232 "es_type": {"coerce": "unicode"} 

233 }, 

234 "lists" : { 

235 "last_harvest" : {"contains" : "object"} 

236 }, 

237 

238 "structs" : { 

239 "last_harvest" : { 

240 "fields" : { 

241 "plugin" : {"coerce" : "unicode"}, 

242 "date" : {"coerce" : "utcdatetime"} 

243 } 

244 } 

245 } 

246}