Coverage for portality / models / background.py: 87%
181 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1import datetime
3from portality import dao
4from portality.constants import BgjobOutcomeStatus
5from portality.core import app
6from portality.lib import dataobj, dates, es_data_mapping, seamless
class BackgroundJob(dataobj.DataObj, dao.DomainObject):
    """
    Model for a background job record: the requesting user, the action to run,
    the queue/lifecycle status, arbitrary params/reference data, and an audit
    trail of timestamped messages.

    # ~~BackgroundJob:Model~~
    """
    __type__ = "background_job"

    def __init__(self, **kwargs):
        # FIXME: hack, to deal with ES integration layer being improperly abstracted
        if "_source" in kwargs:
            kwargs = kwargs["_source"]

        self._add_struct(BACKGROUND_STRUCT)

        # New jobs start life on the queue unless a status was supplied
        if "status" not in kwargs:
            kwargs["status"] = "queued"

        # To autosave we need to move the object over to Seamless
        # self._autosave = False
        # self._audit_log_increments = 500
        # self._audit_log_counter = 0

        super(BackgroundJob, self).__init__(raw=kwargs)

        # The business-logic outcome is tracked separately from the lifecycle status
        if not self.outcome_status:
            self.outcome_status = BgjobOutcomeStatus.Pending

    @classmethod
    def active(cls, task_type, since=None):
        """Return jobs of ``task_type`` that are queued or processing,
        optionally restricted to those created on/after ``since``."""
        # ~~-> ActiveBackgroundJob:Query~~
        q = ActiveQuery(task_type, since=since)
        actives = cls.q2obj(q=q.query())
        return actives

    def mappings(self):
        """Generate the Elasticsearch mapping for this model from its struct."""
        # ~~-> Elasticsearch:Technology~~
        return es_data_mapping.create_mapping(self.get_struct(), MAPPING_OPTS)

    # This feature would allow us to flush the audit logs to the index periodically.
    # We need to switch the object type to Seamless to enable that
    # def autosave(self, audit_log_increments=500):
    #     self._audit_log_increments = audit_log_increments
    #     self._autosave = True

    @property
    def user(self):
        return self._get_single("user")

    @user.setter
    def user(self, val):
        self._set_with_struct("user", val)

    @property
    def action(self):
        return self._get_single("action")

    @action.setter
    def action(self, val):
        self._set_with_struct("action", val)

    @property
    def queue_id(self):
        return self._get_single("queue_id")

    @queue_id.setter
    def queue_id(self, val):
        self._set_with_struct("queue_id", val)

    @property
    def audit(self):
        return self._get_list("audit")

    @property
    def params(self):
        return self._get_single("params")

    @params.setter
    def params(self, obj):
        self._set_single("params", obj)  # note we don't bother setting with struct, as there is none

    @property
    def reference(self):
        return self._get_single("reference")

    @reference.setter
    def reference(self, obj):
        self._set_single("reference", obj)  # note we don't bother setting with struct, as there is none

    def add_reference(self, name, value):
        """Record an arbitrary named reference value against the job."""
        if self.reference is None:
            self.reference = {}
        self.reference[name] = value

    @property
    def status(self):
        return self._get_single("status")

    def start(self):
        self._set_with_struct("status", "processing")

    def success(self):
        self._set_with_struct("status", "complete")

    def fail(self):
        # A job that errored also cannot have achieved its intended outcome
        self._set_with_struct("status", "error")
        self.outcome_fail()

    def cancel(self):
        self._set_with_struct("status", "cancelled")

    def is_failed(self):
        return self._get_single("status") == "error"

    def queue(self):
        self._set_with_struct("status", "queued")

    @property
    def outcome_status(self):
        return self._get_single('outcome_status')

    # NOTE: the setter is defined immediately after the property (it was
    # previously separated by outcome_fail, which relies on it)
    @outcome_status.setter
    def outcome_status(self, outcome_status: str):
        self._set_with_struct('outcome_status', outcome_status)

    def outcome_fail(self):
        """Mark the business-logic outcome of the job as failed."""
        self.outcome_status = BgjobOutcomeStatus.Fail

    def add_audit_message(self, msg, timestamp=None):
        """Append a timestamped message to the job's audit trail.

        :param msg: the message to record
        :param timestamp: optional pre-formatted timestamp string; defaults to
            now, with microsecond precision
        """
        if timestamp is None:
            timestamp = dates.now_str_with_microseconds()
        obj = {"message": msg, "timestamp": timestamp}
        self._add_to_list_with_struct("audit", obj)

        # This feature would allow us to flush the audit messages to the index periodically
        # if self._autosave:
        #     audits = len(self._get_list("audit"))
        #     if audits > self._audit_log_counter + self._audit_log_increments:
        #         self.save()
        #         self._audit_log_counter = audits

    @property
    def pretty_audit(self):
        """Human-readable rendering of the audit trail, one message per line."""
        audits = self._get_list("audit")
        return "\n".join(["{t} {m}".format(t=a["timestamp"], m=a["message"]) for a in audits])
class StdOutBackgroundJob(BackgroundJob):
    """A BackgroundJob that echoes audit messages to stdout when running in a
    dev environment (``DOAJENV == 'dev'``) or when ``force_logging`` is set."""

    def __init__(self, inner, force_logging=False):
        super(StdOutBackgroundJob, self).__init__(**inner.data)
        self._force_logging = force_logging

    def add_audit_message(self, msg, timestamp=None):
        # FIX: generate the timestamp up front, so the stored audit record and
        # the stdout echo carry the identical timestamp. Previously, when no
        # timestamp was supplied, a second (slightly later) timestamp was
        # generated for the print, so the two diverged.
        if timestamp is None:
            timestamp = dates.now_str_with_microseconds()
        super(StdOutBackgroundJob, self).add_audit_message(msg, timestamp)
        if app.config.get("DOAJENV") == 'dev' or self._force_logging:
            print("[" + timestamp + "] " + msg)
# ~~-> DataObj:Library~~
# DataObj struct describing the background_job document schema
BACKGROUND_STRUCT = {
    "fields": {
        "id": {"coerce": "unicode"},
        "created_date": {"coerce": "utcdatetime"},
        "last_updated": {"coerce": "utcdatetime"},

        # Status at the bgjob lifecycle level, for example: this job
        # experienced an exception and failed to complete normally
        "status": {"coerce": "unicode", "allowed_values": ["queued", "processing", "complete", "error", "cancelled"]},
        "user": {"coerce": "unicode"},
        "action": {"coerce": "unicode"},
        "queue_id": {"coerce": "unicode"},
        "es_type": {"coerce": "unicode"},

        # Status of the bgjob result (business logic level), for example: the job
        # completed without exception, but the action the user wanted was not
        # carried out for some reason
        "outcome_status": {"coerce": "unicode", **seamless.create_allowed_values_by_constant(BgjobOutcomeStatus)},
    },
    "lists": {
        "audit": {"contains": "object"}
    },
    "objects": [
        "params",  # Note that these do not have structs specified, which allows them to have arbitrary content
        "reference"
    ],
    "structs": {
        "audit": {
            "fields": {
                "message": {"coerce": "unicode"},
                "timestamp": {"coerce": "utcdatetimemicros"}
            }
        }
    }
}
# ~~-> Elasticsearch:Technology~~
# Options handed to es_data_mapping.create_mapping when generating the ES
# mapping from the struct (see BackgroundJob.mappings above)
MAPPING_OPTS = {
    "dynamic": None,
    "coerces": app.config["DATAOBJ_TO_MAPPING_DEFAULTS"],
    "exceptions": {
        # Audit messages are free text that is never queried, so skip indexing
        "audit.message": {
            "type": "text",
            "index": False,
            # "include_in_all": False # Removed in es6 fixme: do we need to look at copy_to for the mapping?
        }
    }
}
class ActiveQuery(object):
    """
    Query for background jobs of a given action that are still live (i.e.
    queued or processing), optionally limited to those created on/after a
    given date.

    ~~ActiveBackgroundJob:Query->Elasticsearch:Technology~~
    """

    def __init__(self, task_type, size=2, since=None):
        self._task_type = task_type
        self._size = size
        self._since = since

    def query(self):
        """Build and return the raw Elasticsearch query dict."""
        # Assemble the bool/must clauses first, then wrap them in the envelope
        must_clauses = [
            {"term": {"action.exact": self._task_type}},
            {"terms": {"status.exact": ["queued", "processing"]}},
        ]
        if self._since:
            must_clauses.append({"range": {"created_date": {"gte": self._since}}})
        return {
            "query": {"bool": {"must": must_clauses}},
            "size": self._size,
        }
class BackgroundJobQueryBuilder:
    """Fluent builder for Elasticsearch queries over background jobs.

    Every builder method returns ``self`` so that calls can be chained, e.g.::

        BackgroundJobQueryBuilder().action("harvest").size(1).build_query_dict()
    """

    def __init__(self):
        self.query_dict = {
            "query": {
                "bool": {
                    "must": []
                }
            },
        }

    def _append_bool_condition(self, bool_key: str, condition: dict):
        # Lazily create the bool sub-clause list (e.g. 'must_not') on first use
        if bool_key not in self.query_dict['query']['bool']:
            self.query_dict['query']['bool'][bool_key] = []
        self.query_dict['query']['bool'][bool_key].append(condition)
        return self

    def append_must(self, condition: dict):
        self._append_bool_condition('must', condition)
        return self

    def append_must_not(self, condition: dict):
        self._append_bool_condition('must_not', condition)
        return self

    def since(self, since: datetime.datetime):
        """Restrict to jobs created on/after ``since``; no-op when falsy."""
        if since:
            since_str = dates.format(since)
            self.append_must({"range": {"created_date": {"gte": since_str}}})
        return self

    def action(self, action):
        self.append_must({"term": {"action.exact": action}})
        return self

    def queue_id(self, queue_id):
        self.append_must({"term": {"queue_id.exact": queue_id}})
        return self

    def _to_list(self, item):
        # Accept a single string, a list, or any other iterable of values
        if isinstance(item, str):
            item = [item]
        elif not isinstance(item, list):
            item = list(item)
        return item

    def status_includes(self, status):
        self.append_must({"terms": {"status.exact": self._to_list(status)}})
        return self

    def status_excludes(self, status):
        self.append_must_not({"terms": {"status.exact": self._to_list(status)}})
        return self

    def outcome_includes(self, status):
        self.append_must({"terms": {"outcome_status.exact": self._to_list(status)}})
        # FIX: 'return self' was missing here, breaking fluent chaining
        # (every other builder method returns self)
        return self

    def size(self, size: int):
        self.query_dict['size'] = size
        return self

    def order_by(self, field_name, order):
        """Append a sort clause; ``order`` is 'asc' or 'desc'."""
        if 'sort' not in self.query_dict:
            self.query_dict['sort'] = []

        self.query_dict['sort'].append(
            {field_name: {"order": order}}
        )
        return self

    def build_query_dict(self):
        # NOTE: shallow copy -- nested structures are still shared with the builder
        return self.query_dict.copy()

    def build_query_object(self):
        """Wrap the built dict in an object exposing a ``query()`` method,
        matching the interface of the other *Query classes in this module."""
        class _Query:
            def query(subself):
                return self.build_query_dict()

        return _Query()
class SimpleBgjobQueue:
    """Query object matching background jobs by action and status, optionally
    restricted to those created since a given date."""

    def __init__(self, action, status, since=None):
        self.action = action
        self.status = status
        self.since = since

    def query(self):
        """Build and return the raw Elasticsearch query dict."""
        builder = BackgroundJobQueryBuilder()
        builder.action(self.action)
        builder.since(self.since)
        builder.status_includes(self.status)
        return builder.build_query_dict()
class LastCompletedJobQuery:
    """Query object returning the single most recently updated background job
    for a given queue."""

    def __init__(self, queue_id):
        self.queue_id = queue_id

    def query(self):
        """Build and return the raw Elasticsearch query dict."""
        builder = BackgroundJobQueryBuilder().queue_id(self.queue_id)
        builder.order_by('last_updated', 'desc')
        builder.size(1)
        return builder.build_query_dict()