Coverage for portality/models/background.py: 90%

93 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-22 15:59 +0100

1from portality.core import app 

2from portality.lib import dataobj, dates, es_data_mapping 

3from portality import dao 

4 

5 

class BackgroundJob(dataobj.DataObj, dao.DomainObject):
    """
    # ~~BackgroundJob:Model~~

    Record of a single background job: who requested it, which action it
    runs, its current status, free-form ``params``/``reference`` objects and
    a list of timestamped audit messages.  The permitted shape is described
    by ``BACKGROUND_STRUCT``.
    """
    __type__ = "background_job"

    def __init__(self, **kwargs):
        """
        Build a job from keyword data, or from a raw ES hit.

        :param kwargs: the field values of the job; if a ``_source`` key is
            present its value is used as the record instead.  A record
            without a ``status`` is given the status ``queued``.
        """
        # FIXME: hack, to deal with ES integration layer being improperly abstracted
        raw = kwargs["_source"] if "_source" in kwargs else kwargs

        self._add_struct(BACKGROUND_STRUCT)

        # every job starts life queued unless the record says otherwise
        if "status" not in raw:
            raw["status"] = "queued"

        # to autosave we need to move the object over to Seamless
        # self._autosave = False
        # self._audit_log_increments = 500
        # self._audit_log_counter = 0

        super().__init__(raw=raw)

    @classmethod
    def active(cls, task_type, since=None):
        """
        Look up jobs of the given type that are still queued or processing.

        :param task_type: value matched against the job's ``action``
        :param since: optional lower bound (inclusive) on ``created_date``
        :return: whatever ``cls.q2obj`` yields for the ActiveQuery
        """
        # ~~-> ActiveBackgroundJob:Query~~
        active_query = ActiveQuery(task_type, since=since)
        return cls.q2obj(q=active_query.query())

    def mappings(self):
        """Return the index mapping generated from this object's struct and ``MAPPING_OPTS``."""
        # ~~-> Elasticsearch:Technology~~
        struct = self.get_struct()
        return es_data_mapping.create_mapping(struct, MAPPING_OPTS)

    # This feature would allow us to flush the audit logs to the index periodically.
    # We need to switch the object type to Seamless to enable that
    # def autosave(self, audit_log_increments=500):
    #     self._audit_log_increments = audit_log_increments
    #     self._autosave = True

    @property
    def user(self):
        """The ``user`` field of the job."""
        return self._get_single("user")

    @user.setter
    def user(self, val):
        self._set_with_struct("user", val)

    @property
    def action(self):
        """The ``action`` field of the job (the value ``active`` matches its ``task_type`` against)."""
        return self._get_single("action")

    @action.setter
    def action(self, val):
        self._set_with_struct("action", val)

    @property
    def audit(self):
        """The list of audit entries recorded on the job."""
        return self._get_list("audit")

    @property
    def params(self):
        """The ``params`` object of the job."""
        return self._get_single("params")

    @params.setter
    def params(self, obj):
        # params has no struct of its own, so it is stored directly
        self._set_single("params", obj)

    @property
    def reference(self):
        """The ``reference`` object of the job."""
        return self._get_single("reference")

    @reference.setter
    def reference(self, obj):
        # reference has no struct of its own, so it is stored directly
        self._set_single("reference", obj)

    def add_reference(self, name, value):
        """
        Store ``value`` under ``name`` in the ``reference`` object, creating
        the object first if the job does not have one yet.
        """
        if self.reference is None:
            self.reference = {}
        self.reference[name] = value

    @property
    def status(self):
        """The current status string of the job."""
        return self._get_single("status")

    def start(self):
        """Set the status to ``processing``."""
        self._set_with_struct("status", "processing")

    def success(self):
        """Set the status to ``complete``."""
        self._set_with_struct("status", "complete")

    def fail(self):
        """Set the status to ``error``."""
        self._set_with_struct("status", "error")

    def cancel(self):
        """Set the status to ``cancelled``."""
        self._set_with_struct("status", "cancelled")

    def is_failed(self):
        """Return True when the status is ``error``."""
        return self.status == "error"

    def queue(self):
        """Set the status to ``queued``."""
        self._set_with_struct("status", "queued")

    def add_audit_message(self, msg, timestamp=None):
        """
        Append an audit entry to the job.

        :param msg: the message text
        :param timestamp: time of the entry; when omitted,
            ``dates.now_with_microseconds()`` is used
        """
        when = dates.now_with_microseconds() if timestamp is None else timestamp
        self._add_to_list_with_struct("audit", {"message": msg, "timestamp": when})

        # This feature would allow us to flush the audit messages to the index periodically
        # if self._autosave:
        #     audits = len(self._get_list("audit"))
        #     if audits > self._audit_log_counter + self._audit_log_increments:
        #         self.save()
        #         self._audit_log_counter = audits

    @property
    def pretty_audit(self):
        """All audit entries as text, one ``<timestamp> <message>`` line per entry."""
        lines = []
        for entry in self._get_list("audit"):
            lines.append("{t} {m}".format(t=entry["timestamp"], m=entry["message"]))
        return "\n".join(lines)

125 

126 

class StdOutBackgroundJob(BackgroundJob):
    """
    A BackgroundJob built from the data of an existing job, which also prints
    each audit message to stdout when the ``DOAJENV`` config value is ``dev``.
    """

    def __init__(self, inner):
        """
        :param inner: the job to copy from; its ``data`` mapping is expanded
            into the keyword arguments of ``BackgroundJob.__init__``
        """
        super().__init__(**inner.data)

    def add_audit_message(self, msg, timestamp=None):
        """Record the audit message as usual, then print it when running in dev."""
        super().add_audit_message(msg, timestamp)

        running_in_dev = app.config.get("DOAJENV") == "dev"
        if running_in_dev:
            print(msg)

136 

137 

# ~~-> DataObj:Library~~
# Struct applied to every BackgroundJob via ``_add_struct``.
BACKGROUND_STRUCT = {
    # scalar fields and the coercion named for each
    "fields": {
        "id": {"coerce": "unicode"},
        "created_date": {"coerce": "utcdatetime"},
        "last_updated": {"coerce": "utcdatetime"},
        "status": {
            "coerce": "unicode",
            "allowed_values": [
                "queued",
                "processing",
                "complete",
                "error",
                "cancelled",
            ],
        },
        "user": {"coerce": "unicode"},
        "action": {"coerce": "unicode"},
        "queue_id": {"coerce": "unicode"},
        "es_type": {"coerce": "unicode"},
    },
    "lists": {
        "audit": {"contains": "object"},
    },
    # Neither of these has a struct specified below, which allows them to
    # have arbitrary content
    "objects": [
        "params",
        "reference",
    ],
    "structs": {
        # shape of each entry in the "audit" list
        "audit": {
            "fields": {
                "message": {"coerce": "unicode"},
                "timestamp": {"coerce": "utcdatetimemicros"},
            },
        },
    },
}

166 

# ~~-> Elasticsearch:Technology~~
# Options handed to ``es_data_mapping.create_mapping`` by ``BackgroundJob.mappings``.
MAPPING_OPTS = {
    "dynamic": None,
    # coerce-name -> mapping defaults, taken from the application config
    "coerces": app.config["DATAOBJ_TO_MAPPING_DEFAULTS"],
    # per-path overrides of the defaults above
    "exceptions": {
        "audit.message": {
            "type": "text",
            "index": False,
            # "include_in_all": False  # Removed in es6 fixme: do we need to look at copy_to for the mapping?
        },
    },
}

179 

180 

class ActiveQuery(object):
    """
    ~~ActiveBackgroundJob:Query->Elasticsearch:Technology~~

    Builds the query body that selects jobs of one task type whose status is
    ``queued`` or ``processing``.
    """

    def __init__(self, task_type, size=2, since=None):
        """
        :param task_type: value the ``action.exact`` field must equal
        :param size: value placed in the query's ``size`` entry
        :param since: optional inclusive lower bound for ``created_date``;
            any falsy value means no date restriction
        """
        self._task_type = task_type
        self._size = size
        self._since = since

    def query(self):
        """Return the query as a plain dict with ``query`` and ``size`` keys."""
        must_clauses = [
            {"term": {"action.exact": self._task_type}},
            {"terms": {"status.exact": ["queued", "processing"]}},
        ]

        # only restrict by creation date when a lower bound was supplied
        if self._since:
            must_clauses.append({"range": {"created_date": {"gte": self._since}}})

        return {
            "query": {"bool": {"must": must_clauses}},
            "size": self._size,
        }
204 return q