Coverage for portality/tasks/journal_bulk_edit.py: 88%

143 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-22 15:59 +0100

1from copy import deepcopy 

2import json 

3from datetime import datetime 

4 

5from flask_login import current_user 

6from werkzeug.datastructures import MultiDict 

7 

8from portality import models, lock 

9from portality.core import app 

10# from portality.formcontext import formcontext 

11from portality.forms.application_forms import JournalFormFactory 

12 

13from portality.tasks.redis_huey import main_queue 

14from portality.decorators import write_required 

15 

16from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary 

17 

18 

def journal_manage(selection_query, dry_run=True, editor_group='', note='', **kwargs):
    """
    Bulk-edit the journals matched by a selection query, or (dry run) just report how many match.

    :param selection_query: query identifying the journals to edit; resolved to a list of ids via
        JournalBulkEditBackgroundTask.resolve_selection_query
    :param dry_run: when True (the default) no job is created; check_admin_privilege is called for
        the current user and only the count of matching journals is reported
    :param editor_group: editor group to set on every journal; ignored when it is the empty string
    :param note: note text handed to the background job
    :param kwargs: any further name/value pairs, passed to the job as replacement metadata
    :return: a BackgroundSummary carrying the job id (None on a dry run) and the number of affected journals
    """
    ids = JournalBulkEditBackgroundTask.resolve_selection_query(selection_query)
    affected = {"journals": len(ids)}

    if dry_run:
        JournalBulkEditBackgroundTask.check_admin_privilege(current_user.id)
        return BackgroundSummary(None, affected=affected)

    # **kwargs is always a dict (never None), so no None-guard is needed; build the
    # replacement metadata in its own dict rather than editing kwargs in place
    replacement_metadata = dict(kwargs)
    if editor_group != "":
        replacement_metadata["editor_group"] = editor_group

    job = JournalBulkEditBackgroundTask.prepare(
        current_user.id,
        selection_query=selection_query,
        note=note,
        ids=ids,
        replacement_metadata=replacement_metadata
    )
    JournalBulkEditBackgroundTask.submit(job)

    job_id = job.id if job is not None else None
    return BackgroundSummary(job_id, affected=affected)

45 

46 

class JournalBulkEditBackgroundTask(AdminBackgroundTask):
    """
    Background task which applies the same note and/or replacement metadata to a list of journals.

    Job parameters (written by prepare, read by run and cleanup):

    * ids - the list of journal ids to edit
    * note - optional note text to append to each journal
    * replacement_metadata - optional JSON-encoded dict of form field name to new value
    """

    __action__ = "journal_bulk_edit"

    @classmethod
    def _job_parameter_check(cls, params):
        """
        Check that the job parameters are sufficient to run the task

        :param params: the job's parameter dict
        :return: True if "ids" is non-empty and at least one of "note" or "replacement_metadata" is present
        """
        # we definitely need "ids" defined
        # we need at least one of "replacement_metadata" or "note" defined as well
        ids = cls.get_param(params, "ids")
        ids_valid = ids is not None and len(ids) > 0

        note_valid = cls.get_param(params, "note") is not None

        # replacement_metadata is stored as a JSON string, so decode before inspecting it
        metadata = json.loads(cls.get_param(params, "replacement_metadata", "{}"))
        metadata_valid = bool(metadata)

        return ids_valid and (note_valid or metadata_valid)

    def run(self):
        """
        Execute the task as specified by the background_job

        For each journal id: load the journal into the "admin" form, apply the replacement metadata
        and/or note, then validate and finalise the form.  Problems with an individual journal are
        recorded as audit messages on the job and do not stop the remaining journals being processed.

        :raises BackgroundException: if the job parameters are insufficient, or the replacement
            metadata does not validate
        :return:
        """
        job = self.background_job
        params = job.params

        if not self._job_parameter_check(params):
            raise BackgroundException("{}.run run without sufficient parameters".format(self.__class__.__name__))

        # get the parameters for the job
        ids = self.get_param(params, 'ids')
        note = self.get_param(params, 'note')
        metadata = json.loads(self.get_param(params, 'replacement_metadata', "{}"))

        # if there is metadata, validate it once up-front, before touching any journal
        if metadata:
            self._validate_replacement_metadata(metadata)

        for journal_id in ids:
            journal = models.Journal.pull(journal_id)
            if journal is None:
                job.add_audit_message("Journal with id {} does not exist, skipping".format(journal_id))
                continue

            fc = JournalFormFactory.context("admin").processor(source=journal)

            # turn on the "all fields optional" flag, so that bulk tasks don't cause errors that the user interface
            # would allow you to bypass
            fc.form.make_all_fields_optional.data = True

            self._reset_editor_if_invalid(fc, journal, metadata)

            updated = self._apply_replacement_metadata(fc, metadata, journal_id)

            if note:
                job.add_audit_message("Adding note to for journal {y}".format(y=journal_id))
                fc.form.notes.append_entry(
                    {'note_date': datetime.now().strftime(app.config['DEFAULT_DATE_FORMAT']), 'note': note}
                )
                updated = True

            if not updated:
                continue

            if not fc.validate():
                self._audit_validation_failure(fc, journal_id)
                continue

            try:
                fc.finalise()
            except Exception as e:
                # deliberately broad: a failure on one journal is logged and the rest still get processed
                job.add_audit_message("Form context exception while bulk editing journal {} :\n{}".format(journal_id, str(e)))

    @staticmethod
    def _validate_replacement_metadata(metadata):
        """Run the replacement metadata through the "bulk_edit" form; raise BackgroundException if it is invalid."""
        fc = JournalFormFactory.context("bulk_edit").processor(formdata=MultiDict(metadata))
        if not fc.validate():
            raise BackgroundException("Unable to validate replacement metadata: " + json.dumps(metadata))

    @staticmethod
    def _reset_editor_if_invalid(fc, journal, metadata):
        """Blank the form's editor when the editor group is changing, or the editor is not in the current group."""
        if "editor_group" in metadata:
            fc.form.editor.data = None
            return

        if journal.editor_group is None:
            return

        # FIXME: this is a bit of a stop-gap, pending a more substantial referential-integrity-like solution
        # if the editor group is not being changed, validate that the editor is actually in the editor group,
        # and if not, unset them
        eg = models.EditorGroup.pull_by_key("name", journal.editor_group)

        # if we didn't find the editor group, this is broken anyway, so reset the editor data anyway
        if eg is None or journal.editor not in eg.associates + [eg.editor]:
            fc.form.editor.data = None

    def _apply_replacement_metadata(self, fc, metadata, journal_id):
        """Copy each replacement value onto the form; return True if there was any metadata to apply."""
        job = self.background_job
        for field_name, value in metadata.items():
            if field_name != "change_doaj_seal":
                job.add_audit_message("Setting {f} to {x} for journal {y}".format(f=field_name, x=value, y=journal_id))
                fc.form[field_name].data = value
            elif value:
                # "change_doaj_seal" is written to the doaj_seal field, and only when the value is truthy
                fc.form.doaj_seal.data = value
        return bool(metadata)

    def _audit_validation_failure(self, fc, journal_id):
        """Record the form's validation errors, and the data of the offending fields, on the job's audit trail."""
        data_submitted = {}
        for affected_field_name in fc.form.errors.keys():
            affected_field = getattr(fc.form, affected_field_name,
                                     ' Field {} does not exist on form. '.format(affected_field_name))
            if isinstance(affected_field, str):  # ideally this should never happen, an error should not be reported on a field that is not present on the form
                data_submitted[affected_field_name] = affected_field
                continue

            data_submitted[affected_field_name] = affected_field.data

        # default=str: this is only a diagnostic message, so a value json cannot encode natively is
        # stringified rather than being allowed to abort the whole job with a TypeError
        self.background_job.add_audit_message(
            "Data validation failed while bulk editing journal {} :\n"
            "{}\n\n"
            "The data from the fields with the errors is:\n{}".format(
                journal_id, json.dumps(fc.form.errors, default=str), json.dumps(data_submitted, default=str)
            )
        )

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task

        Calls lock.batch_unlock for the job's journal ids on behalf of the job's user.

        :return:
        """
        job = self.background_job
        ids = self.get_param(job.params, 'ids')
        lock.batch_unlock("journal", ids, job.user)

    @classmethod
    def resolve_selection_query(cls, selection_query):
        """
        Resolve a selection query to the list of ids of the journals it matches

        :param selection_query: the query dict; it is copied, so the caller's object is not modified
        :return: list of the "_id" values of the matching journals
        """
        q = deepcopy(selection_query)
        # only the ids are read from the results, so switch off _source in the query
        q["_source"] = False
        iterator = models.Journal.iterate(q=q, page_size=5000, wrap=False)
        return [j['_id'] for j in iterator]

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        Recognised keyword arguments: selection_query (required), ids (required), note and
        replacement_metadata (a dict; entries whose value is None or "" are dropped).

        :param username: the user the job, and the journal locks, are created for
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :raises BackgroundException: if the resulting parameters are insufficient to run the task
        :return: a BackgroundJob instance representing this task
        """

        super(JournalBulkEditBackgroundTask, cls).prepare(username, **kwargs)

        # first prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__

        refs = {}
        cls.set_reference(refs, "selection_query", json.dumps(kwargs['selection_query']))
        job.reference = refs

        params = {}

        # get the named parameters we know may be there
        cls.set_param(params, 'ids', kwargs['ids'])
        note = kwargs.get("note")
        if note is not None and note != "":
            cls.set_param(params, 'note', note)

        # get the metadata overwrites, leaving out anything that was not given a value
        metadata = {
            k: v
            for k, v in (kwargs.get("replacement_metadata") or {}).items()
            if v is not None and v != ""
        }
        if metadata:
            cls.set_param(params, 'replacement_metadata', json.dumps(metadata))

        if not cls._job_parameter_check(params):
            raise BackgroundException("{}.prepare run without sufficient parameters".format(cls.__name__))

        job.params = params

        # now ensure that we have the locks for all the journals
        # will raise an exception if this fails
        lock.batch_lock("journal", kwargs['ids'], username, timeout=app.config.get("BACKGROUND_TASK_LOCK_TIMEOUT", 3600))

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        The job is saved (blocking) first, then journal_bulk_edit is scheduled with the job id
        and delay=10.

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save(blocking=True)
        journal_bulk_edit.schedule(args=(background_job.id,), delay=10)

239 

240 

@main_queue.task()
@write_required(script=True)
def journal_bulk_edit(job_id):
    """
    Queue entry point: load the BackgroundJob with the given id and execute it as a
    JournalBulkEditBackgroundTask via BackgroundApi.

    :param job_id: id of the BackgroundJob to run
    :return:
    """
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(JournalBulkEditBackgroundTask(background_job))

246 BackgroundApi.execute(task)