Coverage for portality / bll / services / background_task_status.py: 99%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1""" 

2~~BackgroundTask:Monitoring~~ 

3""" 

4import itertools 

5from typing import Iterable 

6 

7from portality import constants 

8# from portality.constants import BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN, BGJOB_QUEUE_ID_EVENTS, BGJOB_QUEUE_ID_SCHEDULED_LONG, BGJOB_QUEUE_ID_SCHEDULED_SHORT, BGJOB_STATUS_ERROR, BGJOB_STATUS_QUEUED, \ 

9# BG_STATUS_STABLE, BG_STATUS_UNSTABLE, BGJOB_STATUS_COMPLETE 

10from portality.core import app 

11from portality.lib import dates 

12from portality.models.background import BackgroundJobQueryBuilder, BackgroundJob, SimpleBgjobQueue, \ 

13 LastCompletedJobQuery 

14from portality.tasks.helpers import background_helper 

15 

16 

17class BackgroundTaskStatusService: 

18 """ 

19 "background_task_status" is concept for list current background task status for monitoring. 

20 `create_background_status` is important function in service for create background_status data 

21 """ 

22 

23 @staticmethod 

24 def is_stable(val): 

25 return val == constants.BG_STATUS_STABLE 

26 

27 @staticmethod 

28 def to_bg_status_str(stable_val: bool) -> str: 

29 return constants.BG_STATUS_STABLE if stable_val else constants.BG_STATUS_UNSTABLE 

30 

31 def all_stable(self, items: Iterable, field_name='status') -> bool: 

32 return all(self.is_stable(q.get(field_name)) for q in items) 

33 

34 def all_stable_str(self, items: Iterable, field_name='status') -> str: 

35 return self.to_bg_status_str(self.all_stable(items, field_name)) 

36 

37 def create_last_successfully_run_status(self, action, last_run_successful_in=0, **_) -> dict: 

38 if last_run_successful_in == 0: 

39 return dict( 

40 status=constants.BG_STATUS_STABLE, 

41 last_run=None, 

42 last_run_status=None, 

43 err_msgs=[] 

44 ) 

45 

46 lr_query = (BackgroundJobQueryBuilder().action(action) 

47 .since(dates.before_now(last_run_successful_in)) 

48 .status_includes([constants.BGJOB_STATUS_COMPLETE, constants.BGJOB_STATUS_ERROR]) 

49 .size(1) 

50 .order_by('created_date', 'desc') 

51 .build_query_dict()) 

52 

53 lr_results = BackgroundJob.q2obj(q=lr_query) 

54 lr_job = lr_results[0] if len(lr_results) > 0 else None 

55 

56 status = constants.BG_STATUS_UNSTABLE 

57 lr = None 

58 last_run_status = None 

59 msg = ["No background jobs completed or errored in the time period"] 

60 

61 if lr_job is not None: 

62 lr = lr_job.created_date 

63 last_run_status = lr_job.status 

64 if lr_job.status == constants.BGJOB_STATUS_COMPLETE: 

65 status = constants.BG_STATUS_STABLE 

66 msg = [] 

67 else: 

68 msg = ["Last job did not complete successfully"] 

69 

70 return dict( 

71 status=status, 

72 last_run=lr, 

73 last_run_status=last_run_status, 

74 err_msgs=msg 

75 ) 

76 

77 def create_errors_status(self, action, check_sec=3600, allowed_num_err=0, **_) -> dict: 

78 in_monitoring_query = SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_ERROR, since=dates.before_now(check_sec)) 

79 num_err_in_monitoring = BackgroundJob.hit_count(query=in_monitoring_query.query()) 

80 

81 # prepare errors messages 

82 err_msgs = [] 

83 if num_err_in_monitoring > allowed_num_err: 

84 err_msgs.append(f'too many error in monitoring period [{num_err_in_monitoring} > {allowed_num_err}]') 

85 

86 return dict( 

87 status=self.to_bg_status_str(not err_msgs), 

88 total=BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_ERROR).query()), 

89 in_monitoring_period=num_err_in_monitoring, 

90 err_msgs=err_msgs, 

91 ) 

92 

93 def create_queued_status(self, action, total=2, oldest=1200, **_) -> dict: 

94 total_queued = BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_QUEUED).query()) 

95 oldest_query = (BackgroundJobQueryBuilder().action(action) 

96 .status_includes(constants.BGJOB_STATUS_QUEUED).size(1) 

97 .order_by('created_date', 'asc') 

98 .build_query_dict()) 

99 oldest_jobs = list(BackgroundJob.q2obj(q=oldest_query)) 

100 oldest_job = oldest_jobs and oldest_jobs[0] 

101 

102 err_msgs = [] 

103 limited_oldest_date = dates.before_now(oldest) 

104 if oldest_job and oldest_job.created_timestamp < limited_oldest_date: 

105 err_msgs.append('outdated queued job found[{}]. created_timestamp[{} < {}]'.format( 

106 oldest_job.id, 

107 oldest_job.created_timestamp, 

108 limited_oldest_date 

109 )) 

110 

111 if total_queued > total: 

112 err_msgs.append(f'too many queued job [{total_queued} > {total}]') 

113 

114 return dict( 

115 status=self.to_bg_status_str(not err_msgs), 

116 total=total_queued, 

117 oldest=oldest_job.created_date if oldest_job else None, 

118 err_msgs=err_msgs, 

119 ) 

120 

121 def create_queues_status(self, queue_name) -> dict: 

122 # define last_completed_job 

123 bgjob_list = BackgroundJob.q2obj(q=LastCompletedJobQuery(queue_name).query()) 

124 bgjob_list = list(bgjob_list) 

125 if bgjob_list: 

126 last_completed_date = bgjob_list[0].last_updated_timestamp 

127 else: 

128 last_completed_date = None 

129 

130 errors = {action: self.create_errors_status(action, **config) for action, config 

131 in self.get_config_dict_by_queue_name('BG_MONITOR_ERRORS_CONFIG', queue_name).items()} 

132 

133 queued = {action: self.create_queued_status(action, **config) for action, config 

134 in self.get_config_dict_by_queue_name('BG_MONITOR_QUEUED_CONFIG', queue_name).items()} 

135 

136 last_run = {action: self.create_last_successfully_run_status(action, **config) for action, config 

137 in self.get_config_dict_by_queue_name('BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG', queue_name).items()} 

138 

139 # prepare for err_msgs 

140 limited_sec = app.config.get('BG_MONITOR_LAST_COMPLETED', {}).get(queue_name) 

141 if limited_sec is None: 

142 app.logger.warning(f'BG_MONITOR_LAST_COMPLETED for {queue_name} not found ') 

143 

144 err_msgs = [] 

145 if limited_sec is not None and last_completed_date: 

146 limited_date = dates.before_now(limited_sec) 

147 if last_completed_date < limited_date: 

148 err_msgs.append( 

149 f'last completed job is too old. [{last_completed_date} < {limited_date}]' 

150 ) 

151 

152 result_dict = dict( 

153 status=self.to_bg_status_str( 

154 not err_msgs and self.all_stable(itertools.chain(errors.values(), queued.values(), last_run.values()))), 

155 last_completed_job=last_completed_date and dates.format(last_completed_date), 

156 errors=errors, 

157 queued=queued, 

158 last_run_successful=last_run, 

159 err_msgs=err_msgs, 

160 ) 

161 return result_dict 

162 

163 @staticmethod 

164 def get_config_dict_by_queue_name(config_name, queue_name): 

165 bg_specs = background_helper.get_all_background_task_specs() 

166 actions = {action for qn, action, _ in bg_specs 

167 if qn == queue_name} 

168 

169 return { 

170 k: app.config.get(config_name, {}).get(k, app.config.get('BG_MONITOR_DEFAULT_CONFIG')) 

171 for k in actions 

172 } 

173 

174 def create_background_status(self) -> dict: 

175 queues = { 

176 queue_name: self.create_queues_status(queue_name) 

177 for queue_name in [constants.BGJOB_QUEUE_ID_LONG, 

178 constants.BGJOB_QUEUE_ID_MAIN, 

179 constants.BGJOB_QUEUE_ID_EVENTS, 

180 constants.BGJOB_QUEUE_ID_SCHEDULED_LONG, 

181 constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT] 

182 } 

183 

184 result_dict = dict( 

185 status=(self.all_stable_str(queues.values())), 

186 queues=queues, 

187 ) 

188 

189 # sort the results in the order of unstable status 

190 sorted_data = self.sort_dict_by_unstable_status(result_dict) 

191 

192 return sorted_data 

193 

194 def sort_dict_by_unstable_status(self, data): 

195 """ 

196 Sorts each dictionary within the nested structure by prioritizing items with 'status': 'unstable'. 

197 The overall structure of the input dictionary is preserved. 

198 """ 

199 if isinstance(data, dict): 

200 # Extract items with 'status': 'unstable' and other items 

201 unstable_items = {k: v for k, v in data.items() if isinstance(v, dict) and v.get('status') == 'unstable'} 

202 other_items = {k: v for k, v in data.items() if k not in unstable_items} 

203 

204 # Recursively sort nested dictionaries 

205 for k in unstable_items: 

206 unstable_items[k] = self.sort_dict_by_unstable_status(unstable_items[k]) 

207 for k in other_items: 

208 other_items[k] = self.sort_dict_by_unstable_status(other_items[k]) 

209 

210 # Merge the dictionaries, with unstable items first 

211 return {**unstable_items, **other_items} 

212 else: 

213 # Return the item as is if it's not a dict 

214 return data