Coverage for portality / bll / services / background_task_status.py: 99%
94 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1"""
2~~BackgroundTask:Monitoring~~
3"""
4import itertools
5from typing import Iterable
7from portality import constants
8# from portality.constants import BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN, BGJOB_QUEUE_ID_EVENTS, BGJOB_QUEUE_ID_SCHEDULED_LONG, BGJOB_QUEUE_ID_SCHEDULED_SHORT, BGJOB_STATUS_ERROR, BGJOB_STATUS_QUEUED, \
9# BG_STATUS_STABLE, BG_STATUS_UNSTABLE, BGJOB_STATUS_COMPLETE
10from portality.core import app
11from portality.lib import dates
12from portality.models.background import BackgroundJobQueryBuilder, BackgroundJob, SimpleBgjobQueue, \
13 LastCompletedJobQuery
14from portality.tasks.helpers import background_helper
class BackgroundTaskStatusService:
    """
    Service that reports the current status of background tasks for monitoring.

    `create_background_status` is the main entry point: it assembles a status
    dict covering every monitored background-job queue, with unstable entries
    sorted first so they surface at the top of the report.
    """

    @staticmethod
    def is_stable(val) -> bool:
        """Return True when `val` is the "stable" status marker."""
        return val == constants.BG_STATUS_STABLE

    @staticmethod
    def to_bg_status_str(stable_val: bool) -> str:
        """Map a boolean stability flag to its status-string constant."""
        return constants.BG_STATUS_STABLE if stable_val else constants.BG_STATUS_UNSTABLE

    def all_stable(self, items: Iterable, field_name='status') -> bool:
        """Return True if every item's `field_name` value is the stable marker."""
        return all(self.is_stable(q.get(field_name)) for q in items)

    def all_stable_str(self, items: Iterable, field_name='status') -> str:
        """Return the status string describing whether all `items` are stable."""
        return self.to_bg_status_str(self.all_stable(items, field_name))

    def create_last_successfully_run_status(self, action, last_run_successful_in=0, **_) -> dict:
        """
        Build a status dict for the most recent completed/errored run of `action`
        within the last `last_run_successful_in` seconds.

        A `last_run_successful_in` of 0 disables the check (reports stable).
        Extra keyword arguments from the monitor config are ignored.
        """
        if last_run_successful_in == 0:
            # Check disabled for this action — always stable.
            return dict(
                status=constants.BG_STATUS_STABLE,
                last_run=None,
                last_run_status=None,
                err_msgs=[]
            )

        # Newest completed-or-errored job for this action inside the window.
        lr_query = (BackgroundJobQueryBuilder().action(action)
                    .since(dates.before_now(last_run_successful_in))
                    .status_includes([constants.BGJOB_STATUS_COMPLETE, constants.BGJOB_STATUS_ERROR])
                    .size(1)
                    .order_by('created_date', 'desc')
                    .build_query_dict())

        lr_results = BackgroundJob.q2obj(q=lr_query)
        lr_job = lr_results[0] if lr_results else None

        # Defaults assume no qualifying job was found in the window.
        status = constants.BG_STATUS_UNSTABLE
        lr = None
        last_run_status = None
        msg = ["No background jobs completed or errored in the time period"]

        if lr_job is not None:
            lr = lr_job.created_date
            last_run_status = lr_job.status
            if lr_job.status == constants.BGJOB_STATUS_COMPLETE:
                status = constants.BG_STATUS_STABLE
                msg = []
            else:
                msg = ["Last job did not complete successfully"]

        return dict(
            status=status,
            last_run=lr,
            last_run_status=last_run_status,
            err_msgs=msg
        )

    def create_errors_status(self, action, check_sec=3600, allowed_num_err=0, **_) -> dict:
        """
        Build a status dict for errored jobs of `action`.

        Unstable when more than `allowed_num_err` errors occurred in the last
        `check_sec` seconds. `total` in the result counts errors over all time.
        """
        in_monitoring_query = SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_ERROR, since=dates.before_now(check_sec))
        num_err_in_monitoring = BackgroundJob.hit_count(query=in_monitoring_query.query())

        # prepare errors messages
        err_msgs = []
        if num_err_in_monitoring > allowed_num_err:
            err_msgs.append(f'too many error in monitoring period [{num_err_in_monitoring} > {allowed_num_err}]')

        return dict(
            status=self.to_bg_status_str(not err_msgs),
            total=BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_ERROR).query()),
            in_monitoring_period=num_err_in_monitoring,
            err_msgs=err_msgs,
        )

    def create_queued_status(self, action, total=2, oldest=1200, **_) -> dict:
        """
        Build a status dict for queued jobs of `action`.

        Unstable when more than `total` jobs are queued, or when the oldest
        queued job was created more than `oldest` seconds ago.
        """
        total_queued = BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_QUEUED).query())
        oldest_query = (BackgroundJobQueryBuilder().action(action)
                        .status_includes(constants.BGJOB_STATUS_QUEUED).size(1)
                        .order_by('created_date', 'asc')
                        .build_query_dict())
        oldest_jobs = list(BackgroundJob.q2obj(q=oldest_query))
        # None (rather than a falsy []) when nothing is queued.
        oldest_job = oldest_jobs[0] if oldest_jobs else None

        err_msgs = []
        limited_oldest_date = dates.before_now(oldest)
        if oldest_job and oldest_job.created_timestamp < limited_oldest_date:
            err_msgs.append('outdated queued job found[{}]. created_timestamp[{} < {}]'.format(
                oldest_job.id,
                oldest_job.created_timestamp,
                limited_oldest_date
            ))

        if total_queued > total:
            err_msgs.append(f'too many queued job [{total_queued} > {total}]')

        return dict(
            status=self.to_bg_status_str(not err_msgs),
            total=total_queued,
            oldest=oldest_job.created_date if oldest_job else None,
            err_msgs=err_msgs,
        )

    def create_queues_status(self, queue_name) -> dict:
        """
        Build the aggregated status dict for one queue: per-action error,
        queued and last-successful-run statuses, plus a check that the
        queue's last completed job is recent enough.
        """
        # define last_completed_job
        bgjob_list = list(BackgroundJob.q2obj(q=LastCompletedJobQuery(queue_name).query()))
        last_completed_date = bgjob_list[0].last_updated_timestamp if bgjob_list else None

        errors = {action: self.create_errors_status(action, **config) for action, config
                  in self.get_config_dict_by_queue_name('BG_MONITOR_ERRORS_CONFIG', queue_name).items()}

        queued = {action: self.create_queued_status(action, **config) for action, config
                  in self.get_config_dict_by_queue_name('BG_MONITOR_QUEUED_CONFIG', queue_name).items()}

        last_run = {action: self.create_last_successfully_run_status(action, **config) for action, config
                    in self.get_config_dict_by_queue_name('BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG', queue_name).items()}

        # prepare for err_msgs
        limited_sec = app.config.get('BG_MONITOR_LAST_COMPLETED', {}).get(queue_name)
        if limited_sec is None:
            # Missing config is logged but not treated as unstable.
            app.logger.warning(f'BG_MONITOR_LAST_COMPLETED for {queue_name} not found ')

        err_msgs = []
        if limited_sec is not None and last_completed_date:
            limited_date = dates.before_now(limited_sec)
            if last_completed_date < limited_date:
                err_msgs.append(
                    f'last completed job is too old. [{last_completed_date} < {limited_date}]'
                )

        # Queue is stable only if its own check passes AND every sub-status is stable.
        result_dict = dict(
            status=self.to_bg_status_str(
                not err_msgs and self.all_stable(itertools.chain(errors.values(), queued.values(), last_run.values()))),
            last_completed_job=last_completed_date and dates.format(last_completed_date),
            errors=errors,
            queued=queued,
            last_run_successful=last_run,
            err_msgs=err_msgs,
        )
        return result_dict

    @staticmethod
    def get_config_dict_by_queue_name(config_name, queue_name):
        """
        Return {action: monitor-config} for every background task action
        registered on `queue_name`, falling back to BG_MONITOR_DEFAULT_CONFIG
        when the named config has no entry for an action.
        """
        bg_specs = background_helper.get_all_background_task_specs()
        actions = {action for qn, action, _ in bg_specs
                   if qn == queue_name}

        return {
            k: app.config.get(config_name, {}).get(k, app.config.get('BG_MONITOR_DEFAULT_CONFIG'))
            for k in actions
        }

    def create_background_status(self) -> dict:
        """
        Build the full background-status report covering all monitored queues,
        with unstable entries sorted to the front of each dict.
        """
        queues = {
            queue_name: self.create_queues_status(queue_name)
            for queue_name in [constants.BGJOB_QUEUE_ID_LONG,
                               constants.BGJOB_QUEUE_ID_MAIN,
                               constants.BGJOB_QUEUE_ID_EVENTS,
                               constants.BGJOB_QUEUE_ID_SCHEDULED_LONG,
                               constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT]
        }

        result_dict = dict(
            status=(self.all_stable_str(queues.values())),
            queues=queues,
        )

        # sort the results in the order of unstable status
        sorted_data = self.sort_dict_by_unstable_status(result_dict)

        return sorted_data

    def sort_dict_by_unstable_status(self, data):
        """
        Sorts each dictionary within the nested structure by prioritizing items
        with 'status': 'unstable'. The overall structure of the input dictionary
        is preserved; the input is not mutated.
        """
        if not isinstance(data, dict):
            # Return the item as is if it's not a dict
            return data

        # Partition into unstable dict values and everything else.
        # NOTE(review): the literal 'unstable' presumably mirrors
        # constants.BG_STATUS_UNSTABLE — confirm and consider using the constant.
        unstable_items = {k: v for k, v in data.items()
                          if isinstance(v, dict) and v.get('status') == 'unstable'}
        other_items = {k: v for k, v in data.items() if k not in unstable_items}

        # Recursively sort nested dictionaries
        unstable_items = {k: self.sort_dict_by_unstable_status(v) for k, v in unstable_items.items()}
        other_items = {k: self.sort_dict_by_unstable_status(v) for k, v in other_items.items()}

        # Merge the dictionaries, with unstable items first
        return {**unstable_items, **other_items}