Coverage for portality / tasks / journal_bulk_edit.py: 87%

140 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1from copy import deepcopy 

2import json 

3 

4from flask_login import current_user 

5from werkzeug.datastructures import MultiDict 

6 

7from portality import models, lock 

8from portality.core import app 

9# from portality.formcontext import formcontext 

10from portality.forms.application_forms import JournalFormFactory 

11from portality.lib import dates 

12 

13from portality.tasks.redis_huey import events_queue as queue 

14 

15from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary 

16 

17 

def journal_manage(selection_query, dry_run=True, editor_group='', note='', **kwargs):
    """
    Bulk-edit the journals matched by a selection query, or (on a dry run) just count them.

    :param selection_query: query identifying the journals to edit; resolved to a list of ids
    :param dry_run: if True, no job is created; only the number of affected journals is reported
    :param editor_group: editor group to assign; ignored when it is the empty string
    :param note: note text passed through to the background job
    :param kwargs: any further replacement metadata, passed to the job as "replacement_metadata"
    :return: a BackgroundSummary carrying the job id (None on a dry run) and the affected journal count
    """
    ids = JournalBulkEditBackgroundTask.resolve_selection_query(selection_query)
    affected = {"journals": len(ids)}

    if dry_run:
        # NOTE(review): check_admin_privilege presumably raises for non-admin users - confirm in AdminBackgroundTask
        JournalBulkEditBackgroundTask.check_admin_privilege(current_user.id)
        return BackgroundSummary(None, affected=affected)

    # **kwargs is always bound to a dict (never None), so it can be extended directly
    if editor_group != "":
        kwargs["editor_group"] = editor_group

    job = JournalBulkEditBackgroundTask.prepare(
        current_user.id,
        selection_query=selection_query,
        note=note,
        ids=ids,
        replacement_metadata=kwargs
    )
    JournalBulkEditBackgroundTask.submit(job)

    job_id = job.id if job is not None else None
    return BackgroundSummary(job_id, affected=affected)

44 

45 

class JournalBulkEditBackgroundTask(AdminBackgroundTask):
    """
    Background task which applies the same replacement metadata and/or note to a list of journals.

    Job parameters used by this task:
      * "ids" - list of journal ids to edit (required, non-empty)
      * "note" - note text to append to each journal (optional)
      * "replacement_metadata" - JSON-encoded dict of form field name -> new value (optional)
    At least one of "note" or "replacement_metadata" must be present.
    """

    __action__ = "journal_bulk_edit"

    @classmethod
    def _job_parameter_check(cls, params):
        """
        Check that the job parameters are sufficient to run the task.

        :param params: the job parameters
        :return: True if "ids" is non-empty and at least one of "note" or a non-empty
            "replacement_metadata" is defined; False otherwise
        """
        ids = cls.get_param(params, "ids")
        ids_valid = ids is not None and len(ids) > 0

        note_valid = cls.get_param(params, "note") is not None

        # replacement_metadata is stored as a JSON string; absent means "no replacements"
        metadata = json.loads(cls.get_param(params, "replacement_metadata", "{}"))
        metadata_valid = len(metadata) > 0

        return ids_valid and (note_valid or metadata_valid)

    @staticmethod
    def _validate_replacement_metadata(metadata):
        """
        Validate the replacement metadata once, against the "bulk_edit" form context.

        :param metadata: dict of form field name -> new value
        :raises BackgroundException: if the form does not validate
        """
        formdata = MultiDict(metadata)
        fc = JournalFormFactory.context("bulk_edit").processor(formdata=formdata)
        if not fc.validate():
            raise BackgroundException("Unable to validate replacement metadata: " + json.dumps(metadata))

    @staticmethod
    def _reset_stale_editor(fc, journal, editor_group_changing):
        """
        Unset the editor on the form when it can no longer be trusted.

        :param fc: the form processor for the journal
        :param journal: the journal being edited
        :param editor_group_changing: True if the bulk edit replaces the editor group
        """
        if editor_group_changing:
            fc.form.editor.data = None
            return

        if journal.editor_group is None:
            return

        # FIXME: this is a bit of a stop-gap, pending a more substantial referential-integrity-like solution
        # if the editor group is not being changed, validate that the editor is actually in the editor group,
        # and if not, unset them.  If the editor group cannot be found at all the record is broken anyway,
        # so the editor is reset in that case too.
        eg = models.EditorGroup.pull_by_key("name", journal.editor_group)
        if eg is None or journal.editor not in eg.associates + [eg.editor]:
            fc.form.editor.data = None

    @staticmethod
    def _audit_validation_failure(job, fc, journal_id):
        """
        Record the form errors, and the data in the offending fields, on the job's audit trail.

        :param job: the background job to write the audit message to
        :param fc: the form processor whose validation failed
        :param journal_id: id of the journal that failed validation
        """
        data_submitted = {}
        for field_name in fc.form.errors.keys():
            field = getattr(fc.form, field_name,
                            ' Field {} does not exist on form. '.format(field_name))
            if isinstance(field, str):
                # ideally this should never happen, an error should not be reported on a field
                # that is not present on the form
                data_submitted[field_name] = field
            else:
                data_submitted[field_name] = field.data

        job.add_audit_message(
            "Data validation failed while bulk editing journal {} :\n"
            "{}\n\n"
            "The data from the fields with the errors is:\n{}".format(
                journal_id, json.dumps(fc.form.errors), json.dumps(data_submitted)
            )
        )

    def run(self):
        """
        Execute the task as specified by the background_job

        Journals which cannot be found, fail validation, or fail to finalise are reported
        on the job's audit trail and skipped; they do not abort the remaining journals.

        :raises BackgroundException: if the job parameters are insufficient, or the replacement
            metadata does not validate
        :return:
        """
        job = self.background_job
        params = job.params

        if not self._job_parameter_check(params):
            raise BackgroundException("{}.run run without sufficient parameters".format(self.__class__.__name__))

        # get the parameters for the job
        ids = self.get_param(params, 'ids')
        note = self.get_param(params, 'note')
        metadata = json.loads(self.get_param(params, 'replacement_metadata', "{}"))

        # if there is metadata, validate it
        if metadata:
            self._validate_replacement_metadata(metadata)

        for journal_id in ids:
            journal = models.Journal.pull(journal_id)
            if journal is None:
                job.add_audit_message("Journal with id {} does not exist, skipping".format(journal_id))
                continue

            fc = JournalFormFactory.context("admin").processor(source=journal)

            # turn on the "all fields optional" flag, so that bulk tasks don't cause errors that the
            # user interface would allow you to bypass
            fc.form.make_all_fields_optional.data = True

            self._reset_stale_editor(fc, journal, "editor_group" in metadata)

            updated = False

            for k, v in metadata.items():
                job.add_audit_message("Setting {f} to {x} for journal {y}".format(f=k, x=v, y=journal_id))
                fc.form[k].data = v
                updated = True

            if note:
                job.add_audit_message("Adding note to for journal {y}".format(y=journal_id))
                fc.form.notes.append_entry(
                    {'note_date': dates.now_str(), 'note': note}
                )
                updated = True

            if not updated:
                continue

            if not fc.validate():
                self._audit_validation_failure(job, fc, journal_id)
                continue

            # deliberately best-effort: a failure on one journal is audited and the loop carries on
            try:
                fc.finalise()
            except Exception as e:
                job.add_audit_message("Form context exception while bulk editing journal {} :\n{}".format(journal_id, str(e)))

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task

        Releases the "journal" locks on the job's ids on behalf of the job's user.

        :return:
        """
        job = self.background_job
        ids = self.get_param(job.params, 'ids')
        lock.batch_unlock("journal", ids, job.user)

    @classmethod
    def resolve_selection_query(cls, selection_query):
        """
        Resolve a selection query to the list of ids of the journals it matches.

        :param selection_query: the query dict; it is copied, so the caller's object is not modified
        :return: list of the "_id" values of the matching records
        """
        q = deepcopy(selection_query)
        # only the ids are needed - presumably this stops the index returning document bodies
        q["_source"] = False
        return [hit['_id'] for hit in models.Journal.iterate(q=q, page_size=5000, wrap=False)]

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        Recognised keyword arguments: "selection_query" and "ids" (both required), "note" and
        "replacement_metadata" (optional).  Empty/None notes and metadata values are dropped.

        :param username: the user the job is created (and the journal locks are taken) for
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :raises BackgroundException: if the resulting parameters are insufficient to run the task
        :return: a BackgroundJob instance representing this task
        """

        super(JournalBulkEditBackgroundTask, cls).prepare(username, **kwargs)

        # first prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__

        refs = {}
        cls.set_reference(refs, "selection_query", json.dumps(kwargs['selection_query']))
        job.reference = refs

        params = {}

        # get the named parameters we know may be there
        cls.set_param(params, 'ids', kwargs['ids'])
        note = kwargs.get("note")
        if note is not None and note != "":
            cls.set_param(params, 'note', note)

        # get the metadata overwrites, ignoring unset values; a replacement_metadata of None is
        # treated the same as an empty dict rather than failing on .items()
        replacements = kwargs.get("replacement_metadata") or {}
        metadata = {k: v for k, v in replacements.items() if v is not None and v != ""}
        if metadata:
            cls.set_param(params, 'replacement_metadata', json.dumps(metadata))

        if not cls._job_parameter_check(params):
            raise BackgroundException("{}.prepare run without sufficient parameters".format(cls.__name__))

        job.params = params
        job.queue_id = huey_helper.queue_id

        # now ensure that we have the locks for all the journals
        # will raise an exception if this fails
        lock.batch_lock("journal", kwargs['ids'], username, timeout=app.config.get("BACKGROUND_TASK_LOCK_TIMEOUT", 3600))

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        The job is saved first, then scheduled after the configured HUEY_ASYNC_DELAY
        (10 if not configured).

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save(blocking=True)
        journal_bulk_edit.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))

236 

237 

# Helper tying this task type to the events queue; it provides the queue id recorded on
# prepared jobs and the decorator which registers the execution entry point below.
huey_helper = JournalBulkEditBackgroundTask.create_huey_helper(queue)


@huey_helper.register_execute(is_load_config=False)
def journal_bulk_edit(job_id):
    """
    Queue entry point: load the background job with the given id and execute it as a
    JournalBulkEditBackgroundTask.

    :param job_id: id of the BackgroundJob to run
    """
    BackgroundApi.execute(
        JournalBulkEditBackgroundTask(models.BackgroundJob.pull(job_id))
    )