Coverage for portality / tasks / helpers / background_helper.py: 87%
133 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1""" collections of wrapper function for helping you to create BackgroundTask
2~~BackgroundTasks:Feature~~
3"""
4import inspect
5import pkgutil
6from typing import Callable, Type, Iterable, Tuple
8from huey import RedisHuey
10import portality.tasks
11from portality import models, constants
12from portality.background import BackgroundApi, BackgroundTask
13from portality.core import app
14from portality.decorators import write_required
15from portality.tasks.redis_huey import long_running, main_queue, events_queue, scheduled_long_queue, scheduled_short_queue, configure, schedule
17TaskFactory = Callable[[models.BackgroundJob], BackgroundTask]
18_queue_for_action = None
21def get_queue_id_by_task_queue(task_queue: RedisHuey):
22 if task_queue is None:
23 return constants.BGJOB_QUEUE_ID_UNKNOWN
24 elif task_queue.name == long_running.name:
25 return constants.BGJOB_QUEUE_ID_LONG
26 elif task_queue.name == main_queue.name:
27 return constants.BGJOB_QUEUE_ID_MAIN
28 elif task_queue.name == events_queue.name:
29 return constants.BGJOB_QUEUE_ID_EVENTS
30 elif task_queue.name == scheduled_long_queue.name:
31 return constants.BGJOB_QUEUE_ID_SCHEDULED_LONG
32 elif task_queue.name == scheduled_short_queue.name:
33 return constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT
34 else:
35 app.logger.warning(f'unknown task_queue[{task_queue}]')
36 return constants.BGJOB_QUEUE_ID_UNKNOWN
39def create_job(username, action,
40 queue_id=constants.BGJOB_QUEUE_ID_UNKNOWN,
41 task_queue: RedisHuey = None,
42 params=None):
43 """ Common way to create BackgroundJob
44 """
45 job = models.BackgroundJob()
46 job.user = username
47 job.action = action
48 if params is not None:
49 job.params = params
51 if task_queue is not None:
52 queue_id = get_queue_id_by_task_queue(task_queue)
53 job.queue_id = queue_id
54 return job
57def submit_by_bg_task_type(background_task: Type[BackgroundTask], **prepare_kwargs):
58 """ Common way for BackgroundTask register_schedule """
59 user = app.config.get("SYSTEM_USERNAME")
60 job = background_task.prepare(user, **prepare_kwargs)
61 background_task.submit(job)
64def execute_by_job_id(job_id, task_factory: TaskFactory):
65 """ Common way to execute BackgroundTask by job_id """
66 job = models.BackgroundJob.pull(job_id)
67 task = task_factory(job)
68 BackgroundApi.execute(task)
71def execute_by_bg_task_type(bg_task_type: Type[BackgroundTask], job_wrapper=None, **prepare_kwargs):
72 """ wrapper for execute by BackgroundTask """
73 user = app.config.get("SYSTEM_USERNAME")
74 job = bg_task_type.prepare(user, **prepare_kwargs)
75 if job_wrapper is not None:
76 job = job_wrapper(job)
77 task = bg_task_type(job)
78 BackgroundApi.execute(task)
80 return task
83def register_execute(task_queue, task_name=None, script=True):
84 """
85 decorator for register background job execute function
86 """
88 def wrapper(fn):
89 if task_name:
90 conf = configure(task_name)
91 else:
92 conf = {}
94 fn = write_required(script=script)(fn)
95 try:
96 fn = task_queue.task(**conf)(fn)
97 except ValueError:
98 # It's already registered - that's okay, we've probably accessed the _status endpoint and loaded the module
99 return None
100 return fn
102 return wrapper
105class RedisHueyTaskHelper:
106 """
107 some shortcut functions that help you implement functions that background job needed
108 """
110 def __init__(self, task_queue: RedisHuey, bgtask: Type[BackgroundTask]):
111 self.task_queue = task_queue
112 self.task_name = bgtask.__action__
113 self.task_factory = bgtask
115 @property
116 def queue_id(self):
117 return get_queue_id_by_task_queue(self.task_queue)
119 def register_schedule(self, fn):
120 fn = write_required(script=True)(fn)
121 try:
122 fn = self.task_queue.periodic_task(schedule(self.task_name))(fn)
123 except ValueError:
124 # It's already registered - that's okay, we've probably accessed the _status endpoint and loaded the module
125 return None
126 return fn
128 def register_execute(self, is_load_config=False):
129 return register_execute(self.task_queue,
130 self.task_name if is_load_config else None)
132 def execute_common(self, job_id):
133 """ Common way to execute BackgroundTask by job_id """
134 execute_by_job_id(job_id, self.task_factory)
136 def scheduled_common(self, **prepare_kwargs):
137 """ Common way for BackgroundTask register_schedule """
138 submit_by_bg_task_type(self.task_factory, **prepare_kwargs)
141def _get_background_task_spec(module):
142 queue_id = None
143 task_name = None
144 bg_class = None
145 for n, member in inspect.getmembers(module):
146 if isinstance(member, RedisHuey):
147 queue_id = get_queue_id_by_task_queue(member)
148 elif (
149 inspect.isclass(member)
150 and issubclass(member, BackgroundTask)
151 and member != BackgroundTask
152 ):
153 task_name = getattr(member, '__action__', None)
154 bg_class = member
156 if queue_id and task_name and bg_class:
157 return queue_id, task_name, bg_class
159 return None
162def lookup_queue_for_action(action):
163 """ Find which queue an action is registered to, by action name """
164 """ Inspect the background tasks to find some useful details. Store in a singleton to reduce work. """
165 global _queue_for_action
167 if _queue_for_action is None:
168 _queue_for_action = {_action: _queue for _queue, _action, _class in get_all_background_task_specs()}
170 return _queue_for_action.get(action, constants.BGJOB_QUEUE_ID_UNKNOWN)
173def get_all_background_task_specs() -> Iterable[Tuple[str, str, Type]]:
174 def _load_bgtask_safe(_mi):
175 try:
176 return _mi.module_finder.find_spec(_mi.name).loader.load_module(_mi.name)
177 except RuntimeError as e:
178 if 'No configuration for scheduled action' in str(e):
179 app.logger.warning(f'config for {_mi.name} not found')
180 return None
181 raise e
183 module_infos = (m for m in pkgutil.walk_packages(portality.tasks.__path__) if not m.ispkg)
184 modules = (_load_bgtask_safe(mi) for mi in module_infos)
185 modules = filter(None, modules)
186 bgspec_list = map(_get_background_task_spec, modules)
187 bgspec_list = filter(None, bgspec_list)
188 return bgspec_list
191def get_value_safe(key, default_v, kwargs, default_cond_fn=None):
192 """ get value from kwargs and return default_v if condition match
193 """
194 v = kwargs.get(key, default_v)
195 default_cond_fn = default_cond_fn or (lambda _v: _v is None)
196 if default_cond_fn(v):
197 v = default_v
198 return v
201def submit_by_background_job(background_job, execute_fn):
202 """ Common way of `BackgroundTask.submit`
203 """
204 background_job.save()
205 execute_fn.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))
208def create_execute_fn(task_queue: RedisHuey, task_factory: TaskFactory, task_name=None, script=True):
209 """ Common way to create execute_fn for BackgroundTask """
211 @register_execute(task_queue, task_name=task_name, script=script)
212 def _execute_fn(job_id):
213 execute_by_job_id(job_id, task_factory)
215 return _execute_fn