Coverage for portality/tasks/reporting.py: 92%

262 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1import csv 

2import os 

3import shutil 

4 

5from portality import datasets 

6from portality import models 

7from portality.app_email import email_archive 

8from portality.background import BackgroundApi, BackgroundTask 

9from portality.core import app 

10from portality.dao import ESMappingMissingError, ScrollInitialiseException 

11from portality.lib import dates 

12from portality.lib.dates import DEFAULT_TIMESTAMP_VAL, FMT_DATE_STD, FMT_DATE_YM, FMT_YEAR, FMT_DATETIME_STD 

13from portality.tasks.helpers import background_helper 

14from portality.tasks.redis_huey import scheduled_short_queue as queue 

15 

16 

def provenance_reports(fr, to, outdir):
    """Generate per-user provenance reports (edit and completion counts by
    month and by year) for the given date range, writing one CSV per counter
    into ``outdir``.

    :param fr: start of the reporting period (timestamp string, exclusive)
    :param to: end of the reporting period (timestamp string, inclusive)
    :param outdir: directory the CSV files are written to (must already exist)
    :return: list of paths of the files written, or None if the provenance
        index is missing or cannot be scrolled
    """
    pipeline = [
        ActionCounter("edit", "month"),
        ActionCounter("edit", "year"),
        StatusCounter("month"),
        StatusCounter("year")
    ]

    q = ProvenanceList(fr, to)
    try:
        # Run every provenance record in the period through each counter.
        for prov in models.Provenance.iterate(q.query()):
            for filt in pipeline:
                filt.count(prov)
    except (ESMappingMissingError, ScrollInitialiseException):
        # No provenance index (e.g. a fresh install) - signal "no reports".
        return None

    outfiles = []
    for p in pipeline:
        table = p.tabulate()
        outfile = os.path.join(outdir, p.filename(fr, to))
        outfiles.append(outfile)
        # newline="" is required by the csv module to avoid blank rows on
        # Windows; utf-8 matches the encoding used by content_reports.
        with open(outfile, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerows(table)

    return outfiles

44 

45 

def content_reports(fr, to, outdir):
    """Generate the applications-by-year-by-country report for the given date
    range, writing a single CSV into ``outdir``.

    :param fr: start of the reporting period (timestamp string, exclusive)
    :param to: end of the reporting period (timestamp string, inclusive)
    :param outdir: directory the CSV file is written to (must already exist)
    :return: list containing the path of the file written
    """
    report = {}

    q = ContentByDate(fr, to)
    res = models.Suggestion.query(q=q.query())
    # Unpack the nested aggregation: year buckets, each containing country buckets.
    year_buckets = res.get("aggregations", {}).get("years", {}).get("buckets", [])
    for years in year_buckets:
        ds = years.get("key_as_string")
        do = dates.parse(ds)
        year = do.year
        if year not in report:
            report[year] = {}
        country_buckets = years.get("countries", {}).get("buckets", [])
        for country in country_buckets:
            cc = country.get("key")
            cn = datasets.get_country_name(cc)
            if cn not in report[year]:
                report[year][cn] = {}
            count = country.get("doc_count")
            report[year][cn]["count"] = count

    table = _tabulate_time_entity_group(report, "Country")

    filename = "applications_by_year_by_country__" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv"
    outfiles = []
    outfile = os.path.join(outdir, filename)
    outfiles.append(outfile)
    # newline="" is required by the csv module to avoid blank rows on Windows.
    with open(outfile, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerows(table)

    return outfiles

79 

80 

def _tabulate_time_entity_group(group, entityKey):
    """Pivot a nested {period: {entity: {"count": n}}} mapping into a CSV-ready
    table.

    The output has a header row of [entityKey, period1, period2, ...] with the
    periods in sorted order, followed by one row per entity (sorted by entity
    name) giving that entity's count in each period - 0 where the entity has
    no entry for a period. E.g. if we're counting edits by month and a user
    was active in a previous month but not this one, they get a 0 this month.

    :param group: mapping of period key -> entity name -> {"count": int}
    :param entityKey: heading for the first (entity name) column
    :return: list of rows, the first row being the header
    """
    date_keys = sorted(group.keys())

    # Collect every entity that appears in any period, then look its counts up
    # directly - the straightforward O(periods * entities) pivot, instead of
    # rescanning the partially-built table for every entity in every period.
    entities = sorted({e for period_counts in group.values() for e in period_counts})
    table = [
        [entity] + [group[dk].get(entity, {}).get("count", 0) for dk in date_keys]
        for entity in entities
    ]
    return [[entityKey] + date_keys] + table

120 

121 

def _fft(timestamp):
    """File Friendly Timestamp - Windows doesn't appreciate : / etc in
    filenames, so reformat the full timestamp down to a plain date."""
    friendly = dates.reformat(timestamp, FMT_DATETIME_STD, FMT_DATE_STD)
    return friendly

125 

126 

class ReportCounter(object):
    """Base class for report counters: subclasses accumulate provenance
    records into per-period buckets and render them as a table."""

    def __init__(self, period):
        # Either "month" or "year" - controls how timestamps are bucketed.
        self.period = period

    def _flatten_timestamp(self, ts):
        """Collapse a datetime to its period bucket key (e.g. "2020-01")."""
        fmt_by_period = {"month": FMT_DATE_YM, "year": FMT_YEAR}
        fmt = fmt_by_period.get(self.period)
        # Unknown periods yield None, matching the original fall-through.
        return ts.strftime(fmt) if fmt is not None else None

    def count(self, prov):
        """Fold a single provenance record into this counter."""
        raise NotImplementedError()

    def tabulate(self):
        """Render the accumulated counts as a table of rows."""
        raise NotImplementedError()

    def filename(self, fr, to):
        """Name for the CSV covering the period fr..to."""
        raise NotImplementedError()

145 

146 

class ActionCounter(ReportCounter):
    """Counts the distinct resources each user performed a given action on,
    bucketed by time period."""

    def __init__(self, action, period):
        self.action = action
        # {period: {user: {"ids": [...]}}} while counting; "ids" is replaced
        # by "count" once a period is finished (see _count_down).
        self.report = {}
        self._last_period = None
        super(ActionCounter, self).__init__(period)

    def count(self, prov):
        """Record this provenance event if it matches our action."""
        if prov.action != self.action:
            return

        period_key = self._flatten_timestamp(prov.created_timestamp)
        bucket = self.report.setdefault(period_key, {})
        user_entry = bucket.setdefault(prov.user, {"ids": []})

        # Each resource counts at most once per user per period.
        if prov.resource_id not in user_entry["ids"]:
            user_entry["ids"].append(prov.resource_id)

        # Records arrive in date order, so once the period changes the
        # previous one is complete and its id lists can collapse to counts.
        if period_key != self._last_period:
            self._count_down(self._last_period)
            self._last_period = period_key

    def tabulate(self):
        """Finalise the in-progress period and render the table."""
        self._count_down(self._last_period)
        return _tabulate_time_entity_group(self.report, "User")

    def filename(self, fr, to):
        """CSV filename encoding the action, period granularity and date range."""
        return self.action + "_by_" + self.period + "__from_" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv"

    def _count_down(self, p):
        """Replace each user's id list for period p with its length."""
        if p is None:
            return
        for user_entry in self.report[p].values():
            user_entry["count"] = len(user_entry.pop("ids"))

185 

186 

class StatusCounter(ReportCounter):
    """Counts status-change ("completion") events per user, bucketed by time
    period."""

    def __init__(self, period):
        # {period: {user: {"ids": [...]}}} while counting; "ids" is replaced
        # by "count" once a period is finished (see _count_down).
        self.report = {}
        self._last_period = None
        super(StatusCounter, self).__init__(period)

    def count(self, prov):
        """Record this provenance event if it is a countable status change."""
        if not prov.action.startswith("status:"):
            return

        best_role = self._get_best_role(prov.roles)
        if not self._is_countable(prov, best_role):
            return

        period_key = self._flatten_timestamp(prov.created_timestamp)
        bucket = self.report.setdefault(period_key, {})
        user_entry = bucket.setdefault(prov.user, {"ids": []})

        # Each resource counts at most once per user per period.
        if prov.resource_id not in user_entry["ids"]:
            user_entry["ids"].append(prov.resource_id)

        # Records arrive in date order, so a change of period means the
        # previous one can be collapsed from id lists to counts.
        if period_key != self._last_period:
            self._count_down(self._last_period)
            self._last_period = period_key

    @staticmethod
    def _get_best_role(roles):
        """Return the highest-precedence recognised role from *roles*, or
        None if the user holds none of them (e.g. only "api")."""
        role_precedence = ["associate_editor", "editor", "admin"]
        best_role = None
        for r in roles:
            if r not in role_precedence:
                # The user has a role not in our precedence list (e.g. api) - ignore it.
                continue
            if best_role is None or role_precedence.index(r) > role_precedence.index(best_role):
                best_role = r
        return best_role

    @staticmethod
    def _is_countable(prov, role):
        """ Determine whether to include this provenance record in the report"""
        # We now disregard role and count all completion events per user
        # https://github.com/DOAJ/doaj/issues/1385 - previously only certain
        # role/status pairs counted (admin + accepted/rejected,
        # editor + ready, associate_editor + completed).
        return prov.action in ["status:accepted", "status:rejected", "status:ready", "status:completed"]

    def tabulate(self):
        """Finalise the in-progress period and render the table."""
        self._count_down(self._last_period)
        return _tabulate_time_entity_group(self.report, "User")

    def filename(self, fr, to):
        """CSV filename encoding the period granularity and date range."""
        return "completion_by_" + self.period + "__from_" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv"

    def _count_down(self, p):
        """Replace each user's id list for period p with its length."""
        if p is None:
            return
        for user_entry in self.report[p].values():
            user_entry["count"] = len(user_entry.pop("ids"))

263 

264 

class ProvenanceList(object):
    """Query builder: all provenance records created in (fr, to], oldest
    first, suitable for scrolling."""

    def __init__(self, fr, to):
        self.fr = fr
        self.to = to

    def query(self):
        """Return the ES query dict for this date range."""
        date_filter = {"range": {"created_date": {"gt": self.fr, "lte": self.to}}}
        return {
            "query": {"bool": {"must": [date_filter]}},
            "sort": [{"created_date": {"order": "asc"}}]
        }

281 

282 

class ContentByDate(object):
    """Aggregation query builder: applications created in (fr, to], bucketed
    by year and, within each year, by publisher country."""

    def __init__(self, fr, to):
        self.fr = fr
        self.to = to

    def query(self):
        """Return the ES aggregation query dict (no hits, aggregations only)."""
        date_filter = {"range": {"created_date": {"gt": self.fr, "lte": self.to}}}
        country_agg = {
            "terms": {
                "field": "bibjson.publisher.country.exact",
                "size": 1000
            }
        }
        year_agg = {
            "date_histogram": {
                "field": "created_date",
                "calendar_interval": "year"
            },
            "aggs": {"countries": country_agg}
        }
        return {
            "query": {"bool": {"must": [date_filter]}},
            "size": 0,
            "aggs": {"years": year_agg}
        }

315 

316 

317######################################################### 

318# Background task implementation 

319 

class ReportingBackgroundTask(BackgroundTask):
    """Background task that generates the provenance and content reports for a
    date range, records the output files on the job, and optionally emails
    them as an archive."""

    __action__ = "reporting"

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params

        # Defaults: dated output directory, full history up to now.
        outdir = self.get_param(params, "outdir", "report_" + dates.today())
        fr = self.get_param(params, "from", DEFAULT_TIMESTAMP_VAL)
        to = self.get_param(params, "to", dates.now_str())

        job.add_audit_message("Saving reports to " + outdir)
        if not os.path.exists(outdir):
            os.makedirs(outdir)

        # provenance_reports returns None when the provenance index is missing
        # or unscrollable - that's not a failure, just nothing to report.
        prov_outfiles = provenance_reports(fr, to, outdir)
        if prov_outfiles is None:
            job.add_audit_message("No provenance records found; no provenance reports will be recorded.")

        cont_outfiles = content_reports(fr, to, outdir)

        # Record the generated file paths on the job for later inspection.
        refs = {}
        self.set_reference(refs, "provenance_outfiles", prov_outfiles)
        self.set_reference(refs, "content_outfiles", cont_outfiles)
        job.reference = refs

        msg = "Generated reports for period {x} to {y}".format(x=fr, y=to)
        job.add_audit_message(msg)

        send_email = self.get_param(params, "email", False)
        if send_email:
            # File-friendly dates (no ':' etc.) for the archive name.
            ref_fr = dates.reformat(fr, FMT_DATETIME_STD, FMT_DATE_STD)
            ref_to = dates.reformat(to, FMT_DATETIME_STD, FMT_DATE_STD)
            archive_name = "reports_" + ref_fr + "_to_" + ref_to
            email_archive(outdir, archive_name)
            job.add_audit_message("email alert sent")
        else:
            job.add_audit_message("no email alert sent")

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        failed = self.background_job.is_failed()
        if not failed:
            # Successful runs keep their output directory.
            return

        params = self.background_job.params
        outdir = self.get_param(params, "outdir")

        # Remove partial output so a failed run leaves nothing behind.
        if outdir is not None and os.path.exists(outdir):
            shutil.rmtree(outdir)

        # NOTE(review): this audit message is written on every failure, even
        # when no directory was actually removed - confirm that's intended.
        self.background_job.add_audit_message(u"Deleted directory {x} due to job failure".format(x=outdir))

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: user the job will be recorded as running under
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """

        params = {}
        cls.set_param(params, "outdir", kwargs.get("outdir", "report_" + dates.today()))
        cls.set_param(params, "from", kwargs.get("from_date", DEFAULT_TIMESTAMP_VAL))
        cls.set_param(params, "to", kwargs.get("to_date", dates.now_str()))
        cls.set_param(params, "email", kwargs.get("email", False))
        job = background_helper.create_job(username, cls.__action__,
                                           queue_id=huey_helper.queue_id,
                                           params=params)

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        # Delay pickup slightly so the saved job is visible in the index
        # before the worker attempts to pull it.
        run_reports.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))

412 

413 

# Helper binding this task to the scheduled short-running huey queue; used
# below to register the schedule and execute entry points.
huey_helper = ReportingBackgroundTask.create_huey_helper(queue)

415 

416 

@huey_helper.register_schedule
def scheduled_reports():
    """Scheduled entry point: run the reporting task as the system user,
    writing into a dated subdirectory of the configured reports base dir."""
    user = app.config.get("SYSTEM_USERNAME")
    mail = bool(app.config.get("REPORTS_EMAIL_TO", False))  # Send email if recipient configured
    base_dir = app.config.get("REPORTS_BASE_DIR")
    target_dir = os.path.join(base_dir, dates.today())
    ReportingBackgroundTask.submit(
        ReportingBackgroundTask.prepare(user, outdir=target_dir, email=mail)
    )

425 

426 

@huey_helper.register_execute(is_load_config=False)
def run_reports(job_id):
    """Queue execution entry point: load the job by id and run the task."""
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(ReportingBackgroundTask(background_job))