Coverage for portality / tasks / suggestion_bulk_edit.py: 88%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1import json 

2from copy import deepcopy 

3 

4from flask_login import current_user 

5 

6from portality import models, lock 

7from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary 

8from portality.core import app 

9from portality.forms.application_forms import ApplicationFormFactory 

10from portality.lib import dates 

11from portality.lib.formulaic import FormulaicException 

12from portality.tasks.redis_huey import events_queue as queue 

13 

14 

15def suggestion_manage(selection_query, dry_run=True, editor_group='', note='', application_status=''): 

16 ids = SuggestionBulkEditBackgroundTask.resolve_selection_query(selection_query) 

17 if dry_run: 

18 SuggestionBulkEditBackgroundTask.check_admin_privilege(current_user.id) 

19 return BackgroundSummary(None, affected={"applications": len(ids)}) 

20 # return len(ids) 

21 

22 job = SuggestionBulkEditBackgroundTask.prepare( 

23 current_user.id, 

24 selection_query=selection_query, 

25 editor_group=editor_group, 

26 note=note, 

27 application_status=application_status, 

28 ids=ids 

29 ) 

30 SuggestionBulkEditBackgroundTask.submit(job) 

31 

32 affected = len(ids) 

33 job_id = None 

34 if job is not None: 

35 job_id = job.id 

36 return BackgroundSummary(job_id, affected={"applications": affected}) 

37 

38 

39class SuggestionBulkEditBackgroundTask(AdminBackgroundTask): 

40 __action__ = "suggestion_bulk_edit" 

41 

42 @classmethod 

43 def _job_parameter_check(cls, params): 

44 # we definitely need "ids" defined 

45 # we need at least one of, "editor_group" or "note" or "application_status" 

46 return bool( 

47 cls.get_param(params, 'ids') and \ 

48 (cls.get_param(params, 'editor_group') or cls.get_param(params, 'note') or cls.get_param(params, 

49 'application_status')) 

50 ) 

51 

52 def run(self): 

53 """ 

54 Execute the task as specified by the background_job 

55 :return: 

56 """ 

57 job = self.background_job 

58 params = job.params 

59 account = models.Account.pull(job.user) 

60 

61 ids = self.get_param(params, 'ids') 

62 editor_group = self.get_param(params, 'editor_group') 

63 note = self.get_param(params, 'note') 

64 application_status = self.get_param(params, 'application_status') 

65 

66 if not self._job_parameter_check(params): 

67 raise BackgroundException("{}.run run without sufficient parameters".format(self.__class__.__name__)) 

68 

69 for suggestion_id in ids: 

70 updated = False 

71 

72 s = models.Suggestion.pull(suggestion_id) 

73 

74 if s is None: 

75 job.add_audit_message("Suggestion with id {} does not exist, skipping".format(suggestion_id)) 

76 continue 

77 

78 formulaic_context = ApplicationFormFactory.context("admin") 

79 fc = formulaic_context.processor(source=s) 

80 # fc = formcontext.ApplicationFormFactory.get_form_context(role="admin", source=s) 

81 

82 if editor_group: 

83 job.add_audit_message( 

84 "Setting editor_group to {x} for suggestion {y}".format(x=str(editor_group), y=suggestion_id)) 

85 

86 # set the editor group 

87 f = fc.form.editor_group 

88 f.data = editor_group 

89 

90 # clear the editor 

91 ed = fc.form.editor 

92 ed.data = None 

93 

94 updated = True 

95 

96 if note: 

97 job.add_audit_message("Adding note to for suggestion {y}".format(y=suggestion_id)) 

98 fc.form.notes.append_entry( 

99 {'date': dates.now_str(), 'note': note} 

100 ) 

101 updated = True 

102 

103 if application_status: 

104 job.add_audit_message( 

105 "Setting application_status to {x} for suggestion {y}".format(x=str(editor_group), y=suggestion_id)) 

106 f = fc.form.application_status 

107 f.data = application_status 

108 updated = True 

109 

110 if updated: 

111 if fc.validate(): 

112 try: 

113 fc.finalise(account) 

114 except FormulaicException as e: 

115 job.add_audit_message( 

116 "Form context exception while bulk editing suggestion {} :\n{}".format(suggestion_id, 

117 str(e))) 

118 else: 

119 data_submitted = {} 

120 for affected_field_name in list(fc.form.errors.keys()): 

121 affected_field = getattr(fc.form, affected_field_name, 

122 ' Field {} does not exist on form. '.format(affected_field_name)) 

123 # ideally this should never happen, an error should not be reported on a field that is not present on the form 

124 if isinstance(affected_field, str): 

125 data_submitted[affected_field_name] = affected_field 

126 continue 

127 

128 data_submitted[affected_field_name] = affected_field.data 

129 job.add_audit_message( 

130 "Data validation failed while bulk editing suggestion {} :\n" 

131 "{}\n\n" 

132 "The data from the fields with the errors is:\n{}".format( 

133 suggestion_id, json.dumps(fc.form.errors), json.dumps(data_submitted) 

134 ) 

135 ) 

136 

137 def cleanup(self): 

138 """ 

139 Cleanup after a successful OR failed run of the task 

140 :return: 

141 """ 

142 job = self.background_job 

143 params = job.params 

144 ids = self.get_param(params, 'ids') 

145 username = job.user 

146 

147 lock.batch_unlock("suggestion", ids, username) 

148 

149 @classmethod 

150 def resolve_selection_query(cls, selection_query): 

151 q = deepcopy(selection_query) 

152 q["_source"] = False 

153 iterator = models.Suggestion.iterate(q=q, page_size=5000, wrap=False) 

154 return [s['_id'] for s in iterator] 

155 

156 @classmethod 

157 def prepare(cls, username, **kwargs): 

158 """ 

159 Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob, 

160 or fail with a suitable exception 

161 

162 :param kwargs: arbitrary keyword arguments pertaining to this task type 

163 :return: a BackgroundJob instance representing this task 

164 """ 

165 

166 super(SuggestionBulkEditBackgroundTask, cls).prepare(username, **kwargs) 

167 

168 # first prepare a job record 

169 job = models.BackgroundJob() 

170 job.user = username 

171 job.action = cls.__action__ 

172 job.reference = {'selection_query': json.dumps(kwargs['selection_query'])} 

173 

174 params = {} 

175 cls.set_param(params, 'ids', kwargs['ids']) 

176 cls.set_param(params, 'editor_group', kwargs.get('editor_group', '')) 

177 cls.set_param(params, 'note', kwargs.get('note', '')) 

178 cls.set_param(params, 'application_status', kwargs.get('application_status', '')) 

179 

180 if not cls._job_parameter_check(params): 

181 raise BackgroundException("{}.prepare run without sufficient parameters".format(cls.__name__)) 

182 

183 job.params = params 

184 job.queue_id = huey_helper.queue_id 

185 

186 # now ensure that we have the locks for all the suggestions 

187 # will raise an exception if this fails 

188 lock.batch_lock("suggestion", kwargs['ids'], username, 

189 timeout=app.config.get("BACKGROUND_TASK_LOCK_TIMEOUT", 3600)) 

190 

191 return job 

192 

193 @classmethod 

194 def submit(cls, background_job): 

195 """ 

196 Submit the specified BackgroundJob to the background queue 

197 

198 :param background_job: the BackgroundJob instance 

199 :return: 

200 """ 

201 background_job.save(blocking=True) 

202 suggestion_bulk_edit.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) 

203 

204 

205huey_helper = SuggestionBulkEditBackgroundTask.create_huey_helper(queue) 

206 

207 

208@huey_helper.register_execute(is_load_config=False) 

209def suggestion_bulk_edit(job_id): 

210 job = models.BackgroundJob.pull(job_id) 

211 task = SuggestionBulkEditBackgroundTask(job) 

212 BackgroundApi.execute(task)