Coverage for portality / tasks / monitor_bgjobs.py: 92%

48 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1import itertools 

2 

3from portality import app_email, models 

4from portality.background import BackgroundTask 

5from portality.core import app 

6from portality.tasks.helpers import background_helper 

7from portality.tasks.redis_huey import scheduled_short_queue as queue 

8 

9 

def get_system_email():
    """Return the configured system sender address.

    Reads ``SYSTEM_EMAIL_FROM`` from ``app.config`` and falls back to the
    DOAJ helpdesk address when that key is absent.
    """
    fallback_address = 'helpdesk@doaj.org'
    return app.config.get('SYSTEM_EMAIL_FROM', fallback_address)

12 

13 

def send_mail_if_bgjob_error(to_address_list, from_address, logger_fn=None):
    """Email a summary of errored background jobs, if the query finds any.

    Looks up ``models.BackgroundJob`` records whose status is ``error``,
    whose user is ``system`` and whose action is ``harvest``, sorted by
    ``created_date`` descending.  If nothing comes back, a message is logged
    and the function returns without sending mail.  Otherwise a single mail
    describing every returned job is sent via ``app_email.send_mail``.

    :param to_address_list: recipients, handed unchanged to ``app_email.send_mail``
    :param from_address: sender, handed unchanged to ``app_email.send_mail``
    :param logger_fn: callable accepting one message string; ``print`` is
        used when this is ``None``
    :return: None
    """
    log = print if logger_fn is None else logger_fn

    must_clauses = [
        {'term': {'status.exact': 'error'}},
        {'term': {'user.exact': 'system'}},
        {'term': {'action.exact': 'harvest'}},
    ]
    error_jobs_query = {
        'query': {'bool': {'must': must_clauses}},
        'sort': [{'created_date': {'order': 'desc'}}],
        'track_total_hits': True,
    }

    found_jobs = models.BackgroundJob.q2obj(q=error_jobs_query)
    if not found_jobs:
        log('[monitor_bgjobs] No background jobs found. exit')
        return

    log(f'[monitor_bgjobs] {len(found_jobs)} of background jobs found.')

    # One five-line section per job: a separator followed by its key fields.
    detail_lines = []
    for job in found_jobs:
        detail_lines.append('--------------')
        detail_lines.append(f'{job.action} by {job.user} status: {job.status}')
        detail_lines.append(f'Job ID: {job.id}')
        detail_lines.append(f'Job Created: {job.created_date}')
        detail_lines.append(f'Job Last Updated: {job.last_updated}')

    mail_body = f'{len(found_jobs)} of background jobs found.\n' + '\n'.join(detail_lines)
    mail_subject = '[Monitoring Background job] error background jobs found.'
    app_email.send_mail(to_address_list, from_address, mail_subject,
                        msg_body=mail_body)

50 

51 

class MonitorBgjobsBackgroundTask(BackgroundTask):
    """Background task that reports errored background jobs by email.

    The actual query and mail sending live in ``send_mail_if_bgjob_error``;
    this class wires that function into the background-job machinery.
    """

    __action__ = "monitor_bgjobs"

    def run(self):
        """Read the mail addresses from the job params and send the report.

        Progress messages from ``send_mail_if_bgjob_error`` are recorded on
        the job through ``add_audit_message``, followed by a completion note.
        """
        job = self.background_job
        send_mail_if_bgjob_error(
            to_address_list=self.get_param(job.params, 'to_address_list'),
            from_address=self.get_param(job.params, 'from_address'),
            logger_fn=job.add_audit_message,
        )
        job.add_audit_message(f"{self.__action__} completed")

    @classmethod
    def prepare(cls, username, **kwargs):
        """Create the background job for this task.

        ``to_address_list`` and ``from_address`` are resolved through
        ``background_helper.get_value_safe`` with the system email as the
        default (presumably the default applies when ``kwargs`` does not
        supply a usable value; confirm against the helper).

        :param username: user the job is created for
        :param kwargs: optional ``to_address_list`` and ``from_address``
        :return: whatever ``background_helper.create_job`` returns
        """
        params = {}

        recipients = background_helper.get_value_safe(
            'to_address_list', [get_system_email()], kwargs)
        cls.set_param(params, 'to_address_list', recipients)

        sender = background_helper.get_value_safe(
            'from_address', get_system_email(), kwargs)
        cls.set_param(params, 'from_address', sender)

        return background_helper.create_job(
            username=username,
            action=cls.__action__,
            params=params,
        )

    def cleanup(self):
        """Do nothing; this task has no cleanup step."""

    @classmethod
    def submit(cls, background_job):
        """Submit ``background_job`` for execution by the ``monitor_bgjobs`` task."""
        background_helper.submit_by_background_job(background_job, monitor_bgjobs)

79 

80 

# Helper binding this task class to the scheduled short queue; the
# schedule/execute entry points below are registered through it.
huey_helper = MonitorBgjobsBackgroundTask.create_huey_helper(queue)

82 

83 

@huey_helper.register_schedule
def scheduled_monitor_bgjobs():
    """Scheduled entry point for the monitor_bgjobs task.

    Recipient and sender come from the ``TASKS_MONITOR_BGJOBS_TO`` and
    ``TASKS_MONITOR_BGJOBS_FROM`` config keys, each defaulting to the
    system email, and are passed on to ``huey_helper.scheduled_common``.
    """
    config = app.config
    recipients = config.get("TASKS_MONITOR_BGJOBS_TO", [get_system_email()])
    sender = config.get("TASKS_MONITOR_BGJOBS_FROM", get_system_email())
    huey_helper.scheduled_common(
        to_address_list=recipients,
        from_address=sender,
    )

90 

91 

@huey_helper.register_execute(is_load_config=False)
def monitor_bgjobs(job_id):
    """Execute the background job with the given id via ``huey_helper.execute_common``.

    :param job_id: identifier of the job to execute
    """
    huey_helper.execute_common(job_id)