Coverage for portality/tasks/async_workflow_notifications.py: 82%
174 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-19 18:38 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-19 18:38 +0100
1from datetime import datetime, timedelta
3from flask import render_template
5from portality import constants
6from portality import models, app_email
7from portality.background import BackgroundTask, BackgroundApi, BackgroundException
8from portality.core import app
9from portality.dao import Facetview2
10from portality.tasks.redis_huey import main_queue, schedule
13class AgeQuery(object):
14 def __init__(self, newest_date, status_filters):
15 self._newest_date = newest_date
16 self._status_filters = status_filters
18 def query(self):
19 return {
20 "query": {
21 "bool": {
22 "must": [
23 {
24 "range": {
25 "last_manual_update": {
26 #"gte": "1970-01-01T00:00:00Z", # Newer than 'Never' (implicit)
27 "lte": self._newest_date # Older than X_WEEKS
28 }
29 }
30 },
31 {
32 "exists": {
33 "field": "admin.editor"
34 }
35 }
36 ],
37 "should": self._status_filters,
38 "minimum_should_match": 1
39 }
40 },
41 "size": 0,
42 }
45class ReadyQuery(object):
46 def __init__(self, ready_filter):
47 self._ready_filter = ready_filter
49 def query(self):
50 return {
51 "query": {
52 "bool": {
53 "filter": self._ready_filter
54 }
55 },
56 "size": 0
57 }
60class EdAppQuery(object):
61 def __init__(self, status_filters):
62 self._status_filters = status_filters
64 def query(self):
65 return {
66 "query": {
67 "bool": {
68 "filter": {
69 "bool": {
70 "must": {
71 "exists": {"field": "admin.editor_group"}
72 },
73 "must_not": {
74 "exists": {
75 "field": "admin.editor"
76 }
77 },
78 "should": self._status_filters,
79 "minimum_should_match": 1
80 }
81 }
82 }
83 },
84 "size": 0,
85 "aggregations": {
86 "ed_group_counts": {
87 "terms": {
88 "field": "admin.editor_group.exact",
89 "size" : 9999
90 }
91 }
92 }
93 }
96class EdAgeQuery(object):
97 def __init__(self, newest_date, status_filters):
98 self._newest_date = newest_date
99 self._status_filters = status_filters
101 def query(self):
102 return {
103 "query": {
104 "bool": {
105 "must": [
106 {
107 "range": {
108 "last_manual_update": {
109 # "gte": "1970-01-01T00:00:00Z", # Newer than 'Never' (implicit)
110 "lte": self._newest_date # Older than X_WEEKS
111 }
112 }
113 },
114 {
115 "exists": {
116 "field": "admin.editor"
117 }
118 }
119 ],
120 "should": self._status_filters,
121 "minimum_should_match": 1
122 }
123 },
124 "size": 0,
125 "aggregations": {
126 "ed_group_counts": {
127 "terms": {
128 "field": "admin.editor_group.exact",
129 "size": 9999
130 }
131 }
132 }
133 }
135class AssEdAgeQuery(object):
136 def __init__(self, idle_date, very_idle_date, status_filters):
137 self._idle_date = idle_date
138 self._very_idle_date = very_idle_date
139 self._status_filters = status_filters
141 def query(self):
142 return {
143 "query": {
144 "bool": {
145 "must": {
146 "range": {
147 "last_manual_update": {
148 # "gte": "1970-01-01T00:00:00Z", # Newer than 'Never' (implicit)
149 "lte": self._idle_date # Older than X_DAYS
150 }
151 }
152 },
153 "should": self._status_filters,
154 "minimum_should_match" : 1
155 }
156 },
157 "size": 0,
158 "aggregations": {
159 "assoc_ed": {
160 "terms": {
161 "field": "admin.editor.exact",
162 "size": 9999
163 },
164 "aggregations": {
165 "older_weeks": {
166 "range": {
167 "field": "last_manual_update",
168 "ranges": [
169 {"to": self._very_idle_date}, # count those which are idle for weeks
170 ]
171 }
172 }
173 }
174 }
175 }
176 }
177# Functions for each notification recipient - ManEd, Editor, Assoc_editor
178def managing_editor_notifications(emails_dict):
179 """
180 Notify managing editors about two things:
181 * Summary of records assigned to associate editors but not touched for X weeks
182 * Records marked as ready
183 Note: requires request context to render the email text from templates
184 """
185 MAN_ED_EMAIL = app.config.get('MANAGING_EDITOR_EMAIL', 'managing-editors@doaj.org')
187 relevant_statuses = app.config.get("MAN_ED_NOTIFICATION_STATUSES")
188 term = "admin.application_status.exact"
189 status_filters = [Facetview2.make_term_filter(term, status) for status in relevant_statuses]
191 # First note - records not touched for so long
192 X_WEEKS = app.config.get('MAN_ED_IDLE_WEEKS', 2)
193 newest_date = datetime.now() - timedelta(weeks=X_WEEKS)
194 newest_date_stamp = newest_date.strftime("%Y-%m-%dT%H:%M:%SZ")
196 age_query = AgeQuery(newest_date_stamp, status_filters)
197 idle_res = models.Suggestion.query(q=age_query.query())
198 num_idle = idle_res.get('hits', {}).get('total', {}).get('value', 0)
200 text = render_template('email/workflow_reminder_fragments/admin_age_frag', num_idle=num_idle, x_weeks=X_WEEKS)
201 _add_email_paragraph(emails_dict, MAN_ED_EMAIL, 'Managing Editors', text)
203 # The second notification - the number of ready records
204 ready_filter = Facetview2.make_term_filter('admin.application_status.exact', constants.APPLICATION_STATUS_READY)
205 ready_query = ReadyQuery(ready_filter)
207 admin_fv_prefix = app.config.get('BASE_URL') + "/admin/applications?source="
208 fv_ready = Facetview2.make_query(filters=ready_filter, sort_parameter="last_manual_update")
209 ready_url = admin_fv_prefix + Facetview2.url_encode_query(fv_ready)
211 ready_res = models.Suggestion.query(q=ready_query.query())
212 num_ready = ready_res.get('hits').get('total', {}).get('value', 0)
214 text = render_template('email/workflow_reminder_fragments/admin_ready_frag', num=num_ready, url=ready_url)
215 _add_email_paragraph(emails_dict, MAN_ED_EMAIL, 'Managing Editors', text)
218def editor_notifications(emails_dict, limit=None):
219 """
220 Notify editors about two things:
221 * how many records are assigned to their group which have no associate assigned.
222 * how many records assigned to an associate in their group but have been idle for X_WEEKS
223 Note: requires request context to render the email text from templates
225 :param: limit: for the purposes of demonstration, limit the number of emails this function generates.
226 """
228 relevant_statuses = app.config.get("ED_NOTIFICATION_STATUSES")
229 term = "admin.application_status.exact"
230 status_filters = [Facetview2.make_term_filter(term, status) for status in relevant_statuses]
232 # First note - how many applications in editor's group have no associate editor assigned.
233 ed_app_query = EdAppQuery(status_filters)
235 ed_url = app.config.get("BASE_URL") + "/editor/group_applications"
237 # Query for editor groups which have items in the required statuses, count their numbers
238 es = models.Suggestion.query(q=ed_app_query.query())
239 group_stats = [(bucket.get("key"), bucket.get("doc_count")) for bucket in es.get("aggregations", {}).get("ed_group_counts", {}).get("buckets", [])]
241 if limit is not None and isinstance(limit, int):
242 group_stats = group_stats[:limit]
244 # Get the email addresses for the editor in charge of each group, Add the template to their email
245 for (group_name, group_count) in group_stats:
246 # get editor group object by name
247 eg = models.EditorGroup.pull_by_key("name", group_name)
248 if eg is None:
249 continue
251 # Get the email address to the editor account
252 editor = eg.get_editor_account()
253 ed_email = editor.email
255 text = render_template('email/workflow_reminder_fragments/editor_groupcount_frag', num=group_count, ed_group=group_name, url=ed_url)
256 _add_email_paragraph(emails_dict, ed_email, eg.editor, text)
258 # Second note - records within editor group not touched for so long
259 X_WEEKS = app.config.get('ED_IDLE_WEEKS', 2)
260 newest_date = datetime.now() - timedelta(weeks=X_WEEKS)
261 newest_date_stamp = newest_date.strftime("%Y-%m-%dT%H:%M:%SZ")
263 ed_age_query = EdAgeQuery(newest_date_stamp, status_filters)
265 ed_fv_prefix = app.config.get('BASE_URL') + "/editor/group_applications?source="
266 fv_age = Facetview2.make_query(sort_parameter="last_manual_update")
267 ed_age_url = ed_fv_prefix + Facetview2.url_encode_query(fv_age)
269 es = models.Suggestion.query(q=ed_age_query.query())
270 group_stats = [(bucket.get("key"), bucket.get("doc_count")) for bucket in es.get("aggregations", {}).get("ed_group_counts", {}).get("buckets", [])]
272 if limit is not None and isinstance(limit, int):
273 group_stats = group_stats[:limit]
275 # Get the email addresses for the editor in charge of each group, Add the template to their email
276 for (group_name, group_count) in group_stats:
277 # get editor group object by name
278 eg = models.EditorGroup.pull_by_key("name", group_name)
279 if eg is None:
280 continue
282 # Get the email address to the editor account
283 editor = eg.get_editor_account()
284 ed_email = editor.email
286 text = render_template('email/workflow_reminder_fragments/editor_age_frag', num=group_count, ed_group=group_name, url=ed_age_url, x_weeks=X_WEEKS)
287 _add_email_paragraph(emails_dict, ed_email, eg.editor, text)
290def associate_editor_notifications(emails_dict, limit=None):
291 """
292 Notify associates about two things:
293 * Records assigned that haven't been updated for X days
294 * Record(s) that haven't been updated for Y weeks
295 Note: requires request context to render the email text from templates
296 """
298 # Get our thresholds from settings
299 X_DAYS = app.config.get('ASSOC_ED_IDLE_DAYS', 2)
300 Y_WEEKS = app.config.get('ASSOC_ED_IDLE_WEEKS', 2)
301 now = datetime.now()
302 idle_date = now - timedelta(days=X_DAYS)
303 very_idle_date = now - timedelta(weeks=Y_WEEKS)
304 idle_date_stamp = idle_date.strftime("%Y-%m-%dT%H:%M:%SZ")
305 very_idle_date_stamp = very_idle_date.strftime("%Y-%m-%dT%H:%M:%SZ")
307 relevant_statuses = app.config.get("ASSOC_ED_NOTIFICATION_STATUSES")
308 term = "admin.application_status.exact"
309 status_filters = [Facetview2.make_term_filter(term, status) for status in relevant_statuses]
311 assoc_age_query = AssEdAgeQuery(idle_date_stamp, very_idle_date_stamp, status_filters)
312 url = app.config.get("BASE_URL") + "/editor/your_applications"
314 es = models.Suggestion.query(q=assoc_age_query.query())
315 buckets = es.get("aggregations", {}).get("assoc_ed", {}).get("buckets", [])
317 if limit is not None and isinstance(limit, int):
318 buckets = buckets[:limit]
320 for bucket in buckets: # loop through assoc_ed buckets
321 assoc_id = bucket.get("key")
322 idle = bucket.get("doc_count")
324 # Get the 'older than y weeks' count from nested aggregation
325 very_idle = bucket.get("older_weeks").get("buckets")[0].get('doc_count') # only one bucket, so take first
327 # Pull the email address for our associate editor from their account
328 assoc = models.Account.pull(assoc_id)
329 try:
330 assoc_email = assoc.email
331 except AttributeError:
332 # There isn't an account for that id
333 app.logger.warn("No account found for ID {0}".format(assoc_id))
334 continue
336 text = render_template('email/workflow_reminder_fragments/assoc_ed_age_frag', num_idle=idle, x_days=X_DAYS, num_very_idle=very_idle, y_weeks=Y_WEEKS, url=url)
337 _add_email_paragraph(emails_dict, assoc_email, assoc_id, text)
340def send_emails(emails_dict):
342 for (email, (to_name, paragraphs)) in emails_dict.items():
343 pre = 'Dear ' + to_name + ',\n\n'
344 post = '\n\nThe DOAJ Team\n\n***\nThis is an automated message. Please do not reply to this email.'
345 full_body = pre + '\n\n'.join(paragraphs) + post
347 app_email.send_mail(to=[email],
348 fro=app.config.get('SYSTEM_EMAIL_FROM', 'helpdesk@doaj.org'),
349 subject="DOAJ editorial reminders",
350 msg_body=full_body)
353def _add_email_paragraph(emails_dict, addr, to_name, para_string):
354 """
355 Add a new email to the global dict which stores the email fragments, or extend an existing one.
356 :param emails_dict: target object to store the emails
357 :param addr: email address for recipient
358 :param to_name: name of recipient
359 :param para_string: paragraph to add to the email
360 """
362 try:
363 (name, paras) = emails_dict[addr]
364 paras.append(para_string)
365 except KeyError:
366 emails_dict[addr] = (to_name, [para_string])
369class AsyncWorkflowBackgroundTask(BackgroundTask):
371 __action__ = "async_workflow_notifications"
373 def run(self):
374 """
375 Execute the task as specified by the background_job
376 """
377 job = self.background_job
379 """ Run through each notification type, then send emails """
380 # Create a request context to render templates
381 ctx = app.test_request_context()
382 ctx.push()
384 # Store all of the emails: { email_addr : (name, [paragraphs]) }
385 emails_dict = {}
387 # Gather info and build the notifications
388 managing_editor_notifications(emails_dict)
389 editor_notifications(emails_dict)
390 associate_editor_notifications(emails_dict)
392 # Discard the context (the send mail function makes its own)
393 ctx.pop()
395 send_emails(emails_dict)
397 def cleanup(self):
398 """
399 Cleanup after a successful OR failed run of the task
400 """
401 pass
403 @classmethod
404 def prepare(cls, username, **kwargs):
405 """
406 Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
407 or fail with a suitable exception
408 :param username: user who called this job
409 :param kwargs: arbitrary keyword arguments pertaining to this task type
410 :return: a BackgroundJob instance representing this task
411 """
413 if not app.config.get("ENABLE_EMAIL", False):
414 raise BackgroundException("Email has been disabled in config. Set ENABLE_EMAIL to True to run this task.")
416 # first prepare a job record
417 job = models.BackgroundJob()
418 job.user = username
419 job.action = cls.__action__
420 return job
422 @classmethod
423 def submit(cls, background_job):
424 """
425 Submit the specified BackgroundJob to the background queue
426 :param background_job: the BackgroundJob instance
427 """
428 background_job.save()
429 async_workflow_notifications.schedule(args=(background_job.id,), delay=10)
432@main_queue.periodic_task(schedule("async_workflow_notifications"))
433def scheduled_async_workflow_notifications():
434 user = app.config.get("SYSTEM_USERNAME")
435 job = AsyncWorkflowBackgroundTask.prepare(user)
436 AsyncWorkflowBackgroundTask.submit(job)
439@main_queue.task()
440def async_workflow_notifications(job_id):
441 job = models.BackgroundJob.pull(job_id)
442 task = AsyncWorkflowBackgroundTask(job)
443 BackgroundApi.execute(task)