Coverage for portality / background.py: 96%
129 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1import logging
2import traceback
3from copy import deepcopy
4from typing import Iterable
5from typing import TYPE_CHECKING
7from portality.constants import BgjobOutcomeStatus
9if TYPE_CHECKING:
10 from portality.models import BackgroundJob
12from flask_login import login_user
13from huey import RedisHuey
14from huey.exceptions import RetryTask
16from portality import constants
17from portality import models
18from portality.bll import DOAJ
19from portality.core import app
21log = logging.getLogger(__name__)
24class BackgroundException(Exception):
25 pass
28class BackgroundSummary(object):
29 def __init__(self, job_id, affected=None, error=None):
30 self.job_id = job_id
31 self.affected = affected if affected is not None else {}
32 self.error = error
34 def as_dict(self):
35 return {
36 "job_id": self.job_id,
37 "affected": self.affected,
38 "error": self.error
39 }
42class BackgroundApi(object):
43 """
44 ~~BackgroundTasks:Feature~~
45 """
47 @classmethod
48 def execute(self, background_task: 'BackgroundTask'):
49 # ~~->BackgroundTask:Process~~
50 # ~~->BackgroundJob:Model~~
51 job = background_task.background_job
52 ctx = None
53 acc = None
54 if job.user is not None:
55 ctx = app.test_request_context("/")
56 ctx.push()
57 # ~~-> Account:Model~~
58 acc = models.Account.pull(job.user) # FIXME: what happens when this is the "system" user
59 if acc is not None:
60 login_user(acc)
62 job.start()
63 job.add_audit_message("Job Started")
64 job.save()
66 try:
67 background_task.run()
68 if job.outcome_status == BgjobOutcomeStatus.Pending:
69 job.outcome_status = BgjobOutcomeStatus.Success
70 except RetryTask:
71 if job.reference is None:
72 job.reference = {}
73 retries = job.reference.get("retries", 0)
74 job.reference["retries"] = retries + 1
75 job.save()
76 raise
77 except Exception as e:
78 log.error(f"Error in Background Task: {e}")
79 job.fail()
80 job.add_audit_message("Error in Job Run")
81 job.add_audit_message("Caught in job runner during run: " + traceback.format_exc())
82 job.add_audit_message("Job Run Completed")
84 job.add_audit_message("Cleanup Started")
85 try:
86 background_task.cleanup()
87 except Exception as e:
88 job.fail()
89 job.add_audit_message("Error in Cleanup Run")
90 job.add_audit_message("Caught in job runner during cleanup: " + traceback.format_exc())
91 job.add_audit_message("Job Cleanup Completed")
93 job.add_audit_message("Job Finished")
94 if not job.is_failed():
95 job.success()
96 job.save()
98 # post-execution hook (may be overridden by task implementations)
99 background_task.post_execute()
101 if ctx is not None:
102 ctx.pop()
105class BackgroundTask(object):
106 """
107 All background tasks should extend from this object and override at least the following methods:
109 - run
110 - cleanup
111 - prepare (class method)
113 ~~BackgroundTask:Process~~
114 """
116 __action__ = None
117 """ static member variable defining the name of this task """
119 def __init__(self, background_job: 'BackgroundJob'):
120 self._background_job = background_job
122 @property
123 def background_job(self) -> 'BackgroundJob':
124 return self._background_job
126 def run(self):
127 """
128 Execute the task as specified by the background_job
129 :return:
130 """
131 raise NotImplementedError()
133 def cleanup(self):
134 """
135 Cleanup after a successful OR failed run of the task
136 :return:
137 """
138 raise NotImplementedError()
140 def post_execute(self):
141 """
142 Hook executed after the job has been run and cleaned up.
143 Default implementation triggers BACKGROUND_JOB_FINISHED event.
144 Subclasses can override to change or extend the behaviour.
145 """
146 job = self.background_job
147 # trigger a status change event
148 jdata = deepcopy(job.data)
149 if "audit" in jdata:
150 del jdata["audit"]
151 eventsSvc = DOAJ.eventsService()
152 eventsSvc.trigger(models.Event(constants.BACKGROUND_JOB_FINISHED, job.user, {
153 "job": jdata
154 }))
156 @classmethod
157 def prepare(cls, username, **kwargs):
158 """
159 Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
160 or fail with a suitable exception
162 :param username: the user creating the job
163 :param kwargs: arbitrary keyword arguments pertaining to this task type
164 :return: a BackgroundJob instance representing this task
165 """
166 raise NotImplementedError()
168 @classmethod
169 def submit(cls, background_job):
170 """
171 Submit the specified BackgroundJob to the background queue
173 :param background_job: the BackgroundJob instance
174 :return:
175 """
176 raise NotImplementedError()
178 @classmethod
179 def get_param(cls, params, param_name, default=None):
180 return params.get('{}__{}'.format(cls.__action__, param_name), default)
182 @classmethod
183 def set_param(cls, params, param_name, value):
184 params['{}__{}'.format(cls.__action__, param_name)] = value
186 @classmethod
187 def create_job_params(cls, **raw_param_dict: dict):
188 new_param = {}
189 for k, v in raw_param_dict.items():
190 cls.set_param(new_param, k, v)
191 return new_param
193 def get_bgjob_params(self) -> dict:
194 keys = self.background_job.params.keys()
195 keys = (k.replace('{}__'.format(self.__action__), '') for k in keys)
196 return {k: self.get_param(self.background_job.params, k) for k in keys}
198 @classmethod
199 def set_reference(cls, refs, ref_name, value):
200 refs['{}__{}'.format(cls.__action__, ref_name)] = value
202 @classmethod
203 def create_huey_helper(cls, task_queue: RedisHuey):
204 from portality.tasks.helpers import background_helper
205 return background_helper.RedisHueyTaskHelper(task_queue, cls)
208class AdminBackgroundTask(BackgroundTask):
209 """~~AdminBackgroundTask:Process->BackgroundTask:Process~~"""
211 @classmethod
212 def check_admin_privilege(cls, username):
213 # ~~->Account:Model~~
214 a = models.Account.pull(username)
215 if a is None:
216 # very unlikely, as they would have had to log in to get to here..
217 raise BackgroundException('Admin account that is being used to prepare this job does not exist.')
219 if not a.has_role('admin'):
220 raise BackgroundException('Account {} is not permitted to run this background task.'.format(username))
222 @classmethod
223 def prepare(cls, username, **kwargs):
224 cls.check_admin_privilege(username)