Coverage for portality / models / background.py: 87%

181 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1import datetime 

2 

3from portality import dao 

4from portality.constants import BgjobOutcomeStatus 

5from portality.core import app 

6from portality.lib import dataobj, dates, es_data_mapping, seamless 

7 

8 

class BackgroundJob(dataobj.DataObj, dao.DomainObject):
    """
    # ~~BackgroundJob:Model~~

    Record of a single background job: who ran it, which action it performs,
    its job-level status, its business-level outcome status, free-form
    ``params`` / ``reference`` objects and a list of audit messages.
    The permitted shape is described by BACKGROUND_STRUCT.
    """
    __type__ = "background_job"

    def __init__(self, **kwargs):
        """
        Build a job from keyword data, or from an ES hit carrying ``_source``.

        A missing ``status`` defaults to "queued"; a missing/empty
        ``outcome_status`` defaults to ``BgjobOutcomeStatus.Pending``.
        """
        # FIXME: hack, to deal with ES integration layer being improperly abstracted
        raw = kwargs.get("_source", kwargs)

        self._add_struct(BACKGROUND_STRUCT)
        raw.setdefault("status", "queued")

        # to autosave we need to move the object over to Seamless
        # self._autosave = False
        # self._audit_log_increments = 500
        # self._audit_log_counter = 0

        super().__init__(raw=raw)

        if not self.outcome_status:
            self.outcome_status = BgjobOutcomeStatus.Pending

    @classmethod
    def active(cls, task_type, since=None):
        """
        Return the jobs matched by an ActiveQuery for ``task_type``
        (status "queued" or "processing"), optionally created at/after ``since``.

        The query is built with ActiveQuery's default size.
        """
        # ~~-> ActiveBackgroundJob:Query~~
        return cls.q2obj(q=ActiveQuery(task_type, since=since).query())

    def mappings(self):
        """Create the index mapping from this object's struct and MAPPING_OPTS."""
        # ~~-> Elasticsearch:Technology~~
        return es_data_mapping.create_mapping(self.get_struct(), MAPPING_OPTS)

    # This feature would allow us to flush the audit logs to the index periodically.
    # We need to switch the object type to Seamless to enable that
    # def autosave(self, audit_log_increments=500):
    #     self._audit_log_increments = audit_log_increments
    #     self._autosave = True

    # -- simple struct-backed fields -------------------------------------

    @property
    def user(self):
        """Value of the ``user`` field."""
        return self._get_single("user")

    @user.setter
    def user(self, value):
        self._set_with_struct("user", value)

    @property
    def action(self):
        """Value of the ``action`` field."""
        return self._get_single("action")

    @action.setter
    def action(self, value):
        self._set_with_struct("action", value)

    @property
    def queue_id(self):
        """Value of the ``queue_id`` field."""
        return self._get_single("queue_id")

    @queue_id.setter
    def queue_id(self, value):
        self._set_with_struct("queue_id", value)

    @property
    def audit(self):
        """The list of audit entries (each has ``message`` and ``timestamp``)."""
        return self._get_list("audit")

    # -- unstructured objects --------------------------------------------

    @property
    def params(self):
        """The ``params`` object; it has no struct, so its content is arbitrary."""
        return self._get_single("params")

    @params.setter
    def params(self, value):
        # note we don't bother setting with struct, as there is none
        self._set_single("params", value)

    @property
    def reference(self):
        """The ``reference`` object; it has no struct, so its content is arbitrary."""
        return self._get_single("reference")

    @reference.setter
    def reference(self, value):
        # note we don't bother setting with struct, as there is none
        self._set_single("reference", value)

    def add_reference(self, name, value):
        """Store ``value`` under ``name`` in ``reference``, creating the object if unset."""
        if self.reference is None:
            self.reference = {}
        self.reference[name] = value

    # -- job-level status ------------------------------------------------

    @property
    def status(self):
        """Job-level status; read-only here, changed through the methods below."""
        return self._get_single("status")

    def queue(self):
        """Mark the job as "queued"."""
        self._set_with_struct("status", "queued")

    def start(self):
        """Mark the job as "processing"."""
        self._set_with_struct("status", "processing")

    def success(self):
        """Mark the job as "complete"."""
        self._set_with_struct("status", "complete")

    def fail(self):
        """Mark the job as "error" and set the outcome status to Fail as well."""
        self._set_with_struct("status", "error")
        self.outcome_fail()

    def cancel(self):
        """Mark the job as "cancelled"."""
        self._set_with_struct("status", "cancelled")

    def is_failed(self):
        """Return True when the job-level status is "error"."""
        return self.status == "error"

    # -- business-level outcome ------------------------------------------

    @property
    def outcome_status(self):
        """Outcome status of the job's result (see BgjobOutcomeStatus)."""
        return self._get_single('outcome_status')

    @outcome_status.setter
    def outcome_status(self, outcome_status: str):
        self._set_with_struct('outcome_status', outcome_status)

    def outcome_fail(self):
        """Set the outcome status to ``BgjobOutcomeStatus.Fail``."""
        self.outcome_status = BgjobOutcomeStatus.Fail

    # -- audit trail -----------------------------------------------------

    def add_audit_message(self, msg, timestamp=None):
        """
        Append an audit entry.

        :param msg: the message text
        :param timestamp: timestamp for the entry; when None,
            ``dates.now_str_with_microseconds()`` is used
        """
        stamp = dates.now_str_with_microseconds() if timestamp is None else timestamp
        self._add_to_list_with_struct("audit", {"message": msg, "timestamp": stamp})

        # This feature would allow us to flush the audit messages to the index periodically
        # if self._autosave:
        #     audits = len(self._get_list("audit"))
        #     if audits > self._audit_log_counter + self._audit_log_increments:
        #         self.save()
        #         self._audit_log_counter = audits

    @property
    def pretty_audit(self):
        """All audit entries as text, one "<timestamp> <message>" per line."""
        return "\n".join(
            "{} {}".format(entry["timestamp"], entry["message"])
            for entry in self.audit
        )

151 

152 

class StdOutBackgroundJob(BackgroundJob):
    """
    BackgroundJob that can also echo each audit message to stdout.

    Messages are printed when the ``DOAJENV`` config value is 'dev', or
    unconditionally when ``force_logging`` is set.
    """

    def __init__(self, inner, force_logging=False):
        """
        :param inner: an existing job; this object is initialised from ``inner.data``
        :param force_logging: print audit messages regardless of ``DOAJENV``
        """
        super(StdOutBackgroundJob, self).__init__(**inner.data)
        self._force_logging = force_logging

    def add_audit_message(self, msg, timestamp=None):
        """
        Record the audit message and, when logging is enabled, print it.

        :param msg: the message text
        :param timestamp: timestamp for the entry; when None,
            ``dates.now_str_with_microseconds()`` is used
        """
        # Resolve the timestamp once, up front, so the stored audit entry and
        # the printed line always carry the same value.
        if timestamp is None:
            timestamp = dates.now_str_with_microseconds()
        super(StdOutBackgroundJob, self).add_audit_message(msg, timestamp)
        if app.config.get("DOAJENV") == 'dev' or self._force_logging:
            # format() rather than "+" so a non-str msg cannot raise after the
            # entry has already been stored
            print("[{}] {}".format(timestamp, msg))

165 

166 

# ~~-> DataObj:Library~~
# Struct describing the permitted shape of a BackgroundJob record.
BACKGROUND_STRUCT = {
    "fields": {
        "id": {"coerce": "unicode"},
        "created_date": {"coerce": "utcdatetime"},
        "last_updated": {"coerce": "utcdatetime"},

        # Job-level status, e.g. "this job experienced an exception and
        # failed to complete normally".
        "status": {
            "coerce": "unicode",
            "allowed_values": ["queued", "processing", "complete", "error", "cancelled"],
        },
        "user": {"coerce": "unicode"},
        "action": {"coerce": "unicode"},
        "queue_id": {"coerce": "unicode"},
        "es_type": {"coerce": "unicode"},

        # Result status at the business-logic level, e.g. "the job completed
        # without exception, but the action the user wanted was not carried
        # out for some reason".
        "outcome_status": {
            "coerce": "unicode",
            **seamless.create_allowed_values_by_constant(BgjobOutcomeStatus),
        },
    },
    "lists": {
        "audit": {"contains": "object"}
    },
    # Note that these do not have structs specified, which allows them to
    # have arbitrary content.
    "objects": [
        "params",
        "reference"
    ],
    "structs": {
        "audit": {
            "fields": {
                "message": {"coerce": "unicode"},
                "timestamp": {"coerce": "utcdatetimemicros"}
            }
        }
    }
}

201 

# ~~-> Elasticsearch:Technology~~
# Options passed to es_data_mapping.create_mapping() by BackgroundJob.mappings().
MAPPING_OPTS = {
    "dynamic": None,
    "coerces": app.config["DATAOBJ_TO_MAPPING_DEFAULTS"],
    "exceptions": {
        # audit messages are stored as text but not indexed
        "audit.message": {
            "type": "text",
            "index": False,
            # "include_in_all": False  # Removed in es6 fixme: do we need to look at copy_to for the mapping?
        }
    }
}

214 

215 

class ActiveQuery:
    """
    ~~ActiveBackgroundJob:Query->Elasticsearch:Technology~~

    Query for jobs of one task type whose status is "queued" or "processing",
    optionally limited to those created at or after ``since``.
    """

    def __init__(self, task_type, size=2, since=None):
        """
        :param task_type: value matched against ``action.exact``
        :param size: maximum number of results requested
        :param since: lower bound (inclusive) for ``created_date``; skipped when falsy
        """
        self._task_type = task_type
        self._size = size
        self._since = since

    def query(self):
        """Return the query body as a dict."""
        clauses = [
            {"term": {"action.exact": self._task_type}},
            {"terms": {"status.exact": ["queued", "processing"]}},
        ]
        if self._since:
            clauses.append({"range": {"created_date": {"gte": self._since}}})

        return {
            "query": {"bool": {"must": clauses}},
            "size": self._size,
        }

241 

242 

class BackgroundJobQueryBuilder:
    """
    Fluent builder for bool queries over background jobs.

    Every condition method returns the builder itself so calls can be
    chained; finish with ``build_query_dict()`` or ``build_query_object()``.
    """

    def __init__(self):
        self.query_dict = {
            "query": {
                "bool": {
                    "must": []
                }
            },
        }

    def _append_bool_condition(self, bool_key: str, condition: dict):
        """Append ``condition`` to the bool clause list ``bool_key``, creating the list if needed."""
        self.query_dict['query']['bool'].setdefault(bool_key, []).append(condition)
        return self

    def append_must(self, condition: dict):
        """Add a ``must`` condition."""
        self._append_bool_condition('must', condition)
        return self

    def append_must_not(self, condition: dict):
        """Add a ``must_not`` condition."""
        self._append_bool_condition('must_not', condition)
        return self

    def since(self, since: datetime.datetime):
        """Require ``created_date`` >= ``since``; a falsy ``since`` adds nothing."""
        if since:
            since_str = dates.format(since)
            self.append_must({"range": {"created_date": {"gte": since_str}}})
        return self

    def action(self, action):
        """Require ``action.exact`` to equal ``action``."""
        self.append_must({"term": {"action.exact": action}})
        return self

    def queue_id(self, queue_id):
        """Require ``queue_id.exact`` to equal ``queue_id``."""
        self.append_must({"term": {"queue_id.exact": queue_id}})
        return self

    def _to_list(self, item):
        """Return ``item`` as a list: a str is wrapped, other non-list iterables are converted."""
        if isinstance(item, str):
            item = [item]
        elif not isinstance(item, list):
            item = list(item)
        return item

    def status_includes(self, status):
        """Require ``status.exact`` to be one of ``status`` (a str or an iterable of str)."""
        self.append_must({"terms": {"status.exact": self._to_list(status)}})
        return self

    def status_excludes(self, status):
        """Exclude jobs whose ``status.exact`` is one of ``status`` (a str or an iterable of str)."""
        self.append_must_not({"terms": {"status.exact": self._to_list(status)}})
        return self

    def outcome_includes(self, status):
        """Require ``outcome_status.exact`` to be one of ``status`` (a str or an iterable of str)."""
        self.append_must({"terms": {"outcome_status.exact": self._to_list(status)}})
        # return the builder like every other condition method, so that
        # chaining after outcome_includes() does not fail on None
        return self

    def size(self, size: int):
        """Set the ``size`` of the query."""
        self.query_dict['size'] = size
        return self

    def order_by(self, field_name, order):
        """Append a sort clause on ``field_name`` with the given ``order``."""
        self.query_dict.setdefault('sort', []).append(
            {field_name: {"order": order}}
        )
        return self

    def build_query_dict(self):
        """
        Return the query built so far as a dict.

        The result is a deep copy, so later changes to the builder do not
        leak into a dict that was already handed out (and vice versa).
        """
        import copy
        return copy.deepcopy(self.query_dict)

    def build_query_object(self):
        """Return an object whose ``query()`` method yields the built query dict."""
        class _Query:
            def query(subself):
                return self.build_query_dict()

        return _Query()

321 

322 

class SimpleBgjobQueue:
    """
    Query for jobs of a given action whose status is among ``status``,
    optionally limited to those created at or after ``since``.
    """

    def __init__(self, action, status, since=None):
        """
        :param action: value matched against the job's action
        :param status: a status string, or an iterable of them, to include
        :param since: optional lower bound for the creation date
        """
        self.action = action
        self.status = status
        self.since = since

    def query(self):
        """Return the query body as a dict."""
        builder = BackgroundJobQueryBuilder()
        builder.action(self.action)
        builder.since(self.since)
        builder.status_includes(self.status)
        return builder.build_query_dict()

335 

336 

class LastCompletedJobQuery:
    """
    Query for the single most recently updated job on a queue.

    Jobs are matched on ``queue_id`` and sorted by ``last_updated``
    descending with size 1; no status filter is applied here.
    """

    def __init__(self, queue_id):
        """
        :param queue_id: the queue whose jobs are searched
        """
        self.queue_id = queue_id

    def query(self):
        """Return the query body as a dict."""
        builder = BackgroundJobQueryBuilder()
        builder.queue_id(self.queue_id)
        builder.order_by('last_updated', 'desc')
        builder.size(1)
        return builder.build_query_dict()