Coverage for portality / tasks / reporting.py: 92%
262 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1import csv
2import os
3import shutil
5from portality import datasets
6from portality import models
7from portality.app_email import email_archive
8from portality.background import BackgroundApi, BackgroundTask
9from portality.core import app
10from portality.dao import ESMappingMissingError, ScrollInitialiseException
11from portality.lib import dates
12from portality.lib.dates import DEFAULT_TIMESTAMP_VAL, FMT_DATE_STD, FMT_DATE_YM, FMT_YEAR, FMT_DATETIME_STD
13from portality.tasks.helpers import background_helper
14from portality.tasks.redis_huey import scheduled_short_queue as queue
def provenance_reports(fr, to, outdir):
    """
    Generate the provenance-based CSV reports for the given period.

    Every provenance record returned by the ProvenanceList query is passed to
    each counter in the pipeline (edits per user and status changes per user,
    each by month and by year).  Each counter is then tabulated and written to
    its own CSV file in ``outdir``.

    :param fr: start of the reporting period, as passed to ProvenanceList
    :param to: end of the reporting period, as passed to ProvenanceList
    :param outdir: directory to write the CSV files into (not created here)
    :return: list of the file paths written, or None if iterating the provenance
        records raised ESMappingMissingError or ScrollInitialiseException
    """
    pipeline = [
        ActionCounter("edit", "month"),
        ActionCounter("edit", "year"),
        StatusCounter("month"),
        StatusCounter("year"),
    ]

    q = ProvenanceList(fr, to)
    try:
        for prov in models.Provenance.iterate(q.query()):
            for filt in pipeline:
                filt.count(prov)
    except (ESMappingMissingError, ScrollInitialiseException):
        return None

    outfiles = []
    for p in pipeline:
        table = p.tabulate()
        outfile = os.path.join(outdir, p.filename(fr, to))
        outfiles.append(outfile)
        # utf-8 matches content_reports and does not depend on the locale;
        # newline="" lets the csv module control line endings itself.
        with open(outfile, "w", encoding="utf-8", newline="") as f:
            csv.writer(f).writerows(table)

    return outfiles
def content_reports(fr, to, outdir):
    """
    Generate the "applications by year by country" CSV report.

    Runs the ContentByDate aggregation against models.Suggestion, collects the
    document count for each (year, country name) pair and writes the resulting
    table to a single CSV file in ``outdir``.

    :param fr: start of the reporting period, as passed to ContentByDate
    :param to: end of the reporting period, as passed to ContentByDate
    :param outdir: directory to write the CSV file into (not created here)
    :return: list containing the path of the one file written
    """
    report = {}

    q = ContentByDate(fr, to)
    res = models.Suggestion.query(q=q.query())
    year_buckets = res.get("aggregations", {}).get("years", {}).get("buckets", [])
    for years in year_buckets:
        year = dates.parse(years.get("key_as_string")).year
        by_country = report.setdefault(year, {})
        for country in years.get("countries", {}).get("buckets", []):
            cn = datasets.get_country_name(country.get("key"))
            by_country.setdefault(cn, {})["count"] = country.get("doc_count")

    table = _tabulate_time_entity_group(report, "Country")

    filename = "applications_by_year_by_country__" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv"
    outfile = os.path.join(outdir, filename)
    outfiles = [outfile]
    # newline="" lets the csv module control line endings itself.
    with open(outfile, "w", encoding="utf-8", newline="") as f:
        csv.writer(f).writerows(table)

    return outfiles
81def _tabulate_time_entity_group(group, entityKey):
82 date_keys_unsorted = group.keys()
83 date_keys = sorted(date_keys_unsorted)
84 table = []
85 padding = []
86 for db in date_keys:
87 users_active_this_period = group[db].keys()
88 for u in users_active_this_period:
89 c = group[db][u]["count"]
90 existing = False
91 for row in table:
92 if row[0] == u:
93 row.append(c)
94 existing = True
95 if not existing:
96 table.append([u] + padding + [c])
98 # Add a 0 for each user who has been added to the table but doesn't
99 # have any actions in the current time period we're looping over. E.g.
100 # if we're counting edits by month, this would be "users who were active
101 # in a previous month but haven't made any edits this month".
102 users_in_table = set(map(lambda each_row: each_row[0], table))
103 previously_active_users = users_in_table - set(users_active_this_period)
104 for row in table:
105 if row[0] in previously_active_users:
106 row.append(0)
108 # The following is only prefix padding. E.g. if "dom" started making edits in
109 # Jan 2015 but "emanuil" only started in Mar 2015, then "emanuil" needs
110 # 0s filled in for Jan + Feb 2015.
111 padding.append(0)
113 for row in table:
114 if len(row) < len(date_keys) + 1:
115 row += [0] * (len(date_keys) - len(row) + 1)
117 table.sort(key=lambda user: user[0])
118 table = [[entityKey] + date_keys] + table
119 return table
def _fft(timestamp):
    """
    File Friendly Timestamp.

    Re-expresses ``timestamp`` from FMT_DATETIME_STD to FMT_DATE_STD so it can be
    embedded in a file name; the intent is to drop characters such as ``:`` and
    ``/`` that Windows does not accept in file names.

    :param timestamp: timestamp string in FMT_DATETIME_STD
    :return: the value returned by dates.reformat for the FMT_DATE_STD format
    """
    file_friendly = dates.reformat(timestamp, FMT_DATETIME_STD, FMT_DATE_STD)
    return file_friendly
class ReportCounter(object):
    """
    Base class for the counters used in the provenance report pipeline.

    Subclasses implement ``count`` (consume one provenance record), ``tabulate``
    (produce the table of results) and ``filename`` (name of the output file).
    """

    def __init__(self, period):
        """
        :param period: reporting granularity; "month" and "year" are recognised
        """
        self.period = period

    def _flatten_timestamp(self, ts):
        """
        Reduce a timestamp to the string identifying the period it falls in.

        :param ts: object providing ``strftime``
        :return: ``ts`` formatted with FMT_DATE_YM for a "month" period or with
            FMT_YEAR for a "year" period; None for any other period
        """
        if self.period == "year":
            return ts.strftime(FMT_YEAR)
        if self.period == "month":
            return ts.strftime(FMT_DATE_YM)
        return None

    def count(self, prov):
        """Consume one provenance record; must be implemented by subclasses."""
        raise NotImplementedError()

    def tabulate(self):
        """Produce the table of results; must be implemented by subclasses."""
        raise NotImplementedError()

    def filename(self, fr, to):
        """Name the output file for the period; must be implemented by subclasses."""
        raise NotImplementedError()
class ActionCounter(ReportCounter):
    """
    Counts, per period and per user, the number of distinct resources on which
    a given provenance action was recorded.

    While a period is being accumulated, each user's entry holds the set of
    resource ids seen.  When the period changes (and again in ``tabulate``) the
    finished period is "counted down": the id set is replaced by its size under
    the "count" key.  Records are therefore expected in ascending period order,
    which is how ProvenanceList sorts them.
    """

    def __init__(self, action, period):
        """
        :param action: the provenance action to count (e.g. "edit")
        :param period: reporting granularity, passed on to ReportCounter
        """
        self.action = action
        self.report = {}
        self._last_period = None
        super(ActionCounter, self).__init__(period)

    def count(self, prov):
        """
        Record one provenance entry if its action matches this counter.

        :param prov: object exposing ``action``, ``created_timestamp``, ``user``
            and ``resource_id``
        """
        if prov.action != self.action:
            return

        p = self._flatten_timestamp(prov.created_timestamp)
        by_user = self.report.setdefault(p, {})

        if prov.user not in by_user:
            by_user[prov.user] = {"ids": set()}

        # A set gives O(1) de-duplication of resource ids.
        by_user[prov.user]["ids"].add(prov.resource_id)

        if p != self._last_period:
            # We have moved into a new period, so the previous one is complete.
            self._count_down(self._last_period)
            self._last_period = p

    def tabulate(self):
        """
        :return: table of counts per user per period, as produced by
            _tabulate_time_entity_group with a "User" first column
        """
        self._count_down(self._last_period)
        return _tabulate_time_entity_group(self.report, "User")

    def filename(self, fr, to):
        """
        :param fr: start of the reporting period
        :param to: end of the reporting period
        :return: name of the CSV file for this counter and period
        """
        return self.action + "_by_" + self.period + "__from_" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv"

    def _count_down(self, p):
        """
        Replace each user's id collection for period ``p`` with its size.

        Safe to call more than once for the same period (for example when
        ``tabulate`` is called twice): entries already counted are left alone.

        :param p: period key in ``self.report``, or None to do nothing
        """
        if p is None:
            return
        for entry in self.report[p].values():
            if "ids" in entry:
                entry["count"] = len(entry.pop("ids"))
class StatusCounter(ReportCounter):
    """
    Counts, per period and per user, the number of distinct resources for which
    the user recorded a countable status change (accepted, rejected, ready or
    completed).

    While a period is being accumulated, each user's entry holds the set of
    resource ids seen.  When the period changes (and again in ``tabulate``) the
    finished period is "counted down": the id set is replaced by its size under
    the "count" key.  Records are therefore expected in ascending period order,
    which is how ProvenanceList sorts them.
    """

    def __init__(self, period):
        """
        :param period: reporting granularity, passed on to ReportCounter
        """
        self.report = {}
        self._last_period = None
        super(StatusCounter, self).__init__(period)

    def count(self, prov):
        """
        Record one provenance entry if it is a countable status change.

        :param prov: object exposing ``action``, ``roles``, ``created_timestamp``,
            ``user`` and ``resource_id``
        """
        if not prov.action.startswith("status:"):
            return

        # The role is still worked out and handed to _is_countable so that the
        # helper's signature keeps working, although it is currently not consulted.
        best_role = self._get_best_role(prov.roles)
        if not self._is_countable(prov, best_role):
            return

        p = self._flatten_timestamp(prov.created_timestamp)
        by_user = self.report.setdefault(p, {})

        if prov.user not in by_user:
            by_user[prov.user] = {"ids": set()}

        # A set gives O(1) de-duplication of resource ids.
        by_user[prov.user]["ids"].add(prov.resource_id)

        if p != self._last_period:
            # We have moved into a new period, so the previous one is complete.
            self._count_down(self._last_period)
            self._last_period = p

    @staticmethod
    def _get_best_role(roles):
        """
        Pick the highest-precedence role from ``roles``.

        :param roles: iterable of role names; None is treated as no roles
        :return: the role that ranks highest among associate_editor, editor and
            admin (in ascending order), or None if none of them is present
        """
        role_precedence = ["associate_editor", "editor", "admin"]
        best_role = None
        best_rank = -1
        for r in roles or []:
            if r not in role_precedence:
                # The user has a role not in our precedence list (e.g. api) - ignore it.
                continue
            rank = role_precedence.index(r)
            if rank > best_rank:
                best_role = r
                best_rank = rank

        return best_role

    @staticmethod
    def _is_countable(prov, role):
        """
        Determine whether to include this provenance record in the report.

        ``role`` is no longer consulted: all completion events are counted per
        user regardless of role (https://github.com/DOAJ/doaj/issues/1385).
        Previously admins were counted for accepted/rejected, editors for ready
        and associate editors for completed.

        :param prov: object exposing ``action``
        :param role: best role of the acting user (unused)
        :return: True if the action is one of the counted status changes
        """
        return prov.action in ["status:accepted", "status:rejected", "status:ready", "status:completed"]

    def tabulate(self):
        """
        :return: table of counts per user per period, as produced by
            _tabulate_time_entity_group with a "User" first column
        """
        self._count_down(self._last_period)
        return _tabulate_time_entity_group(self.report, "User")

    def filename(self, fr, to):
        """
        :param fr: start of the reporting period
        :param to: end of the reporting period
        :return: name of the CSV file for this counter and period
        """
        return "completion_by_" + self.period + "__from_" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv"

    def _count_down(self, p):
        """
        Replace each user's id collection for period ``p`` with its size.

        Safe to call more than once for the same period (for example when
        ``tabulate`` is called twice): entries already counted are left alone.

        :param p: period key in ``self.report``, or None to do nothing
        """
        if p is None:
            return
        for entry in self.report[p].values():
            if "ids" in entry:
                entry["count"] = len(entry.pop("ids"))
class ProvenanceList(object):
    """
    Query builder selecting records whose created_date lies in the window
    (fr, to], ordered by created_date ascending.
    """

    def __init__(self, fr, to):
        """
        :param fr: lower bound of the window (exclusive)
        :param to: upper bound of the window (inclusive)
        """
        self.fr = fr
        self.to = to

    def query(self):
        """
        :return: the query body as a dict
        """
        date_window = {"range": {"created_date": {"gt": self.fr, "lte": self.to}}}
        oldest_first = [{"created_date": {"order": "asc"}}]
        return {
            "query": {"bool": {"must": [date_window]}},
            "sort": oldest_first,
        }
class ContentByDate(object):
    """
    Query builder that aggregates records created in the window (fr, to] into
    yearly buckets, each broken down by publisher country.
    """

    def __init__(self, fr, to):
        """
        :param fr: lower bound of the window (exclusive)
        :param to: upper bound of the window (inclusive)
        """
        self.fr = fr
        self.to = to

    def query(self):
        """
        :return: the query body as a dict; size is 0, so only the aggregations
            are requested
        """
        date_window = {"range": {"created_date": {"gt": self.fr, "lte": self.to}}}
        per_country = {
            "terms": {
                "field": "bibjson.publisher.country.exact",
                "size": 1000,
            }
        }
        per_year = {
            "date_histogram": {
                "field": "created_date",
                "calendar_interval": "year",
            },
            "aggs": {"countries": per_country},
        }
        return {
            "query": {"bool": {"must": [date_window]}},
            "size": 0,
            "aggs": {"years": per_year},
        }
317#########################################################
318# Background task implementation
class ReportingBackgroundTask(BackgroundTask):
    """
    Background task that generates the provenance and content reports for a
    period, and optionally emails an archive of the output directory.
    """

    __action__ = "reporting"

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params

        outdir = self.get_param(params, "outdir", "report_" + dates.today())
        fr = self.get_param(params, "from", DEFAULT_TIMESTAMP_VAL)
        to = self.get_param(params, "to", dates.now_str())

        job.add_audit_message("Saving reports to " + outdir)
        # exist_ok avoids the check-then-create race of testing os.path.exists first
        os.makedirs(outdir, exist_ok=True)

        prov_outfiles = provenance_reports(fr, to, outdir)
        if prov_outfiles is None:
            job.add_audit_message("No provenance records found; no provenance reports will be recorded.")

        cont_outfiles = content_reports(fr, to, outdir)
        refs = {}
        self.set_reference(refs, "provenance_outfiles", prov_outfiles)
        self.set_reference(refs, "content_outfiles", cont_outfiles)
        job.reference = refs

        msg = "Generated reports for period {x} to {y}".format(x=fr, y=to)
        job.add_audit_message(msg)

        send_email = self.get_param(params, "email", False)
        if send_email:
            ref_fr = dates.reformat(fr, FMT_DATETIME_STD, FMT_DATE_STD)
            ref_to = dates.reformat(to, FMT_DATETIME_STD, FMT_DATE_STD)
            archive_name = "reports_" + ref_fr + "_to_" + ref_to
            email_archive(outdir, archive_name)
            job.add_audit_message("email alert sent")
        else:
            job.add_audit_message("no email alert sent")

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task

        Does nothing unless the job failed.  On failure the output directory
        named in the job parameters is removed if it exists, and the removal is
        recorded in the job's audit trail.
        :return:
        """
        failed = self.background_job.is_failed()
        if not failed:
            return

        params = self.background_job.params
        outdir = self.get_param(params, "outdir")

        if outdir is not None and os.path.exists(outdir):
            shutil.rmtree(outdir)
            # Only report a deletion when a directory was actually removed
            self.background_job.add_audit_message(u"Deleted directory {x} due to job failure".format(x=outdir))

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        Recognised keyword arguments: ``outdir``, ``from_date``, ``to_date`` and ``email``.

        :param username: the user the job is created for
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """

        params = {}
        cls.set_param(params, "outdir", kwargs.get("outdir", "report_" + dates.today()))
        cls.set_param(params, "from", kwargs.get("from_date", DEFAULT_TIMESTAMP_VAL))
        cls.set_param(params, "to", kwargs.get("to_date", dates.now_str()))
        cls.set_param(params, "email", kwargs.get("email", False))
        job = background_helper.create_job(username, cls.__action__,
                                           queue_id=huey_helper.queue_id,
                                           params=params)

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        The job is saved and then scheduled to run after the delay configured in
        HUEY_ASYNC_DELAY (10 if not configured).

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        run_reports.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))
# Huey helper for the reporting task, created from the scheduled short queue.
# It supplies the queue_id used by ReportingBackgroundTask.prepare and the
# registration decorators applied to the functions below.
huey_helper = ReportingBackgroundTask.create_huey_helper(queue)
@huey_helper.register_schedule
def scheduled_reports():
    """
    Scheduled entry point: prepare and submit a reporting job for the system user.

    Reports are written to a directory named after today's date beneath
    REPORTS_BASE_DIR, and an email is requested whenever REPORTS_EMAIL_TO is
    configured with a truthy value.
    """
    system_user = app.config.get("SYSTEM_USERNAME")
    # Send email if recipient configured
    wants_email = bool(app.config.get("REPORTS_EMAIL_TO", False))
    todays_dir = os.path.join(app.config.get("REPORTS_BASE_DIR"), dates.today())
    reporting_job = ReportingBackgroundTask.prepare(system_user, outdir=todays_dir, email=wants_email)
    ReportingBackgroundTask.submit(reporting_job)
@huey_helper.register_execute(is_load_config=False)
def run_reports(job_id):
    """
    Execution entry point: load the background job with the given id, wrap it
    in a ReportingBackgroundTask and run it through BackgroundApi.

    :param job_id: id of the BackgroundJob to execute
    """
    BackgroundApi.execute(ReportingBackgroundTask(models.BackgroundJob.pull(job_id)))