Coverage for portality / background.py: 96%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1import logging 

2import traceback 

3from copy import deepcopy 

4from typing import Iterable 

5from typing import TYPE_CHECKING 

6 

7from portality.constants import BgjobOutcomeStatus 

8 

9if TYPE_CHECKING: 

10 from portality.models import BackgroundJob 

11 

12from flask_login import login_user 

13from huey import RedisHuey 

14from huey.exceptions import RetryTask 

15 

16from portality import constants 

17from portality import models 

18from portality.bll import DOAJ 

19from portality.core import app 

20 

21log = logging.getLogger(__name__) 

22 

23 

class BackgroundException(Exception):
    """Error raised by the background task machinery, e.g. when a job may not be prepared."""

26 

27 

class BackgroundSummary(object):
    """Small value object summarising a background job: its id, what it affected and any error."""

    def __init__(self, job_id, affected=None, error=None):
        """
        :param job_id: identifier of the job being summarised
        :param affected: record of what the job affected; a new empty dict is used when None
        :param error: error information for the job, if any
        """
        if affected is None:
            # build the default per instance so summaries never share one dict
            affected = {}
        self.job_id = job_id
        self.affected = affected
        self.error = error

    def as_dict(self):
        """
        :return: a dict with the keys "job_id", "affected" and "error"
        """
        return dict(job_id=self.job_id, affected=self.affected, error=self.error)

40 

41 

class BackgroundApi(object):
    """
    Runner which takes a BackgroundTask through its run / cleanup / post-execute lifecycle.

    ~~BackgroundTasks:Feature~~
    """

    @classmethod
    def execute(cls, background_task: 'BackgroundTask'):
        """
        Execute the supplied task, recording progress and outcome on its BackgroundJob.

        If the job has a user, a request context is pushed and that user's account (when it
        can be pulled) is logged in for the duration of the call.  The context is always
        popped again before this method returns or raises.

        :param background_task: the BackgroundTask to run; its ``background_job`` is updated and saved
        :raises RetryTask: re-raised from ``background_task.run()`` after the "retries" counter in
            ``job.reference`` has been incremented and the job saved
        """
        # ~~->BackgroundTask:Process~~
        # ~~->BackgroundJob:Model~~
        job = background_task.background_job
        ctx = None
        if job.user is not None:
            ctx = app.test_request_context("/")
            ctx.push()

        # everything after the push is guarded, so that the context is released on every exit
        # path - including the RetryTask re-raise and failures in save()/post_execute()
        try:
            if ctx is not None:
                # ~~-> Account:Model~~
                acc = models.Account.pull(job.user)  # FIXME: what happens when this is the "system" user
                if acc is not None:
                    login_user(acc)

            job.start()
            job.add_audit_message("Job Started")
            job.save()

            try:
                background_task.run()
                # only promote the outcome if the task did not set one itself
                if job.outcome_status == BgjobOutcomeStatus.Pending:
                    job.outcome_status = BgjobOutcomeStatus.Success
            except RetryTask:
                # count the retry on the job, then hand the exception back to the caller
                if job.reference is None:
                    job.reference = {}
                retries = job.reference.get("retries", 0)
                job.reference["retries"] = retries + 1
                job.save()
                raise
            except Exception as e:
                log.error(f"Error in Background Task: {e}")
                job.fail()
                job.add_audit_message("Error in Job Run")
                job.add_audit_message("Caught in job runner during run: " + traceback.format_exc())
            job.add_audit_message("Job Run Completed")

            # cleanup is attempted whether or not the run itself failed
            job.add_audit_message("Cleanup Started")
            try:
                background_task.cleanup()
            except Exception:
                job.fail()
                job.add_audit_message("Error in Cleanup Run")
                job.add_audit_message("Caught in job runner during cleanup: " + traceback.format_exc())
            job.add_audit_message("Job Cleanup Completed")

            job.add_audit_message("Job Finished")
            if not job.is_failed():
                job.success()
            job.save()

            # post-execution hook (may be overridden by task implementations)
            background_task.post_execute()
        finally:
            if ctx is not None:
                ctx.pop()

103 

104 

class BackgroundTask(object):
    """
    All background tasks should extend from this object and override at least the following methods:

    - run
    - cleanup
    - prepare (class method)

    Job parameters and references are namespaced by storing them under keys of the form
    ``<__action__>__<name>``; the helper methods on this class read and write such keys.

    ~~BackgroundTask:Process~~
    """

    # static member variable defining the name of this task
    __action__ = None

    def __init__(self, background_job: 'BackgroundJob'):
        """
        :param background_job: the BackgroundJob this task instance operates on
        """
        self._background_job = background_job

    @property
    def background_job(self) -> 'BackgroundJob':
        """The BackgroundJob supplied at construction time."""
        return self._background_job

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        raise NotImplementedError()

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        raise NotImplementedError()

    def post_execute(self):
        """
        Hook executed after the job has been run and cleaned up.
        Default implementation triggers BACKGROUND_JOB_FINISHED event.
        Subclasses can override to change or extend the behaviour.
        """
        job = self.background_job
        # trigger a status change event; work on a copy so the job's own data is
        # untouched, and leave the audit trail out of the event payload
        jdata = deepcopy(job.data)
        jdata.pop("audit", None)
        eventsSvc = DOAJ.eventsService()
        eventsSvc.trigger(models.Event(constants.BACKGROUND_JOB_FINISHED, job.user, {
            "job": jdata
        }))

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: the user creating the job
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """
        raise NotImplementedError()

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        raise NotImplementedError()

    @classmethod
    def get_param(cls, params, param_name, default=None):
        """
        Read one of this task's parameters from a namespaced params dict.

        :param params: dict keyed by ``<__action__>__<name>``
        :param param_name: the un-prefixed parameter name
        :param default: value returned when the parameter is not present
        :return: the stored value, or ``default``
        """
        return params.get('{}__{}'.format(cls.__action__, param_name), default)

    @classmethod
    def set_param(cls, params, param_name, value):
        """
        Store ``value`` in ``params`` under this task's namespaced key for ``param_name``.

        :param params: dict to modify in place
        :param param_name: the un-prefixed parameter name
        :param value: the value to store
        """
        params['{}__{}'.format(cls.__action__, param_name)] = value

    @classmethod
    def create_job_params(cls, **raw_param_dict):
        """
        Build a namespaced params dict from plain keyword arguments.

        :param raw_param_dict: un-prefixed parameter names mapped to their values
        :return: a new dict with every name prefixed for this task
        """
        new_param = {}
        for k, v in raw_param_dict.items():
            cls.set_param(new_param, k, v)
        return new_param

    def get_bgjob_params(self) -> dict:
        """
        Return this task's parameters from the job, keyed by their un-prefixed names.

        Only a leading ``<__action__>__`` is stripped, so parameter names which themselves
        contain that text are returned intact.  Keys which do not carry this task's prefix
        are not this task's parameters and are omitted.

        :return: dict of un-prefixed parameter name to value
        """
        prefix = '{}__'.format(self.__action__)
        params = self.background_job.params
        return {k[len(prefix):]: v for k, v in params.items() if k.startswith(prefix)}

    @classmethod
    def set_reference(cls, refs, ref_name, value):
        """
        Store ``value`` in ``refs`` under this task's namespaced key for ``ref_name``.

        :param refs: dict to modify in place
        :param ref_name: the un-prefixed reference name
        :param value: the value to store
        """
        refs['{}__{}'.format(cls.__action__, ref_name)] = value

    @classmethod
    def create_huey_helper(cls, task_queue: 'RedisHuey'):
        """
        Create a RedisHueyTaskHelper binding this task class to the given queue.

        :param task_queue: the RedisHuey queue the helper should use
        :return: a ``background_helper.RedisHueyTaskHelper`` instance
        """
        # imported here rather than at module level, as in the original code
        from portality.tasks.helpers import background_helper
        return background_helper.RedisHueyTaskHelper(task_queue, cls)

206 

207 

class AdminBackgroundTask(BackgroundTask):
    """~~AdminBackgroundTask:Process->BackgroundTask:Process~~"""

    @classmethod
    def check_admin_privilege(cls, username):
        """
        Ensure the named account exists and has the 'admin' role.

        :param username: id of the account preparing the job
        :raises BackgroundException: if the account cannot be pulled, or lacks the 'admin' role
        """
        # ~~->Account:Model~~
        account = models.Account.pull(username)
        if account is None:
            # very unlikely, as they would have had to log in to get to here..
            raise BackgroundException('Admin account that is being used to prepare this job does not exist.')

        if account.has_role('admin'):
            return
        raise BackgroundException('Account {} is not permitted to run this background task.'.format(username))

    @classmethod
    def prepare(cls, username, **kwargs):
        """Reject the request with a BackgroundException unless ``username`` is an admin account."""
        cls.check_admin_privilege(username)