Coverage for portality/models/background.py: 90%
93 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-19 18:38 +0100
1from portality.core import app
2from portality.lib import dataobj, dates, es_data_mapping
3from portality import dao
class BackgroundJob(dataobj.DataObj, dao.DomainObject):
    """
    Record of a background task: the user who requested it, the action it
    performs, its lifecycle status, arbitrary params/reference data, and an
    audit trail of timestamped messages.

    # ~~BackgroundJob:Model~~
    """
    __type__ = "background_job"

    def __init__(self, **kwargs):
        # FIXME: hack, to deal with ES integration layer being improperly abstracted
        if "_source" in kwargs:
            kwargs = kwargs["_source"]

        self._add_struct(BACKGROUND_STRUCT)

        # New jobs start life on the queue unless a status was supplied
        kwargs.setdefault("status", "queued")

        # NOTE: to autosave we would need to move this object over to Seamless
        # self._autosave = False
        # self._audit_log_increments = 500
        # self._audit_log_counter = 0

        super(BackgroundJob, self).__init__(raw=kwargs)

    @classmethod
    def active(cls, task_type, since=None):
        """Return jobs of the given task type which are currently queued or processing."""
        # ~~-> ActiveBackgroundJob:Query~~
        return cls.q2obj(q=ActiveQuery(task_type, since=since).query())

    def mappings(self):
        # ~~-> Elasticsearch:Technology~~
        return es_data_mapping.create_mapping(self.get_struct(), MAPPING_OPTS)

    # Flushing the audit logs to the index periodically would require switching
    # the object type to Seamless; kept here as a sketch of that feature:
    # def autosave(self, audit_log_increments=500):
    #     self._audit_log_increments = audit_log_increments
    #     self._autosave = True

    @property
    def user(self):
        return self._get_single("user")

    @user.setter
    def user(self, value):
        self._set_with_struct("user", value)

    @property
    def action(self):
        return self._get_single("action")

    @action.setter
    def action(self, value):
        self._set_with_struct("action", value)

    @property
    def audit(self):
        return self._get_list("audit")

    @property
    def params(self):
        return self._get_single("params")

    @params.setter
    def params(self, value):
        # no struct is declared for params, so content is arbitrary
        self._set_single("params", value)

    @property
    def reference(self):
        return self._get_single("reference")

    @reference.setter
    def reference(self, value):
        # no struct is declared for reference, so content is arbitrary
        self._set_single("reference", value)

    def add_reference(self, name, value):
        """Record a named reference value, creating the reference object on first use."""
        if self.reference is None:
            self.reference = {}
        self.reference[name] = value

    @property
    def status(self):
        return self._get_single("status")

    def _set_status(self, status):
        # every lifecycle transition funnels through here
        self._set_with_struct("status", status)

    def start(self):
        self._set_status("processing")

    def success(self):
        self._set_status("complete")

    def fail(self):
        self._set_status("error")

    def cancel(self):
        self._set_status("cancelled")

    def is_failed(self):
        return self.status == "error"

    def queue(self):
        self._set_status("queued")

    def add_audit_message(self, msg, timestamp=None):
        """Append a message to the audit trail, timestamped now unless one is given."""
        if timestamp is None:
            timestamp = dates.now_with_microseconds()
        self._add_to_list_with_struct("audit", {"message": msg, "timestamp": timestamp})

        # Periodic flushing of audit messages to the index would look like:
        # if self._autosave:
        #     audits = len(self._get_list("audit"))
        #     if audits > self._audit_log_counter + self._audit_log_increments:
        #         self.save()
        #         self._audit_log_counter = audits

    @property
    def pretty_audit(self):
        """The audit trail rendered one message per line as '<timestamp> <message>'."""
        entries = self._get_list("audit")
        return "\n".join(f"{entry['timestamp']} {entry['message']}" for entry in entries)
class StdOutBackgroundJob(BackgroundJob):
    """
    A BackgroundJob which, in addition to normal audit logging, echoes each
    audit message to stdout when running in a dev environment.
    """

    def __init__(self, inner):
        # wrap an existing job by re-initialising from its raw data
        super().__init__(**inner.data)

    def add_audit_message(self, msg, timestamp=None):
        super().add_audit_message(msg, timestamp)
        if app.config.get("DOAJENV") == 'dev':
            print(msg)
138# ~~-> DataObj:Library~~
# DataObj struct for BackgroundJob: declares field coercions and allowed values
BACKGROUND_STRUCT = {
    "fields": {
        "id": {"coerce": "unicode"},
        "created_date": {"coerce": "utcdatetime"},
        "last_updated": {"coerce": "utcdatetime"},
        # lifecycle states, driven by the transition methods on BackgroundJob
        "status": {"coerce": "unicode", "allowed_values": ["queued", "processing", "complete", "error", "cancelled"]},
        "user": {"coerce": "unicode"},
        "action": {"coerce": "unicode"},
        "queue_id": {"coerce": "unicode"},
        "es_type": {"coerce": "unicode"}
    },
    "lists": {
        # audit trail entries; entry structure is defined under "structs" below
        "audit": {"contains": "object"}
    },
    "objects": [
        "params", # Note that these do not have structs specified, which allows them to have arbitrary content
        "reference"
    ],
    "structs": {
        "audit": {
            "fields": {
                "message": {"coerce": "unicode"},
                # microsecond precision so rapid successive messages keep their order
                "timestamp": {"coerce": "utcdatetimemicros"}
            }
        }
    }
}
167# ~~-> Elasticsearch:Technology~~
# Options passed to es_data_mapping.create_mapping in BackgroundJob.mappings()
MAPPING_OPTS = {
    # NOTE(review): None here presumably defers the "dynamic" setting to
    # es_data_mapping's default — confirm against create_mapping
    "dynamic": None,
    "coerces": app.config["DATAOBJ_TO_MAPPING_DEFAULTS"],
    "exceptions": {
        # audit messages are stored as unindexed text ("index": False)
        "audit.message": {
            "type": "text",
            "index": False,
            # "include_in_all": False # Removed in es6 fixme: do we need to look at copy_to for the mapping?
        }
    }
}
class ActiveQuery(object):
    """
    Builds the Elasticsearch query for "active" jobs of one task type: those
    whose status is still queued or processing, optionally created since a
    given date.

    ~~ActiveBackgroundJob:Query->Elasticsearch:Technology~~
    """
    def __init__(self, task_type, size=2, since=None):
        self._task_type = task_type
        self._size = size
        self._since = since

    def query(self):
        """Return the query body as a plain dict ready to send to ES."""
        must_clauses = [
            {"term": {"action.exact": self._task_type}},
            {"terms": {"status.exact": ["queued", "processing"]}}
        ]
        # only constrain by creation date when a lower bound was supplied
        if self._since:
            must_clauses.append({"range": {"created_date": {"gte": self._since}}})
        return {
            "query": {
                "bool": {
                    "must": must_clauses
                }
            },
            "size": self._size
        }