Coverage for portality/tasks/reporting.py: 92%

263 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-22 15:59 +0100

1from portality import models 

2from portality.lib import dates 

3from portality import datasets 

4from portality.core import app 

5 

6from portality.background import BackgroundApi, BackgroundTask 

7from portality.tasks.redis_huey import main_queue, schedule 

8from portality.app_email import email_archive 

9from portality.decorators import write_required 

10from portality.dao import ESMappingMissingError, ScrollInitialiseException 

11 

12import os, shutil, csv 

13 

14 

15def provenance_reports(fr, to, outdir): 

16 pipeline = [ 

17 ActionCounter("edit", "month"), 

18 ActionCounter("edit", "year"), 

19 StatusCounter("month"), 

20 StatusCounter("year") 

21 ] 

22 

23 q = ProvenanceList(fr, to) 

24 try: 

25 for prov in models.Provenance.iterate(q.query()): 

26 for filt in pipeline: 

27 filt.count(prov) 

28 except (ESMappingMissingError, ScrollInitialiseException): 

29 return None 

30 

31 outfiles = [] 

32 for p in pipeline: 

33 table = p.tabulate() 

34 outfile = os.path.join(outdir, p.filename(fr, to)) 

35 outfiles.append(outfile) 

36 with open(outfile, "w") as f: 

37 writer = csv.writer(f) 

38 for row in table: 

39 writer.writerow(row) 

40 

41 return outfiles 

42 

43 

44def content_reports(fr, to, outdir): 

45 report = {} 

46 

47 q = ContentByDate(fr, to) 

48 res = models.Suggestion.query(q=q.query()) 

49 year_buckets = res.get("aggregations", {}).get("years", {}).get("buckets", []) 

50 for years in year_buckets: 

51 ds = years.get("key_as_string") 

52 do = dates.parse(ds) 

53 year = do.year 

54 if year not in report: 

55 report[year] = {} 

56 country_buckets = years.get("countries", {}).get("buckets", []) 

57 for country in country_buckets: 

58 cc = country.get("key") 

59 cn = datasets.get_country_name(cc) 

60 if cn not in report[year]: 

61 report[year][cn] = {} 

62 count = country.get("doc_count") 

63 report[year][cn]["count"] = count 

64 

65 table = _tabulate_time_entity_group(report, "Country") 

66 

67 filename = "applications_by_year_by_country__" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv" 

68 outfiles = [] 

69 outfile = os.path.join(outdir, filename) 

70 outfiles.append(outfile) 

71 with open(outfile, "w", encoding="utf-8") as f: 

72 writer = csv.writer(f) 

73 for row in table: 

74 writer.writerow(row) 

75 

76 return outfiles 

77 

78 

79def _tabulate_time_entity_group(group, entityKey): 

80 date_keys_unsorted = group.keys() 

81 date_keys = sorted(date_keys_unsorted) 

82 table = [] 

83 padding = [] 

84 for db in date_keys: 

85 users_active_this_period = group[db].keys() 

86 for u in users_active_this_period: 

87 c = group[db][u]["count"] 

88 existing = False 

89 for row in table: 

90 if row[0] == u: 

91 row.append(c) 

92 existing = True 

93 if not existing: 

94 table.append([u] + padding + [c]) 

95 

96 # Add a 0 for each user who has been added to the table but doesn't 

97 # have any actions in the current time period we're looping over. E.g. 

98 # if we're counting edits by month, this would be "users who were active 

99 # in a previous month but haven't made any edits this month". 

100 users_in_table = set(map(lambda each_row: each_row[0], table)) 

101 previously_active_users = users_in_table - set(users_active_this_period) 

102 for row in table: 

103 if row[0] in previously_active_users: 

104 row.append(0) 

105 

106 # The following is only prefix padding. E.g. if "dom" started making edits in 

107 # Jan 2015 but "emanuil" only started in Mar 2015, then "emanuil" needs 

108 # 0s filled in for Jan + Feb 2015. 

109 padding.append(0) 

110 

111 for row in table: 

112 if len(row) < len(date_keys) + 1: 

113 row += [0] * (len(date_keys) - len(row) + 1) 

114 

115 table.sort(key=lambda user: user[0]) 

116 table = [[entityKey] + date_keys] + table 

117 return table 

118 

119 

120def _fft(timestamp): 

121 """File Friendly Timestamp - Windows doesn't appreciate : / etc in filenames; strip these out""" 

122 return dates.reformat(timestamp, app.config.get("DEFAULT_DATE_FORMAT"), "%Y-%m-%d") 

123 

124 

125class ReportCounter(object): 

126 def __init__(self, period): 

127 self.period = period 

128 

129 def _flatten_timestamp(self, ts): 

130 if self.period == "month": 

131 return ts.strftime("%Y-%m") 

132 elif self.period == "year": 

133 return ts.strftime("%Y") 

134 

135 def count(self, prov): 

136 raise NotImplementedError() 

137 

138 def tabulate(self): 

139 raise NotImplementedError() 

140 

141 def filename(self, fr, to): 

142 raise NotImplementedError() 

143 

144 

145class ActionCounter(ReportCounter): 

146 def __init__(self, action, period): 

147 self.action = action 

148 self.report = {} 

149 self._last_period = None 

150 super(ActionCounter, self).__init__(period) 

151 

152 def count(self, prov): 

153 if prov.action != self.action: 

154 return 

155 

156 p = self._flatten_timestamp(prov.created_timestamp) 

157 if p not in self.report: 

158 self.report[p] = {} 

159 

160 if prov.user not in self.report[p]: 

161 self.report[p][prov.user] = {"ids" : []} 

162 

163 if prov.resource_id not in self.report[p][prov.user]["ids"]: 

164 self.report[p][prov.user]["ids"].append(prov.resource_id) 

165 

166 if p != self._last_period: 

167 self._count_down(self._last_period) 

168 self._last_period = p 

169 

170 def tabulate(self): 

171 self._count_down(self._last_period) 

172 return _tabulate_time_entity_group(self.report, "User") 

173 

174 def filename(self, fr, to): 

175 return self.action + "_by_" + self.period + "__from_" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv" 

176 

177 def _count_down(self, p): 

178 if p is None: 

179 return 

180 for k in self.report[p].keys(): 

181 self.report[p][k]["count"] = len(self.report[p][k]["ids"]) 

182 del self.report[p][k]["ids"] 

183 

184 

185class StatusCounter(ReportCounter): 

186 def __init__(self, period): 

187 self.report = {} 

188 self._last_period = None 

189 super(StatusCounter, self).__init__(period) 

190 

191 def count(self, prov): 

192 if not prov.action.startswith("status:"): 

193 return 

194 

195 best_role = self._get_best_role(prov.roles) 

196 countable = self._is_countable(prov, best_role) 

197 

198 if not countable: 

199 return 

200 

201 p = self._flatten_timestamp(prov.created_timestamp) 

202 if p not in self.report: 

203 self.report[p] = {} 

204 

205 if prov.user not in self.report[p]: 

206 self.report[p][prov.user] = {"ids" : []} 

207 

208 if prov.resource_id not in self.report[p][prov.user]["ids"]: 

209 self.report[p][prov.user]["ids"].append(prov.resource_id) 

210 

211 if p != self._last_period: 

212 self._count_down(self._last_period) 

213 self._last_period = p 

214 

215 @staticmethod 

216 def _get_best_role(roles): 

217 role_precedence = ["associate_editor", "editor", "admin"] 

218 best_role = None 

219 for r in roles: 

220 try: 

221 if best_role is None and r in role_precedence: 

222 best_role = r 

223 if role_precedence.index(r) > role_precedence.index(best_role): 

224 best_role = r 

225 except ValueError: 

226 pass # The user has a role not in our precedence list (e.g. api) - ignore it. 

227 

228 return best_role 

229 

230 @staticmethod 

231 def _is_countable(prov, role): 

232 """ Determine whether to include this provenance record in the report""" 

233 

234 """ 

235 # We now disregard role and count all completion events per user https://github.com/DOAJ/doaj/issues/1385 

236 countable = False 

237 

238 if role == "admin" and (prov.action == "status:accepted" or prov.action == "status:rejected"): 

239 countable = True 

240 elif role == "editor" and prov.action == "status:ready": 

241 countable = True 

242 elif role == "associate_editor" and prov.action == "status:completed": 

243 countable = True 

244 """ 

245 

246 return prov.action in ["status:accepted", "status:rejected", "status:ready", "status:completed"] 

247 

248 def tabulate(self): 

249 self._count_down(self._last_period) 

250 return _tabulate_time_entity_group(self.report, "User") 

251 

252 def filename(self, fr, to): 

253 return "completion_by_" + self.period + "__from_" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv" 

254 

255 def _count_down(self, p): 

256 if p is None: 

257 return 

258 for k in self.report[p].keys(): 

259 self.report[p][k]["count"] = len(self.report[p][k]["ids"]) 

260 del self.report[p][k]["ids"] 

261 

262 

263class ProvenanceList(object): 

264 def __init__(self, fr, to): 

265 self.fr = fr 

266 self.to = to 

267 

268 def query(self): 

269 return { 

270 "query" : { 

271 "bool" : { 

272 "must" : [ 

273 {"range" : {"created_date" : {"gt" : self.fr, "lte" : self.to}}} 

274 ] 

275 } 

276 }, 

277 "sort" : [{"created_date" : {"order" : "asc"}}] 

278 } 

279 

280 

281class ContentByDate(object): 

282 def __init__(self, fr, to): 

283 self.fr = fr 

284 self.to = to 

285 

286 def query(self): 

287 return { 

288 "query": { 

289 "bool": { 

290 "must": [ 

291 {"range": {"created_date": {"gt": self.fr, "lte": self.to}}} 

292 ] 

293 } 

294 }, 

295 "size": 0, 

296 "aggs": { 

297 "years": { 

298 "date_histogram": { 

299 "field": "created_date", 

300 "calendar_interval": "year" 

301 }, 

302 "aggs": { 

303 "countries": { 

304 "terms": { 

305 "field": "bibjson.publisher.country.exact", 

306 "size": 1000 

307 } 

308 } 

309 } 

310 } 

311 } 

312 } 

313 

314 

315######################################################### 

316# Background task implementation 

317 

318class ReportingBackgroundTask(BackgroundTask): 

319 

320 __action__ = "reporting" 

321 

322 def run(self): 

323 """ 

324 Execute the task as specified by the background_jon 

325 :return: 

326 """ 

327 job = self.background_job 

328 params = job.params 

329 

330 outdir = self.get_param(params, "outdir", "report_" + dates.today()) 

331 fr = self.get_param(params, "from", "1970-01-01T00:00:00Z") 

332 to = self.get_param(params, "to", dates.now()) 

333 

334 job.add_audit_message("Saving reports to " + outdir) 

335 if not os.path.exists(outdir): 

336 os.makedirs(outdir) 

337 

338 prov_outfiles = provenance_reports(fr, to, outdir) 

339 if prov_outfiles is None: 

340 job.add_audit_message("No provenance records found; no provenance reports will be recorded.") 

341 

342 cont_outfiles = content_reports(fr, to, outdir) 

343 refs = {} 

344 self.set_reference(refs, "provenance_outfiles", prov_outfiles) 

345 self.set_reference(refs, "content_outfiles", cont_outfiles) 

346 job.reference = refs 

347 

348 msg = "Generated reports for period {x} to {y}".format(x=fr, y=to) 

349 job.add_audit_message(msg) 

350 

351 send_email = self.get_param(params, "email", False) 

352 if send_email: 

353 ref_fr = dates.reformat(fr, app.config.get("DEFAULT_DATE_FORMAT"), "%Y-%m-%d") 

354 ref_to = dates.reformat(to, app.config.get("DEFAULT_DATE_FORMAT"), "%Y-%m-%d") 

355 archive_name = "reports_" + ref_fr + "_to_" + ref_to 

356 email_archive(outdir, archive_name) 

357 job.add_audit_message("email alert sent") 

358 else: 

359 job.add_audit_message("no email alert sent") 

360 

361 def cleanup(self): 

362 """ 

363 Cleanup after a successful OR failed run of the task 

364 :return: 

365 """ 

366 failed = self.background_job.is_failed() 

367 if not failed: 

368 return 

369 

370 params = self.background_job.params 

371 outdir = self.get_param(params, "outdir") 

372 

373 if outdir is not None and os.path.exists(outdir): 

374 shutil.rmtree(outdir) 

375 

376 self.background_job.add_audit_message(u"Deleted directory {x} due to job failure".format(x=outdir)) 

377 

378 @classmethod 

379 def prepare(cls, username, **kwargs): 

380 """ 

381 Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob, 

382 or fail with a suitable exception 

383 

384 :param kwargs: arbitrary keyword arguments pertaining to this task type 

385 :return: a BackgroundJob instance representing this task 

386 """ 

387 

388 job = models.BackgroundJob() 

389 job.user = username 

390 job.action = cls.__action__ 

391 

392 params = {} 

393 cls.set_param(params, "outdir", kwargs.get("outdir", "report_" + dates.today())) 

394 cls.set_param(params, "from", kwargs.get("from_date", "1970-01-01T00:00:00Z")) 

395 cls.set_param(params, "to", kwargs.get("to_date", dates.now())) 

396 cls.set_param(params, "email", kwargs.get("email", False)) 

397 job.params = params 

398 

399 return job 

400 

401 @classmethod 

402 def submit(cls, background_job): 

403 """ 

404 Submit the specified BackgroundJob to the background queue 

405 

406 :param background_job: the BackgroundJob instance 

407 :return: 

408 """ 

409 background_job.save() 

410 run_reports.schedule(args=(background_job.id,), delay=10) 

411 

412 

413@main_queue.periodic_task(schedule("reporting")) 

414@write_required(script=True) 

415def scheduled_reports(): 

416 user = app.config.get("SYSTEM_USERNAME") 

417 mail = bool(app.config.get("REPORTS_EMAIL_TO", False)) # Send email if recipient configured 

418 outdir = app.config.get("REPORTS_BASE_DIR") 

419 outdir = os.path.join(outdir, dates.today()) 

420 job = ReportingBackgroundTask.prepare(user, outdir=outdir, email=mail) 

421 ReportingBackgroundTask.submit(job) 

422 

423 

424@main_queue.task() 

425@write_required(script=True) 

426def run_reports(job_id): 

427 job = models.BackgroundJob.pull(job_id) 

428 task = ReportingBackgroundTask(job) 

429 BackgroundApi.execute(task)