Coverage for portality / tasks / async_workflow_notifications.py: 82%
176 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1from datetime import timedelta
3from flask import render_template
5from portality import constants
6from portality import models, app_email
7from portality.background import BackgroundTask, BackgroundApi, BackgroundException
8from portality.core import app
9from portality.dao import Facetview2
10from portality.lib import dates
11from portality.lib.dates import FMT_DATETIME_STD
12from portality.tasks.helpers import background_helper
13from portality.tasks.redis_huey import scheduled_short_queue as queue, schedule
14from portality.ui import templates
17class AgeQuery(object):
18 def __init__(self, newest_date, status_filters):
19 self._newest_date = newest_date
20 self._status_filters = status_filters
22 def query(self):
23 return {
24 "query": {
25 "bool": {
26 "must": [
27 {
28 "range": {
29 "last_manual_update": {
30 #"gte": "1970-01-01T00:00:00Z", # Newer than 'Never' (implicit)
31 "lte": self._newest_date # Older than X_WEEKS
32 }
33 }
34 },
35 {
36 "exists": {
37 "field": "admin.editor"
38 }
39 }
40 ],
41 "should": self._status_filters,
42 "minimum_should_match": 1
43 }
44 },
45 "size": 0,
46 }
49class ReadyQuery(object):
50 def __init__(self, ready_filter):
51 self._ready_filter = ready_filter
53 def query(self):
54 return {
55 "query": {
56 "bool": {
57 "filter": self._ready_filter
58 }
59 },
60 "size": 0
61 }
64class EdAppQuery(object):
65 def __init__(self, status_filters):
66 self._status_filters = status_filters
68 def query(self):
69 return {
70 "query": {
71 "bool": {
72 "filter": {
73 "bool": {
74 "must": {
75 "exists": {"field": "admin.editor_group"}
76 },
77 "must_not": {
78 "exists": {
79 "field": "admin.editor"
80 }
81 },
82 "should": self._status_filters,
83 "minimum_should_match": 1
84 }
85 }
86 }
87 },
88 "size": 0,
89 "aggregations": {
90 "ed_group_counts": {
91 "terms": {
92 "field": "admin.editor_group.exact",
93 "size" : 9999
94 }
95 }
96 }
97 }
100class EdAgeQuery(object):
101 def __init__(self, newest_date, status_filters):
102 self._newest_date = newest_date
103 self._status_filters = status_filters
105 def query(self):
106 return {
107 "query": {
108 "bool": {
109 "must": [
110 {
111 "range": {
112 "last_manual_update": {
113 # "gte": "1970-01-01T00:00:00Z", # Newer than 'Never' (implicit)
114 "lte": self._newest_date # Older than X_WEEKS
115 }
116 }
117 },
118 {
119 "exists": {
120 "field": "admin.editor"
121 }
122 }
123 ],
124 "should": self._status_filters,
125 "minimum_should_match": 1
126 }
127 },
128 "size": 0,
129 "aggregations": {
130 "ed_group_counts": {
131 "terms": {
132 "field": "admin.editor_group.exact",
133 "size": 9999
134 }
135 }
136 }
137 }
139class AssEdAgeQuery(object):
140 def __init__(self, idle_date, very_idle_date, status_filters):
141 self._idle_date = idle_date
142 self._very_idle_date = very_idle_date
143 self._status_filters = status_filters
145 def query(self):
146 return {
147 "query": {
148 "bool": {
149 "must": {
150 "range": {
151 "last_manual_update": {
152 # "gte": "1970-01-01T00:00:00Z", # Newer than 'Never' (implicit)
153 "lte": self._idle_date # Older than X_DAYS
154 }
155 }
156 },
157 "should": self._status_filters,
158 "minimum_should_match" : 1
159 }
160 },
161 "size": 0,
162 "aggregations": {
163 "assoc_ed": {
164 "terms": {
165 "field": "admin.editor.exact",
166 "size": 9999
167 },
168 "aggregations": {
169 "older_weeks": {
170 "range": {
171 "field": "last_manual_update",
172 "ranges": [
173 {"to": self._very_idle_date}, # count those which are idle for weeks
174 ]
175 }
176 }
177 }
178 }
179 }
180 }
183# Functions for each notification recipient - ManEd, Editor, Assoc_editor
184def managing_editor_notifications(emails_dict):
185 """
186 Notify managing editors about two things:
187 * Summary of records assigned to associate editors but not touched for X weeks
188 * Records marked as ready
189 Note: requires request context to render the email text from templates
190 """
191 MAN_ED_EMAIL = app.config.get('MANAGING_EDITOR_EMAIL', 'managing-editors@doaj.org')
193 relevant_statuses = app.config.get("MAN_ED_NOTIFICATION_STATUSES")
194 term = "admin.application_status.exact"
195 status_filters = [Facetview2.make_term_filter(term, status) for status in relevant_statuses]
197 # First note - records not touched for so long
198 X_WEEKS = app.config.get('MAN_ED_IDLE_WEEKS', 2)
199 newest_date = dates.now() - timedelta(weeks=X_WEEKS)
200 newest_date_stamp = newest_date.strftime(FMT_DATETIME_STD)
202 age_query = AgeQuery(newest_date_stamp, status_filters)
203 idle_res = models.Suggestion.query(q=age_query.query())
204 num_idle = idle_res.get('hits', {}).get('total', {}).get('value', 0)
206 text = render_template(templates.EMAIL_WF_ADMIN_AGE, num_idle=num_idle, x_weeks=X_WEEKS)
207 _add_email_paragraph(emails_dict, MAN_ED_EMAIL, 'Managing Editors', text)
209 # The second notification - the number of ready records
210 ready_filter = Facetview2.make_term_filter('admin.application_status.exact', constants.APPLICATION_STATUS_READY)
211 ready_query = ReadyQuery(ready_filter)
213 admin_fv_prefix = app.config.get('BASE_URL') + "/admin/applications?source="
214 fv_ready = Facetview2.make_query(filters=ready_filter, sort_parameter="last_manual_update")
215 ready_url = admin_fv_prefix + Facetview2.url_encode_query(fv_ready)
217 ready_res = models.Suggestion.query(q=ready_query.query())
218 num_ready = ready_res.get('hits').get('total', {}).get('value', 0)
220 text = render_template(templates.EMAIL_WF_ADMIN_READY, num=num_ready, url=ready_url)
221 _add_email_paragraph(emails_dict, MAN_ED_EMAIL, 'Managing Editors', text)
224def editor_notifications(emails_dict, limit=None):
225 """
226 Notify editors about two things:
227 * how many records are assigned to their group which have no associate assigned.
228 * how many records assigned to an associate in their group but have been idle for X_WEEKS
229 Note: requires request context to render the email text from templates
231 :param: limit: for the purposes of demonstration, limit the number of emails this function generates.
232 """
234 relevant_statuses = app.config.get("ED_NOTIFICATION_STATUSES")
235 term = "admin.application_status.exact"
236 status_filters = [Facetview2.make_term_filter(term, status) for status in relevant_statuses]
238 # First note - how many applications in editor's group have no associate editor assigned.
239 ed_app_query = EdAppQuery(status_filters)
241 ed_url = app.config.get("BASE_URL") + "/editor/group_applications"
243 # Query for editor groups which have items in the required statuses, count their numbers
244 es = models.Suggestion.query(q=ed_app_query.query())
245 group_stats = [(bucket.get("key"), bucket.get("doc_count")) for bucket in es.get("aggregations", {}).get("ed_group_counts", {}).get("buckets", [])]
247 if limit is not None and isinstance(limit, int):
248 group_stats = group_stats[:limit]
250 # Get the email addresses for the editor in charge of each group, Add the template to their email
251 for (group_name, group_count) in group_stats:
252 # get editor group object by name
253 eg = models.EditorGroup.pull_by_key("name", group_name)
254 if eg is None:
255 continue
257 # Get the email address to the editor account
258 editor = eg.get_editor_account()
259 ed_email = editor.email
261 text = render_template(templates.EMAIL_WF_EDITOR_GROUPCOUNT, num=group_count, ed_group=group_name, url=ed_url)
262 _add_email_paragraph(emails_dict, ed_email, eg.editor, text)
264 # Second note - records within editor group not touched for so long
265 X_WEEKS = app.config.get('ED_IDLE_WEEKS', 2)
266 newest_date = dates.now() - timedelta(weeks=X_WEEKS)
267 newest_date_stamp = newest_date.strftime(FMT_DATETIME_STD)
269 ed_age_query = EdAgeQuery(newest_date_stamp, status_filters)
271 ed_fv_prefix = app.config.get('BASE_URL') + "/editor/group_applications?source="
272 fv_age = Facetview2.make_query(sort_parameter="last_manual_update")
273 ed_age_url = ed_fv_prefix + Facetview2.url_encode_query(fv_age)
275 es = models.Suggestion.query(q=ed_age_query.query())
276 group_stats = [(bucket.get("key"), bucket.get("doc_count")) for bucket in es.get("aggregations", {}).get("ed_group_counts", {}).get("buckets", [])]
278 if limit is not None and isinstance(limit, int):
279 group_stats = group_stats[:limit]
281 # Get the email addresses for the editor in charge of each group, Add the template to their email
282 for (group_name, group_count) in group_stats:
283 # get editor group object by name
284 eg = models.EditorGroup.pull_by_key("name", group_name)
285 if eg is None:
286 continue
288 # Get the email address to the editor account
289 editor = eg.get_editor_account()
290 ed_email = editor.email
292 text = render_template(templates.EMAIL_WF_EDITOR_AGE, num=group_count, ed_group=group_name, url=ed_age_url, x_weeks=X_WEEKS)
293 _add_email_paragraph(emails_dict, ed_email, eg.editor, text)
296def associate_editor_notifications(emails_dict, limit=None):
297 """
298 Notify associates about two things:
299 * Records assigned that haven't been updated for X days
300 * Record(s) that haven't been updated for Y weeks
301 Note: requires request context to render the email text from templates
302 """
304 # Get our thresholds from settings
305 X_DAYS = app.config.get('ASSOC_ED_IDLE_DAYS', 2)
306 Y_WEEKS = app.config.get('ASSOC_ED_IDLE_WEEKS', 2)
307 now = dates.now()
308 idle_date = now - timedelta(days=X_DAYS)
309 very_idle_date = now - timedelta(weeks=Y_WEEKS)
310 idle_date_stamp = idle_date.strftime(FMT_DATETIME_STD)
311 very_idle_date_stamp = very_idle_date.strftime(FMT_DATETIME_STD)
313 relevant_statuses = app.config.get("ASSOC_ED_NOTIFICATION_STATUSES")
314 term = "admin.application_status.exact"
315 status_filters = [Facetview2.make_term_filter(term, status) for status in relevant_statuses]
317 assoc_age_query = AssEdAgeQuery(idle_date_stamp, very_idle_date_stamp, status_filters)
318 url = app.config.get("BASE_URL") + "/editor/your_applications"
320 es = models.Suggestion.query(q=assoc_age_query.query())
321 buckets = es.get("aggregations", {}).get("assoc_ed", {}).get("buckets", [])
323 if limit is not None and isinstance(limit, int):
324 buckets = buckets[:limit]
326 for bucket in buckets: # loop through assoc_ed buckets
327 assoc_id = bucket.get("key")
328 idle = bucket.get("doc_count")
330 # Get the 'older than y weeks' count from nested aggregation
331 very_idle = bucket.get("older_weeks").get("buckets")[0].get('doc_count') # only one bucket, so take first
333 # Pull the email address for our associate editor from their account
334 assoc = models.Account.pull(assoc_id)
335 try:
336 assoc_email = assoc.email
337 except AttributeError:
338 # There isn't an account for that id
339 app.logger.warning("No account found for ID {0}".format(assoc_id))
340 continue
342 text = render_template(templates.EMAIL_WF_ASSED_AGE, num_idle=idle, x_days=X_DAYS, num_very_idle=very_idle, y_weeks=Y_WEEKS, url=url)
343 _add_email_paragraph(emails_dict, assoc_email, assoc_id, text)
346def send_emails(emails_dict):
348 for (email, (to_name, paragraphs)) in emails_dict.items():
349 pre = 'Dear ' + to_name + ',\n\n'
350 post = '\n\nThe DOAJ Team\n\n***\nThis is an automated message. Please do not reply to this email.'
351 full_body = pre + '\n\n'.join(paragraphs) + post
353 app_email.send_mail(to=[email],
354 fro=app.config.get('SYSTEM_EMAIL_FROM', 'helpdesk@doaj.org'),
355 subject="DOAJ editorial reminders",
356 msg_body=full_body)
359def _add_email_paragraph(emails_dict, addr, to_name, para_string):
360 """
361 Add a new email to the global dict which stores the email fragments, or extend an existing one.
362 :param emails_dict: target object to store the emails
363 :param addr: email address for recipient
364 :param to_name: name of recipient
365 :param para_string: paragraph to add to the email
366 """
368 try:
369 (name, paras) = emails_dict[addr]
370 paras.append(para_string)
371 except KeyError:
372 emails_dict[addr] = (to_name, [para_string])
375class AsyncWorkflowBackgroundTask(BackgroundTask):
377 __action__ = "async_workflow_notifications"
379 def run(self):
380 """
381 Execute the task as specified by the background_job
382 """
383 job = self.background_job
385 """ Run through each notification type, then send emails """
386 # Create a request context to render templates
387 ctx = app.test_request_context()
388 ctx.push()
390 # Store all of the emails: { email_addr : (name, [paragraphs]) }
391 emails_dict = {}
393 # Gather info and build the notifications
394 managing_editor_notifications(emails_dict)
395 editor_notifications(emails_dict)
396 associate_editor_notifications(emails_dict)
398 # Discard the context (the send mail function makes its own)
399 ctx.pop()
401 send_emails(emails_dict)
403 def cleanup(self):
404 """
405 Cleanup after a successful OR failed run of the task
406 """
407 pass
409 @classmethod
410 def prepare(cls, username, **kwargs):
411 """
412 Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
413 or fail with a suitable exception
414 :param username: user who called this job
415 :param kwargs: arbitrary keyword arguments pertaining to this task type
416 :return: a BackgroundJob instance representing this task
417 """
419 if not app.config.get("ENABLE_EMAIL", False):
420 raise BackgroundException("Email has been disabled in config. Set ENABLE_EMAIL to True to run this task.")
422 # first prepare a job record
423 return background_helper.create_job(username, cls.__action__, queue_id=huey_helper.queue_id)
425 @classmethod
426 def submit(cls, background_job):
427 """
428 Submit the specified BackgroundJob to the background queue
429 :param background_job: the BackgroundJob instance
430 """
431 background_job.save()
432 async_workflow_notifications.schedule(args=(background_job.id,), delay=10)
435huey_helper = AsyncWorkflowBackgroundTask.create_huey_helper(queue)
438@huey_helper.register_schedule
439def scheduled_async_workflow_notifications():
440 user = app.config.get("SYSTEM_USERNAME")
441 job = AsyncWorkflowBackgroundTask.prepare(user)
442 AsyncWorkflowBackgroundTask.submit(job)
445@huey_helper.register_execute(is_load_config=False)
446def async_workflow_notifications(job_id):
447 job = models.BackgroundJob.pull(job_id)
448 task = AsyncWorkflowBackgroundTask(job)
449 BackgroundApi.execute(task)