Coverage for portality / tasks / helpers / background_helper.py: 87%

133 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1""" Collection of wrapper functions that help you create a BackgroundTask 

2~~BackgroundTasks:Feature~~ 

3""" 

4import inspect 

5import pkgutil 

6from typing import Callable, Type, Iterable, Tuple 

7 

8from huey import RedisHuey 

9 

10import portality.tasks 

11from portality import models, constants 

12from portality.background import BackgroundApi, BackgroundTask 

13from portality.core import app 

14from portality.decorators import write_required 

15from portality.tasks.redis_huey import long_running, main_queue, events_queue, scheduled_long_queue, scheduled_short_queue, configure, schedule 

16 

17TaskFactory = Callable[[models.BackgroundJob], BackgroundTask] 

18_queue_for_action = None 

19 

20 

def get_queue_id_by_task_queue(task_queue: RedisHuey):
    """Translate a huey queue object into its ``BGJOB_QUEUE_ID_*`` constant.

    Queues are compared by their ``name`` attribute against the queues
    imported from ``portality.tasks.redis_huey``.

    :param task_queue: the queue to identify; may be None
    :return: the matching queue id constant, or ``BGJOB_QUEUE_ID_UNKNOWN``
        when ``task_queue`` is None or its name matches no known queue
        (the unmatched case is also logged as a warning)
    """
    if task_queue is None:
        return constants.BGJOB_QUEUE_ID_UNKNOWN

    # Ordered (queue, id) pairs: the first queue whose name matches wins.
    known_queues = (
        (long_running, constants.BGJOB_QUEUE_ID_LONG),
        (main_queue, constants.BGJOB_QUEUE_ID_MAIN),
        (events_queue, constants.BGJOB_QUEUE_ID_EVENTS),
        (scheduled_long_queue, constants.BGJOB_QUEUE_ID_SCHEDULED_LONG),
        (scheduled_short_queue, constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT),
    )
    for known_queue, known_id in known_queues:
        if task_queue.name == known_queue.name:
            return known_id

    app.logger.warning(f'unknown task_queue[{task_queue}]')
    return constants.BGJOB_QUEUE_ID_UNKNOWN

37 

38 

def create_job(username, action,
               queue_id=constants.BGJOB_QUEUE_ID_UNKNOWN,
               task_queue: RedisHuey = None,
               params=None):
    """Build a new ``models.BackgroundJob`` populated with the given values.

    :param username: stored as the job's ``user``
    :param action: stored as the job's ``action``
    :param queue_id: queue id to record on the job; only used when
        ``task_queue`` is None
    :param task_queue: when given, the job's queue id is derived from this
        queue and ``queue_id`` is ignored
    :param params: stored as the job's ``params`` when not None
    :return: the new BackgroundJob (this function does not save it)
    """
    new_job = models.BackgroundJob()
    new_job.user = username
    new_job.action = action
    if params is not None:
        new_job.params = params

    # An explicit queue object takes precedence over the queue_id argument.
    new_job.queue_id = (
        queue_id if task_queue is None
        else get_queue_id_by_task_queue(task_queue)
    )
    return new_job

55 

56 

def submit_by_bg_task_type(background_task: Type[BackgroundTask], **prepare_kwargs):
    """Prepare a job for ``background_task`` and submit it (used by register_schedule).

    The job is prepared for the user named by the ``SYSTEM_USERNAME`` config
    entry.

    :param background_task: BackgroundTask class providing ``prepare`` and ``submit``
    :param prepare_kwargs: forwarded unchanged to ``background_task.prepare``
    """
    system_user = app.config.get("SYSTEM_USERNAME")
    prepared_job = background_task.prepare(system_user, **prepare_kwargs)
    background_task.submit(prepared_job)

62 

63 

def execute_by_job_id(job_id, task_factory: TaskFactory):
    """Load the BackgroundJob with ``job_id`` and execute the task built from it.

    :param job_id: id passed to ``models.BackgroundJob.pull``
    :param task_factory: callable turning the pulled job into a BackgroundTask
    """
    pulled_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(task_factory(pulled_job))

69 

70 

def execute_by_bg_task_type(bg_task_type: Type[BackgroundTask], job_wrapper=None, **prepare_kwargs):
    """Prepare a job for ``bg_task_type``, execute it directly and return the task.

    :param bg_task_type: BackgroundTask class; used both to prepare the job
        and to construct the task from it
    :param job_wrapper: optional callable applied to the prepared job; its
        return value replaces the job
    :param prepare_kwargs: forwarded unchanged to ``bg_task_type.prepare``
    :return: the task instance that was passed to ``BackgroundApi.execute``
    """
    system_user = app.config.get("SYSTEM_USERNAME")
    prepared_job = bg_task_type.prepare(system_user, **prepare_kwargs)
    if job_wrapper is not None:
        prepared_job = job_wrapper(prepared_job)

    executed_task = bg_task_type(prepared_job)
    BackgroundApi.execute(executed_task)
    return executed_task

81 

82 

def register_execute(task_queue, task_name=None, script=True):
    """Decorator factory that registers a job-execute function on ``task_queue``.

    :param task_queue: queue whose ``task`` decorator is applied
    :param task_name: when truthy, ``configure(task_name)`` supplies the
        keyword arguments for ``task_queue.task``; otherwise none are passed
    :param script: forwarded to ``write_required``
    :return: a decorator; it yields the registered task, or None when
        registration raises ValueError
    """

    def _decorate(fn):
        task_conf = configure(task_name) if task_name else {}
        guarded_fn = write_required(script=script)(fn)
        try:
            return task_queue.task(**task_conf)(guarded_fn)
        except ValueError:
            # It's already registered - that's okay, we've probably accessed
            # the _status endpoint and loaded the module
            return None

    return _decorate

103 

104 

class RedisHueyTaskHelper:
    """
    Shortcuts for the functions a background job module typically needs,
    bound to one queue and one BackgroundTask class.
    """

    def __init__(self, task_queue: RedisHuey, bgtask: Type[BackgroundTask]):
        """
        :param task_queue: queue the task is registered on
        :param bgtask: BackgroundTask class; its ``__action__`` is used as
            the task name and the class itself as the task factory
        """
        self.task_queue = task_queue
        self.task_factory = bgtask
        self.task_name = bgtask.__action__

    @property
    def queue_id(self):
        """Queue id constant corresponding to ``self.task_queue``."""
        return get_queue_id_by_task_queue(self.task_queue)

    def register_schedule(self, fn):
        """Register ``fn`` as a periodic task using ``schedule(self.task_name)``.

        :return: the registered periodic task, or None when registration
            raises ValueError
        """
        guarded_fn = write_required(script=True)(fn)
        periodic = self.task_queue.periodic_task(schedule(self.task_name))
        try:
            return periodic(guarded_fn)
        except ValueError:
            # It's already registered - that's okay, we've probably accessed
            # the _status endpoint and loaded the module
            return None

    def register_execute(self, is_load_config=False):
        """Return the module-level ``register_execute`` decorator for this queue.

        :param is_load_config: when true, the task name is passed on so the
            task configuration is looked up; otherwise no name is passed
        """
        config_name = self.task_name if is_load_config else None
        return register_execute(self.task_queue, config_name)

    def execute_common(self, job_id):
        """ Common way to execute BackgroundTask by job_id """
        execute_by_job_id(job_id, self.task_factory)

    def scheduled_common(self, **prepare_kwargs):
        """ Common way for BackgroundTask register_schedule """
        submit_by_bg_task_type(self.task_factory, **prepare_kwargs)

139 

140 

def _get_background_task_spec(module):
    """Extract ``(queue_id, task_name, bg_class)`` from a task module.

    Scans the module's members for a ``RedisHuey`` instance and for a strict
    subclass of ``BackgroundTask``. When several members qualify, the last
    one returned by ``inspect.getmembers`` is the one kept.

    :param module: module object to inspect
    :return: the 3-tuple when all three values were found and are truthy,
        otherwise None
    """
    queue_id = task_name = bg_class = None

    for _member_name, member in inspect.getmembers(module):
        if isinstance(member, RedisHuey):
            queue_id = get_queue_id_by_task_queue(member)
            continue

        is_task_subclass = (
            inspect.isclass(member)
            and issubclass(member, BackgroundTask)
            and member != BackgroundTask
        )
        if is_task_subclass:
            bg_class = member
            task_name = getattr(member, '__action__', None)

    if not (queue_id and task_name and bg_class):
        return None
    return queue_id, task_name, bg_class

160 

161 

def lookup_queue_for_action(action):
    """Find which queue an action is registered to, by action name.

    The action -> queue id mapping is built on first use by inspecting all
    background task modules, then kept in the module-level
    ``_queue_for_action`` so later calls skip that work.

    :param action: action name to look up
    :return: the queue id for the action, or ``BGJOB_QUEUE_ID_UNKNOWN`` when
        the action is not in the mapping
    """
    global _queue_for_action

    if _queue_for_action is None:
        action_to_queue = {}
        for spec_queue, spec_action, _spec_class in get_all_background_task_specs():
            action_to_queue[spec_action] = spec_queue
        # Publish only once fully built, so a failure leaves the cache unset.
        _queue_for_action = action_to_queue

    return _queue_for_action.get(action, constants.BGJOB_QUEUE_ID_UNKNOWN)

171 

172 

def get_all_background_task_specs() -> Iterable[Tuple[str, str, Type]]:
    """Lazily yield ``(queue_id, task_name, bg_class)`` for modules under ``portality.tasks``.

    Every non-package module found by ``pkgutil.walk_packages`` is loaded and
    inspected; modules that fail to load for lack of schedule configuration,
    or that yield no spec, are skipped.

    :return: a lazy iterable of spec tuples; modules are loaded as it is consumed
    """

    def _load_bgtask_safe(_mi):
        # NOTE(review): Loader.load_module is deprecated in favour of
        # exec_module in the importlib docs - confirm before a Python upgrade.
        try:
            module_spec = _mi.module_finder.find_spec(_mi.name)
            return module_spec.loader.load_module(_mi.name)
        except RuntimeError as e:
            if 'No configuration for scheduled action' not in str(e):
                raise e
            app.logger.warning(f'config for {_mi.name} not found')
            return None

    discovered = pkgutil.walk_packages(portality.tasks.__path__)
    loaded_modules = (_load_bgtask_safe(mi) for mi in discovered if not mi.ispkg)
    candidate_specs = (_get_background_task_spec(mod) for mod in loaded_modules if mod)
    return (spec for spec in candidate_specs if spec)

189 

190 

def get_value_safe(key, default_v, kwargs, default_cond_fn=None):
    """Read ``key`` from ``kwargs``, falling back to ``default_v``.

    :param key: key to look up in ``kwargs``
    :param default_v: value used when the key is absent or the condition matches
    :param kwargs: mapping to read from
    :param default_cond_fn: predicate on the looked-up value; when it returns
        a truthy result ``default_v`` is returned instead. Defaults to
        "value is None".
    :return: the looked-up value, or ``default_v``
    """
    should_use_default = default_cond_fn if default_cond_fn else (lambda candidate: candidate is None)
    found = kwargs.get(key, default_v)
    return default_v if should_use_default(found) else found

199 

200 

def submit_by_background_job(background_job, execute_fn):
    """Save the job, then schedule ``execute_fn`` with the job id (common ``BackgroundTask.submit``).

    :param background_job: job to save; its ``id`` is the single task argument
    :param execute_fn: object providing ``schedule(args=..., delay=...)``
    """
    background_job.save()
    # Delay comes from the HUEY_ASYNC_DELAY config entry, defaulting to 10.
    delay_value = app.config.get('HUEY_ASYNC_DELAY', 10)
    execute_fn.schedule(args=(background_job.id,), delay=delay_value)

206 

207 

def create_execute_fn(task_queue: RedisHuey, task_factory: TaskFactory, task_name=None, script=True):
    """Create and register the execute function for a BackgroundTask.

    :param task_queue: queue to register the function on
    :param task_factory: callable turning a BackgroundJob into a BackgroundTask
    :param task_name: forwarded to ``register_execute``
    :param script: forwarded to ``register_execute``
    :return: whatever the ``register_execute`` decorator returns for the
        inner function (None when registration raised ValueError)
    """

    def _execute_fn(job_id):
        execute_by_job_id(job_id, task_factory)

    register = register_execute(task_queue, task_name=task_name, script=script)
    return register(_execute_fn)

214 

215 return _execute_fn