Coverage for portality/tasks/reporting.py: 92%
263 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-19 18:38 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-19 18:38 +0100
1from portality import models
2from portality.lib import dates
3from portality import datasets
4from portality.core import app
6from portality.background import BackgroundApi, BackgroundTask
7from portality.tasks.redis_huey import main_queue, schedule
8from portality.app_email import email_archive
9from portality.decorators import write_required
10from portality.dao import ESMappingMissingError, ScrollInitialiseException
12import os, shutil, csv
def provenance_reports(fr, to, outdir):
    """
    Generate the provenance-based CSV reports for a date range.

    Every provenance record returned by the ProvenanceList query is fed to each
    counter in the pipeline (edits per user and status changes per user, each
    by month and by year).  Each counter's table is then written to its own
    CSV file inside ``outdir``.

    :param fr: start of the range; ProvenanceList uses it as an exclusive ("gt") bound
    :param to: end of the range; ProvenanceList uses it as an inclusive ("lte") bound
    :param outdir: directory the CSV files are written into (not created here)
    :return: list of the file paths written, or None if iterating the
        provenance records raised ESMappingMissingError or ScrollInitialiseException
    """
    pipeline = [
        ActionCounter("edit", "month"),
        ActionCounter("edit", "year"),
        StatusCounter("month"),
        StatusCounter("year"),
    ]

    q = ProvenanceList(fr, to)
    try:
        for prov in models.Provenance.iterate(q.query()):
            for filt in pipeline:
                filt.count(prov)
    except (ESMappingMissingError, ScrollInitialiseException):
        return None

    outfiles = []
    for p in pipeline:
        table = p.tabulate()
        outfile = os.path.join(outdir, p.filename(fr, to))
        outfiles.append(outfile)
        # Explicit utf-8 keeps the output consistent with content_reports and
        # independent of the platform default; newline="" is what the csv
        # module requires so it can control line endings itself.
        with open(outfile, "w", encoding="utf-8", newline="") as f:
            csv.writer(f).writerows(table)

    return outfiles
def content_reports(fr, to, outdir):
    """
    Generate the "applications by year by country" CSV report.

    Runs the ContentByDate aggregation query against models.Suggestion, turns
    the year / country buckets into a {year: {country name: {"count": n}}}
    structure and writes the tabulated result to a single CSV in ``outdir``.

    :param fr: start of the range; ContentByDate uses it as an exclusive ("gt") bound
    :param to: end of the range; ContentByDate uses it as an inclusive ("lte") bound
    :param outdir: directory the CSV file is written into (not created here)
    :return: list containing the path of the single file written
    """
    report = {}

    q = ContentByDate(fr, to)
    res = models.Suggestion.query(q=q.query())
    year_buckets = res.get("aggregations", {}).get("years", {}).get("buckets", [])
    for year_bucket in year_buckets:
        year = dates.parse(year_bucket.get("key_as_string")).year
        per_country = report.setdefault(year, {})
        country_buckets = year_bucket.get("countries", {}).get("buckets", [])
        for country in country_buckets:
            cn = datasets.get_country_name(country.get("key"))
            entry = per_country.setdefault(cn, {"count": 0})
            # Accumulate rather than overwrite: if two bucket keys resolve to
            # the same country name, the earlier count must not be lost.
            # NOTE(review): whether get_country_name can map distinct keys to
            # one name depends on the datasets module - confirm.
            entry["count"] += country.get("doc_count", 0)

    table = _tabulate_time_entity_group(report, "Country")

    filename = "applications_by_year_by_country__" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv"
    outfiles = []
    outfile = os.path.join(outdir, filename)
    outfiles.append(outfile)
    # newline="" is what the csv module requires so it can control line endings itself.
    with open(outfile, "w", encoding="utf-8", newline="") as f:
        csv.writer(f).writerows(table)

    return outfiles
79def _tabulate_time_entity_group(group, entityKey):
80 date_keys_unsorted = group.keys()
81 date_keys = sorted(date_keys_unsorted)
82 table = []
83 padding = []
84 for db in date_keys:
85 users_active_this_period = group[db].keys()
86 for u in users_active_this_period:
87 c = group[db][u]["count"]
88 existing = False
89 for row in table:
90 if row[0] == u:
91 row.append(c)
92 existing = True
93 if not existing:
94 table.append([u] + padding + [c])
96 # Add a 0 for each user who has been added to the table but doesn't
97 # have any actions in the current time period we're looping over. E.g.
98 # if we're counting edits by month, this would be "users who were active
99 # in a previous month but haven't made any edits this month".
100 users_in_table = set(map(lambda each_row: each_row[0], table))
101 previously_active_users = users_in_table - set(users_active_this_period)
102 for row in table:
103 if row[0] in previously_active_users:
104 row.append(0)
106 # The following is only prefix padding. E.g. if "dom" started making edits in
107 # Jan 2015 but "emanuil" only started in Mar 2015, then "emanuil" needs
108 # 0s filled in for Jan + Feb 2015.
109 padding.append(0)
111 for row in table:
112 if len(row) < len(date_keys) + 1:
113 row += [0] * (len(date_keys) - len(row) + 1)
115 table.sort(key=lambda user: user[0])
116 table = [[entityKey] + date_keys] + table
117 return table
def _fft(timestamp):
    """
    File Friendly Timestamp.

    Windows doesn't appreciate ":" or "/" etc in filenames, so the timestamp is
    reformatted from the configured DEFAULT_DATE_FORMAT down to a plain
    "%Y-%m-%d" date.

    :param timestamp: timestamp string in the application's DEFAULT_DATE_FORMAT
    :return: whatever dates.reformat produces for the "%Y-%m-%d" target format
    """
    source_format = app.config.get("DEFAULT_DATE_FORMAT")
    return dates.reformat(timestamp, source_format, "%Y-%m-%d")
class ReportCounter(object):
    """
    Base class for the counters that make up the provenance report pipeline.

    A counter consumes provenance records one at a time through count(), and
    afterwards provides its result via tabulate() and the name of the file it
    should be written to via filename().
    """

    # strftime pattern used to collapse a timestamp into each supported period
    _PERIOD_FORMATS = {"month": "%Y-%m", "year": "%Y"}

    def __init__(self, period):
        """
        :param period: reporting granularity; "month" and "year" are recognised
        """
        self.period = period

    def _flatten_timestamp(self, ts):
        """
        Reduce a timestamp to the key of the reporting period it falls in.

        :param ts: object providing strftime() (e.g. a datetime)
        :return: "YYYY-MM" when the period is "month", "YYYY" when it is
            "year", and None for any other period
        """
        fmt = self._PERIOD_FORMATS.get(self.period)
        if fmt is None:
            return None
        return ts.strftime(fmt)

    def count(self, prov):
        """Consume one provenance record.  Must be implemented by subclasses."""
        raise NotImplementedError()

    def tabulate(self):
        """Return the collected counts as a table.  Must be implemented by subclasses."""
        raise NotImplementedError()

    def filename(self, fr, to):
        """Return the output file name for the range.  Must be implemented by subclasses."""
        raise NotImplementedError()


class _UserResourceCounter(ReportCounter):
    """
    Shared machinery for counters that report, per period and per user, the
    number of distinct resources the user acted on.

    While a period is being filled, ``self.report[period][user]`` holds
    ``{"ids": set_of_resource_ids}``.  As soon as a record for a different
    period arrives, the previous period is "counted down": the id set is
    replaced by ``{"count": n}`` so that ids are not kept for the whole run.
    Records are therefore expected in chronological order (ProvenanceList
    sorts by created_date ascending).
    """

    def __init__(self, period):
        self.report = {}
        self._last_period = None
        super(_UserResourceCounter, self).__init__(period)

    def _record(self, prov):
        """Register prov.resource_id against prov.user in the record's period."""
        p = self._flatten_timestamp(prov.created_timestamp)
        per_user = self.report.setdefault(p, {})
        if prov.user not in per_user:
            # A set gives constant-time de-duplication of resource ids.
            # Assumes resource ids are hashable (presumably strings) - confirm.
            per_user[prov.user] = {"ids": set()}
        per_user[prov.user]["ids"].add(prov.resource_id)

        # Moving into a different period: the previous one can be finalised.
        if p != self._last_period:
            self._count_down(self._last_period)
            self._last_period = p

    def tabulate(self):
        """
        Finalise the period still in progress and return the report as a table
        with a "User" header column.  Safe to call more than once.
        """
        self._count_down(self._last_period)
        return _tabulate_time_entity_group(self.report, "User")

    def _count_down(self, p):
        """
        Replace each user's id set for period ``p`` with its size under "count".

        Entries which have already been counted down are left alone, so the
        call is idempotent.  Does nothing when ``p`` is None.
        """
        if p is None:
            return
        for entry in self.report[p].values():
            if "ids" in entry:
                entry["count"] = len(entry.pop("ids"))


class ActionCounter(_UserResourceCounter):
    """Counts, per user and period, the distinct resources on which a given action was recorded."""

    def __init__(self, action, period):
        """
        :param action: provenance action to count (e.g. "edit"); other actions are ignored
        :param period: "month" or "year"
        """
        self.action = action
        super(ActionCounter, self).__init__(period)

    def count(self, prov):
        """Record ``prov`` if its action matches the one this counter tracks."""
        if prov.action != self.action:
            return
        self._record(prov)

    def filename(self, fr, to):
        """Build the CSV file name from the action, period, date range and today's date."""
        return self.action + "_by_" + self.period + "__from_" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv"


class StatusCounter(_UserResourceCounter):
    """Counts, per user and period, the distinct resources for which the user recorded a completion status."""

    def __init__(self, period):
        """
        :param period: "month" or "year"
        """
        super(StatusCounter, self).__init__(period)

    def count(self, prov):
        """Record ``prov`` if it is a countable "status:" provenance record."""
        if not prov.action.startswith("status:"):
            return

        best_role = self._get_best_role(prov.roles)
        if not self._is_countable(prov, best_role):
            return

        self._record(prov)

    @staticmethod
    def _get_best_role(roles):
        """
        Pick the highest-precedence role out of ``roles``.

        Precedence, lowest to highest: associate_editor, editor, admin.  Roles
        outside that list (e.g. api) are ignored.

        :param roles: iterable of role names; None is treated as empty
        :return: the best role, or None if no role from the precedence list is present
        """
        role_precedence = ["associate_editor", "editor", "admin"]
        ranked = [r for r in (roles or []) if r in role_precedence]
        if not ranked:
            return None
        return max(ranked, key=role_precedence.index)

    @staticmethod
    def _is_countable(prov, role):
        """ Determine whether to include this provenance record in the report"""
        # We now disregard role and count all completion events per user
        # https://github.com/DOAJ/doaj/issues/1385
        # (Previously: admin counted for accepted/rejected, editor for ready,
        # associate_editor for completed.)  ``role`` is kept in the signature
        # for compatibility.
        return prov.action in ["status:accepted", "status:rejected", "status:ready", "status:completed"]

    def filename(self, fr, to):
        """Build the CSV file name from the period, date range and today's date."""
        return "completion_by_" + self.period + "__from_" + _fft(fr) + "_to_" + _fft(to) + "__on_" + dates.today() + ".csv"
class ProvenanceList(object):
    """
    Query builder selecting the provenance records created within a date
    range, oldest first.
    """

    def __init__(self, fr, to):
        """
        :param fr: lower bound on created_date (exclusive)
        :param to: upper bound on created_date (inclusive)
        """
        self.fr, self.to = fr, to

    def query(self):
        """
        :return: query body restricting created_date to (fr, to] and sorting
            by created_date ascending
        """
        date_window = {"gt" : self.fr, "lte" : self.to}
        in_window = {"range" : {"created_date" : date_window}}
        oldest_first = {"created_date" : {"order" : "asc"}}
        return {
            "query" : {"bool" : {"must" : [in_window]}},
            "sort" : [oldest_first]
        }
class ContentByDate(object):
    """
    Query builder aggregating the records created within a date range by
    calendar year and, within each year, by publisher country.
    """

    def __init__(self, fr, to):
        """
        :param fr: lower bound on created_date (exclusive)
        :param to: upper bound on created_date (inclusive)
        """
        self.fr, self.to = fr, to

    def query(self):
        """
        :return: query body restricting created_date to (fr, to], requesting
            no hits ("size": 0) and a "years" date histogram with a nested
            "countries" terms aggregation (up to 1000 terms)
        """
        in_window = {"range": {"created_date": {"gt": self.fr, "lte": self.to}}}

        by_country = {
            "terms": {
                "field": "bibjson.publisher.country.exact",
                "size": 1000
            }
        }
        by_year = {
            "date_histogram": {
                "field": "created_date",
                "calendar_interval": "year"
            },
            "aggs": {"countries": by_country}
        }

        return {
            "query": {"bool": {"must": [in_window]}},
            "size": 0,
            "aggs": {"years": by_year}
        }
315#########################################################
316# Background task implementation
class ReportingBackgroundTask(BackgroundTask):
    """
    Background task which generates the provenance and content reports for a
    date range and, optionally, emails them as an archive.
    """

    __action__ = "reporting"

    def run(self):
        """
        Execute the task as specified by the background_job

        Reads the "outdir", "from", "to" and "email" job parameters, writes the
        reports into outdir (creating it if needed), records the generated
        file paths on the job's reference, and emails an archive of outdir if
        the "email" parameter is set.

        :return:
        """
        job = self.background_job
        params = job.params

        outdir = self.get_param(params, "outdir", "report_" + dates.today())
        fr = self.get_param(params, "from", "1970-01-01T00:00:00Z")
        to = self.get_param(params, "to", dates.now())

        job.add_audit_message("Saving reports to " + outdir)
        # exist_ok avoids the check-then-create race of exists() + makedirs()
        os.makedirs(outdir, exist_ok=True)

        prov_outfiles = provenance_reports(fr, to, outdir)
        if prov_outfiles is None:
            job.add_audit_message("No provenance records found; no provenance reports will be recorded.")

        cont_outfiles = content_reports(fr, to, outdir)
        refs = {}
        self.set_reference(refs, "provenance_outfiles", prov_outfiles)
        self.set_reference(refs, "content_outfiles", cont_outfiles)
        job.reference = refs

        msg = "Generated reports for period {x} to {y}".format(x=fr, y=to)
        job.add_audit_message(msg)

        send_email = self.get_param(params, "email", False)
        if send_email:
            # Same file-friendly date format as used in the report file names
            archive_name = "reports_" + _fft(fr) + "_to_" + _fft(to)
            email_archive(outdir, archive_name)
            job.add_audit_message("email alert sent")
        else:
            job.add_audit_message("no email alert sent")

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task

        Nothing is done for a successful job.  For a failed job the output
        directory named by the "outdir" parameter is removed, if it exists.

        :return:
        """
        failed = self.background_job.is_failed()
        if not failed:
            return

        params = self.background_job.params
        outdir = self.get_param(params, "outdir")

        if outdir is not None and os.path.exists(outdir):
            shutil.rmtree(outdir)
            # Only claim the deletion when a directory was actually removed
            self.background_job.add_audit_message("Deleted directory {x} due to job failure".format(x=outdir))

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        Recognised keyword arguments: outdir (default "report_" + today),
        from_date (default "1970-01-01T00:00:00Z"), to_date (default now) and
        email (default False).

        :param username: user the job is created for
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """

        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__

        params = {}
        cls.set_param(params, "outdir", kwargs.get("outdir", "report_" + dates.today()))
        cls.set_param(params, "from", kwargs.get("from_date", "1970-01-01T00:00:00Z"))
        cls.set_param(params, "to", kwargs.get("to_date", dates.now()))
        cls.set_param(params, "email", kwargs.get("email", False))
        job.params = params

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        The job is saved first, then run_reports is scheduled with the job's
        id and delay=10.

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        run_reports.schedule(args=(background_job.id,), delay=10)
@main_queue.periodic_task(schedule("reporting"))
@write_required(script=True)
def scheduled_reports():
    """
    Periodic entry point: prepare and submit a reporting job for the system user.

    Reports go into a directory named after today's date under
    REPORTS_BASE_DIR, and are emailed only if REPORTS_EMAIL_TO is configured.
    """
    system_user = app.config.get("SYSTEM_USERNAME")
    # Send email if recipient configured
    send_mail = bool(app.config.get("REPORTS_EMAIL_TO", False))
    report_dir = os.path.join(app.config.get("REPORTS_BASE_DIR"), dates.today())

    job = ReportingBackgroundTask.prepare(system_user, outdir=report_dir, email=send_mail)
    ReportingBackgroundTask.submit(job)
@main_queue.task()
@write_required(script=True)
def run_reports(job_id):
    """
    Queue entry point: load the BackgroundJob with the given id and execute it
    as a ReportingBackgroundTask through BackgroundApi.

    :param job_id: id of the BackgroundJob to run
    """
    reporting_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(ReportingBackgroundTask(reporting_job))