Coverage for portality / tasks / async_workflow_notifications.py: 82%

176 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1from datetime import timedelta 

2 

3from flask import render_template 

4 

5from portality import constants 

6from portality import models, app_email 

7from portality.background import BackgroundTask, BackgroundApi, BackgroundException 

8from portality.core import app 

9from portality.dao import Facetview2 

10from portality.lib import dates 

11from portality.lib.dates import FMT_DATETIME_STD 

12from portality.tasks.helpers import background_helper 

13from portality.tasks.redis_huey import scheduled_short_queue as queue, schedule 

14from portality.ui import templates 

15 

16 

17class AgeQuery(object): 

18 def __init__(self, newest_date, status_filters): 

19 self._newest_date = newest_date 

20 self._status_filters = status_filters 

21 

22 def query(self): 

23 return { 

24 "query": { 

25 "bool": { 

26 "must": [ 

27 { 

28 "range": { 

29 "last_manual_update": { 

30 #"gte": "1970-01-01T00:00:00Z", # Newer than 'Never' (implicit) 

31 "lte": self._newest_date # Older than X_WEEKS 

32 } 

33 } 

34 }, 

35 { 

36 "exists": { 

37 "field": "admin.editor" 

38 } 

39 } 

40 ], 

41 "should": self._status_filters, 

42 "minimum_should_match": 1 

43 } 

44 }, 

45 "size": 0, 

46 } 

47 

48 

49class ReadyQuery(object): 

50 def __init__(self, ready_filter): 

51 self._ready_filter = ready_filter 

52 

53 def query(self): 

54 return { 

55 "query": { 

56 "bool": { 

57 "filter": self._ready_filter 

58 } 

59 }, 

60 "size": 0 

61 } 

62 

63 

64class EdAppQuery(object): 

65 def __init__(self, status_filters): 

66 self._status_filters = status_filters 

67 

68 def query(self): 

69 return { 

70 "query": { 

71 "bool": { 

72 "filter": { 

73 "bool": { 

74 "must": { 

75 "exists": {"field": "admin.editor_group"} 

76 }, 

77 "must_not": { 

78 "exists": { 

79 "field": "admin.editor" 

80 } 

81 }, 

82 "should": self._status_filters, 

83 "minimum_should_match": 1 

84 } 

85 } 

86 } 

87 }, 

88 "size": 0, 

89 "aggregations": { 

90 "ed_group_counts": { 

91 "terms": { 

92 "field": "admin.editor_group.exact", 

93 "size" : 9999 

94 } 

95 } 

96 } 

97 } 

98 

99 

100class EdAgeQuery(object): 

101 def __init__(self, newest_date, status_filters): 

102 self._newest_date = newest_date 

103 self._status_filters = status_filters 

104 

105 def query(self): 

106 return { 

107 "query": { 

108 "bool": { 

109 "must": [ 

110 { 

111 "range": { 

112 "last_manual_update": { 

113 # "gte": "1970-01-01T00:00:00Z", # Newer than 'Never' (implicit) 

114 "lte": self._newest_date # Older than X_WEEKS 

115 } 

116 } 

117 }, 

118 { 

119 "exists": { 

120 "field": "admin.editor" 

121 } 

122 } 

123 ], 

124 "should": self._status_filters, 

125 "minimum_should_match": 1 

126 } 

127 }, 

128 "size": 0, 

129 "aggregations": { 

130 "ed_group_counts": { 

131 "terms": { 

132 "field": "admin.editor_group.exact", 

133 "size": 9999 

134 } 

135 } 

136 } 

137 } 

138 

139class AssEdAgeQuery(object): 

140 def __init__(self, idle_date, very_idle_date, status_filters): 

141 self._idle_date = idle_date 

142 self._very_idle_date = very_idle_date 

143 self._status_filters = status_filters 

144 

145 def query(self): 

146 return { 

147 "query": { 

148 "bool": { 

149 "must": { 

150 "range": { 

151 "last_manual_update": { 

152 # "gte": "1970-01-01T00:00:00Z", # Newer than 'Never' (implicit) 

153 "lte": self._idle_date # Older than X_DAYS 

154 } 

155 } 

156 }, 

157 "should": self._status_filters, 

158 "minimum_should_match" : 1 

159 } 

160 }, 

161 "size": 0, 

162 "aggregations": { 

163 "assoc_ed": { 

164 "terms": { 

165 "field": "admin.editor.exact", 

166 "size": 9999 

167 }, 

168 "aggregations": { 

169 "older_weeks": { 

170 "range": { 

171 "field": "last_manual_update", 

172 "ranges": [ 

173 {"to": self._very_idle_date}, # count those which are idle for weeks 

174 ] 

175 } 

176 } 

177 } 

178 } 

179 } 

180 } 

181 

182 

183# Functions for each notification recipient - ManEd, Editor, Assoc_editor 

184def managing_editor_notifications(emails_dict): 

185 """ 

186 Notify managing editors about two things: 

187 * Summary of records assigned to associate editors but not touched for X weeks 

188 * Records marked as ready 

189 Note: requires request context to render the email text from templates 

190 """ 

191 MAN_ED_EMAIL = app.config.get('MANAGING_EDITOR_EMAIL', 'managing-editors@doaj.org') 

192 

193 relevant_statuses = app.config.get("MAN_ED_NOTIFICATION_STATUSES") 

194 term = "admin.application_status.exact" 

195 status_filters = [Facetview2.make_term_filter(term, status) for status in relevant_statuses] 

196 

197 # First note - records not touched for so long 

198 X_WEEKS = app.config.get('MAN_ED_IDLE_WEEKS', 2) 

199 newest_date = dates.now() - timedelta(weeks=X_WEEKS) 

200 newest_date_stamp = newest_date.strftime(FMT_DATETIME_STD) 

201 

202 age_query = AgeQuery(newest_date_stamp, status_filters) 

203 idle_res = models.Suggestion.query(q=age_query.query()) 

204 num_idle = idle_res.get('hits', {}).get('total', {}).get('value', 0) 

205 

206 text = render_template(templates.EMAIL_WF_ADMIN_AGE, num_idle=num_idle, x_weeks=X_WEEKS) 

207 _add_email_paragraph(emails_dict, MAN_ED_EMAIL, 'Managing Editors', text) 

208 

209 # The second notification - the number of ready records 

210 ready_filter = Facetview2.make_term_filter('admin.application_status.exact', constants.APPLICATION_STATUS_READY) 

211 ready_query = ReadyQuery(ready_filter) 

212 

213 admin_fv_prefix = app.config.get('BASE_URL') + "/admin/applications?source=" 

214 fv_ready = Facetview2.make_query(filters=ready_filter, sort_parameter="last_manual_update") 

215 ready_url = admin_fv_prefix + Facetview2.url_encode_query(fv_ready) 

216 

217 ready_res = models.Suggestion.query(q=ready_query.query()) 

218 num_ready = ready_res.get('hits').get('total', {}).get('value', 0) 

219 

220 text = render_template(templates.EMAIL_WF_ADMIN_READY, num=num_ready, url=ready_url) 

221 _add_email_paragraph(emails_dict, MAN_ED_EMAIL, 'Managing Editors', text) 

222 

223 

224def editor_notifications(emails_dict, limit=None): 

225 """ 

226 Notify editors about two things: 

227 * how many records are assigned to their group which have no associate assigned. 

228 * how many records assigned to an associate in their group but have been idle for X_WEEKS 

229 Note: requires request context to render the email text from templates 

230  

231 :param: limit: for the purposes of demonstration, limit the number of emails this function generates. 

232 """ 

233 

234 relevant_statuses = app.config.get("ED_NOTIFICATION_STATUSES") 

235 term = "admin.application_status.exact" 

236 status_filters = [Facetview2.make_term_filter(term, status) for status in relevant_statuses] 

237 

238 # First note - how many applications in editor's group have no associate editor assigned. 

239 ed_app_query = EdAppQuery(status_filters) 

240 

241 ed_url = app.config.get("BASE_URL") + "/editor/group_applications" 

242 

243 # Query for editor groups which have items in the required statuses, count their numbers 

244 es = models.Suggestion.query(q=ed_app_query.query()) 

245 group_stats = [(bucket.get("key"), bucket.get("doc_count")) for bucket in es.get("aggregations", {}).get("ed_group_counts", {}).get("buckets", [])] 

246 

247 if limit is not None and isinstance(limit, int): 

248 group_stats = group_stats[:limit] 

249 

250 # Get the email addresses for the editor in charge of each group, Add the template to their email 

251 for (group_name, group_count) in group_stats: 

252 # get editor group object by name 

253 eg = models.EditorGroup.pull_by_key("name", group_name) 

254 if eg is None: 

255 continue 

256 

257 # Get the email address to the editor account 

258 editor = eg.get_editor_account() 

259 ed_email = editor.email 

260 

261 text = render_template(templates.EMAIL_WF_EDITOR_GROUPCOUNT, num=group_count, ed_group=group_name, url=ed_url) 

262 _add_email_paragraph(emails_dict, ed_email, eg.editor, text) 

263 

264 # Second note - records within editor group not touched for so long 

265 X_WEEKS = app.config.get('ED_IDLE_WEEKS', 2) 

266 newest_date = dates.now() - timedelta(weeks=X_WEEKS) 

267 newest_date_stamp = newest_date.strftime(FMT_DATETIME_STD) 

268 

269 ed_age_query = EdAgeQuery(newest_date_stamp, status_filters) 

270 

271 ed_fv_prefix = app.config.get('BASE_URL') + "/editor/group_applications?source=" 

272 fv_age = Facetview2.make_query(sort_parameter="last_manual_update") 

273 ed_age_url = ed_fv_prefix + Facetview2.url_encode_query(fv_age) 

274 

275 es = models.Suggestion.query(q=ed_age_query.query()) 

276 group_stats = [(bucket.get("key"), bucket.get("doc_count")) for bucket in es.get("aggregations", {}).get("ed_group_counts", {}).get("buckets", [])] 

277 

278 if limit is not None and isinstance(limit, int): 

279 group_stats = group_stats[:limit] 

280 

281 # Get the email addresses for the editor in charge of each group, Add the template to their email 

282 for (group_name, group_count) in group_stats: 

283 # get editor group object by name 

284 eg = models.EditorGroup.pull_by_key("name", group_name) 

285 if eg is None: 

286 continue 

287 

288 # Get the email address to the editor account 

289 editor = eg.get_editor_account() 

290 ed_email = editor.email 

291 

292 text = render_template(templates.EMAIL_WF_EDITOR_AGE, num=group_count, ed_group=group_name, url=ed_age_url, x_weeks=X_WEEKS) 

293 _add_email_paragraph(emails_dict, ed_email, eg.editor, text) 

294 

295 

296def associate_editor_notifications(emails_dict, limit=None): 

297 """ 

298 Notify associates about two things: 

299 * Records assigned that haven't been updated for X days 

300 * Record(s) that haven't been updated for Y weeks 

301 Note: requires request context to render the email text from templates 

302 """ 

303 

304 # Get our thresholds from settings 

305 X_DAYS = app.config.get('ASSOC_ED_IDLE_DAYS', 2) 

306 Y_WEEKS = app.config.get('ASSOC_ED_IDLE_WEEKS', 2) 

307 now = dates.now() 

308 idle_date = now - timedelta(days=X_DAYS) 

309 very_idle_date = now - timedelta(weeks=Y_WEEKS) 

310 idle_date_stamp = idle_date.strftime(FMT_DATETIME_STD) 

311 very_idle_date_stamp = very_idle_date.strftime(FMT_DATETIME_STD) 

312 

313 relevant_statuses = app.config.get("ASSOC_ED_NOTIFICATION_STATUSES") 

314 term = "admin.application_status.exact" 

315 status_filters = [Facetview2.make_term_filter(term, status) for status in relevant_statuses] 

316 

317 assoc_age_query = AssEdAgeQuery(idle_date_stamp, very_idle_date_stamp, status_filters) 

318 url = app.config.get("BASE_URL") + "/editor/your_applications" 

319 

320 es = models.Suggestion.query(q=assoc_age_query.query()) 

321 buckets = es.get("aggregations", {}).get("assoc_ed", {}).get("buckets", []) 

322 

323 if limit is not None and isinstance(limit, int): 

324 buckets = buckets[:limit] 

325 

326 for bucket in buckets: # loop through assoc_ed buckets 

327 assoc_id = bucket.get("key") 

328 idle = bucket.get("doc_count") 

329 

330 # Get the 'older than y weeks' count from nested aggregation 

331 very_idle = bucket.get("older_weeks").get("buckets")[0].get('doc_count') # only one bucket, so take first 

332 

333 # Pull the email address for our associate editor from their account 

334 assoc = models.Account.pull(assoc_id) 

335 try: 

336 assoc_email = assoc.email 

337 except AttributeError: 

338 # There isn't an account for that id 

339 app.logger.warning("No account found for ID {0}".format(assoc_id)) 

340 continue 

341 

342 text = render_template(templates.EMAIL_WF_ASSED_AGE, num_idle=idle, x_days=X_DAYS, num_very_idle=very_idle, y_weeks=Y_WEEKS, url=url) 

343 _add_email_paragraph(emails_dict, assoc_email, assoc_id, text) 

344 

345 

346def send_emails(emails_dict): 

347 

348 for (email, (to_name, paragraphs)) in emails_dict.items(): 

349 pre = 'Dear ' + to_name + ',\n\n' 

350 post = '\n\nThe DOAJ Team\n\n***\nThis is an automated message. Please do not reply to this email.' 

351 full_body = pre + '\n\n'.join(paragraphs) + post 

352 

353 app_email.send_mail(to=[email], 

354 fro=app.config.get('SYSTEM_EMAIL_FROM', 'helpdesk@doaj.org'), 

355 subject="DOAJ editorial reminders", 

356 msg_body=full_body) 

357 

358 

359def _add_email_paragraph(emails_dict, addr, to_name, para_string): 

360 """ 

361 Add a new email to the global dict which stores the email fragments, or extend an existing one. 

362 :param emails_dict: target object to store the emails 

363 :param addr: email address for recipient 

364 :param to_name: name of recipient 

365 :param para_string: paragraph to add to the email 

366 """ 

367 

368 try: 

369 (name, paras) = emails_dict[addr] 

370 paras.append(para_string) 

371 except KeyError: 

372 emails_dict[addr] = (to_name, [para_string]) 

373 

374 

375class AsyncWorkflowBackgroundTask(BackgroundTask): 

376 

377 __action__ = "async_workflow_notifications" 

378 

379 def run(self): 

380 """ 

381 Execute the task as specified by the background_job 

382 """ 

383 job = self.background_job 

384 

385 """ Run through each notification type, then send emails """ 

386 # Create a request context to render templates 

387 ctx = app.test_request_context() 

388 ctx.push() 

389 

390 # Store all of the emails: { email_addr : (name, [paragraphs]) } 

391 emails_dict = {} 

392 

393 # Gather info and build the notifications 

394 managing_editor_notifications(emails_dict) 

395 editor_notifications(emails_dict) 

396 associate_editor_notifications(emails_dict) 

397 

398 # Discard the context (the send mail function makes its own) 

399 ctx.pop() 

400 

401 send_emails(emails_dict) 

402 

403 def cleanup(self): 

404 """ 

405 Cleanup after a successful OR failed run of the task 

406 """ 

407 pass 

408 

409 @classmethod 

410 def prepare(cls, username, **kwargs): 

411 """ 

412 Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob, 

413 or fail with a suitable exception 

414 :param username: user who called this job 

415 :param kwargs: arbitrary keyword arguments pertaining to this task type 

416 :return: a BackgroundJob instance representing this task 

417 """ 

418 

419 if not app.config.get("ENABLE_EMAIL", False): 

420 raise BackgroundException("Email has been disabled in config. Set ENABLE_EMAIL to True to run this task.") 

421 

422 # first prepare a job record 

423 return background_helper.create_job(username, cls.__action__, queue_id=huey_helper.queue_id) 

424 

425 @classmethod 

426 def submit(cls, background_job): 

427 """ 

428 Submit the specified BackgroundJob to the background queue 

429 :param background_job: the BackgroundJob instance 

430 """ 

431 background_job.save() 

432 async_workflow_notifications.schedule(args=(background_job.id,), delay=10) 

433 

434 

435huey_helper = AsyncWorkflowBackgroundTask.create_huey_helper(queue) 

436 

437 

438@huey_helper.register_schedule 

439def scheduled_async_workflow_notifications(): 

440 user = app.config.get("SYSTEM_USERNAME") 

441 job = AsyncWorkflowBackgroundTask.prepare(user) 

442 AsyncWorkflowBackgroundTask.submit(job) 

443 

444 

445@huey_helper.register_execute(is_load_config=False) 

446def async_workflow_notifications(job_id): 

447 job = models.BackgroundJob.pull(job_id) 

448 task = AsyncWorkflowBackgroundTask(job) 

449 BackgroundApi.execute(task)