Coverage for portality / tasks / monitor_bgjobs.py: 92%
48 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1import itertools
3from portality import app_email, models
4from portality.background import BackgroundTask
5from portality.core import app
6from portality.tasks.helpers import background_helper
7from portality.tasks.redis_huey import scheduled_short_queue as queue
def get_system_email():
    """Return the system email address used for monitoring mails.

    Looks up ``SYSTEM_EMAIL_FROM`` in the application config and falls back
    to ``helpdesk@doaj.org`` when the lookup yields no configured value.
    """
    fallback_address = 'helpdesk@doaj.org'
    return app.config.get('SYSTEM_EMAIL_FROM', fallback_address)
def send_mail_if_bgjob_error(to_address_list, from_address, logger_fn=None):
    """Email a summary of errored background jobs, if any are found.

    Queries ``models.BackgroundJob`` for jobs whose status is ``error``,
    whose user is ``system`` and whose action is ``harvest``, newest first.
    When the query yields nothing, a message is logged and no mail is sent.

    NOTE(review): only the ``harvest`` action is matched even though the
    function name is generic -- confirm this is intended.

    :param to_address_list: recipients, handed to ``app_email.send_mail``
    :param from_address: sender address, handed to ``app_email.send_mail``
    :param logger_fn: callable accepting one message string; ``print`` is
        used when this is ``None``
    """
    log = print if logger_fn is None else logger_fn

    must_clauses = [
        {'term': {'status.exact': 'error'}},
        {'term': {'user.exact': 'system'}},
        {'term': {'action.exact': 'harvest'}},
    ]
    raw_query = {
        'query': {'bool': {'must': must_clauses}},
        'sort': [{'created_date': {'order': 'desc'}}],
        'track_total_hits': True,
    }

    jobs = models.BackgroundJob.q2obj(q=raw_query)
    if not jobs:
        log('[monitor_bgjobs] No background jobs found. exit')
        return

    # The same sentence is used for the log entry and the first mail line.
    summary = f'{len(jobs)} of background jobs found.'
    log(f'[monitor_bgjobs] {summary}')

    # One five-line section per job, each introduced by a separator line.
    body_lines = [summary]
    for job in jobs:
        body_lines.append('--------------')
        body_lines.append(f'{job.action} by {job.user} status: {job.status}')
        body_lines.append(f'Job ID: {job.id}')
        body_lines.append(f'Job Created: {job.created_date}')
        body_lines.append(f'Job Last Updated: {job.last_updated}')

    app_email.send_mail(
        to_address_list,
        from_address,
        '[Monitoring Background job] error background jobs found.',
        msg_body='\n'.join(body_lines),
    )
class MonitorBgjobsBackgroundTask(BackgroundTask):
    """Background task wrapping ``send_mail_if_bgjob_error``."""

    __action__ = "monitor_bgjobs"

    def run(self):
        """Run the error-job check using the addresses stored on the job.

        Audit messages of the background job serve as the log sink, and a
        completion message is recorded once the check returns.
        """
        job = self.background_job
        recipients = self.get_param(job.params, 'to_address_list')
        sender = self.get_param(job.params, 'from_address')

        send_mail_if_bgjob_error(
            to_address_list=recipients,
            from_address=sender,
            logger_fn=job.add_audit_message,
        )
        job.add_audit_message(f"{self.__action__} completed")

    @classmethod
    def prepare(cls, username, **kwargs):
        """Create the background job carrying the mail addresses as params.

        ``to_address_list`` and ``from_address`` are resolved from *kwargs*
        through ``background_helper.get_value_safe``, with the system email
        supplied as the default for both.

        :param username: user the job is created for
        :return: whatever ``background_helper.create_job`` returns
        """
        params = {}

        recipients = background_helper.get_value_safe(
            'to_address_list', [get_system_email()], kwargs)
        cls.set_param(params, 'to_address_list', recipients)

        sender = background_helper.get_value_safe(
            'from_address', get_system_email(), kwargs)
        cls.set_param(params, 'from_address', sender)

        return background_helper.create_job(
            username=username,
            action=cls.__action__,
            params=params,
        )

    def cleanup(self):
        """Nothing to clean up for this task."""

    @classmethod
    def submit(cls, background_job):
        """Hand *background_job* to the ``monitor_bgjobs`` huey task."""
        background_helper.submit_by_background_job(
            background_job, monitor_bgjobs)
# Helper binding the task class to the imported ``scheduled_short_queue``;
# the decorators below register the schedule and execute entry points on it.
huey_helper = MonitorBgjobsBackgroundTask.create_huey_helper(
    queue,
)
@huey_helper.register_schedule
def scheduled_monitor_bgjobs():
    """Scheduled entry point for the monitor task.

    Recipients come from ``TASKS_MONITOR_BGJOBS_TO`` and the sender from
    ``TASKS_MONITOR_BGJOBS_FROM`` in the app config; the system email is
    the default for both.
    """
    recipients = app.config.get(
        "TASKS_MONITOR_BGJOBS_TO", [get_system_email()])
    sender = app.config.get(
        "TASKS_MONITOR_BGJOBS_FROM", get_system_email())
    huey_helper.scheduled_common(
        to_address_list=recipients,
        from_address=sender,
    )
@huey_helper.register_execute(is_load_config=False)
def monitor_bgjobs(job_id):
    """Execute entry point: delegate *job_id* to ``huey_helper.execute_common``."""
    huey_helper.execute_common(job_id)