Coverage for portality/tasks/suggestion_bulk_edit.py: 88%

115 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-19 18:38 +0100

1from copy import deepcopy 

2import json 

3from datetime import datetime 

4 

5from flask_login import current_user 

6 

7from portality import models, lock 

8from portality.core import app 

9 

10from portality.tasks.redis_huey import main_queue 

11from portality.decorators import write_required 

12 

13from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary 

14from portality.forms.application_forms import ApplicationFormFactory 

15from portality.lib.formulaic import FormulaicException 

16 

17 

def suggestion_manage(selection_query, dry_run=True, editor_group='', note='', application_status=''):
    """
    Bulk-edit the suggestions matched by a query, or just count them on a dry run.

    The selection query is always resolved to a list of suggestion ids first.

    :param selection_query: query dict identifying the suggestions to edit
    :param dry_run: if truthy, no job is created; the current user's admin privilege is
        checked and only the number of matching suggestions is reported
    :param editor_group: editor group to pass on to the background job
    :param note: note text to pass on to the background job
    :param application_status: application status to pass on to the background job
    :return: a BackgroundSummary whose ``affected`` dict holds the count of matching
        suggestions under "applications"; its job id is None on a dry run
    """
    task = SuggestionBulkEditBackgroundTask
    matched_ids = task.resolve_selection_query(selection_query)

    if not dry_run:
        # real run: build the job record for the current user and queue it
        job = task.prepare(
            current_user.id,
            selection_query=selection_query,
            editor_group=editor_group,
            note=note,
            application_status=application_status,
            ids=matched_ids
        )
        task.submit(job)

        submitted_id = job.id if job is not None else None
        return BackgroundSummary(submitted_id, affected={"applications" : len(matched_ids)})

    # dry run: only verify the caller's privilege and report the would-be impact
    task.check_admin_privilege(current_user.id)
    return BackgroundSummary(None, affected={"applications" : len(matched_ids)})

40 

41 # return len(ids) 

42 

43 

class SuggestionBulkEditBackgroundTask(AdminBackgroundTask):
    """
    Background task which applies the same edit to a batch of suggestions.

    For every suggestion id in the job's "ids" parameter the task can set the
    editor group (clearing the assigned editor), append a note, and/or set the
    application status. Each change is pushed through the "admin" application
    form, so it is validated before being finalised. Progress and failures are
    recorded as audit messages on the background job.
    """
    __action__ = "suggestion_bulk_edit"

    @classmethod
    def _job_parameter_check(cls, params):
        """
        Check that the job parameters are sufficient to run the task.

        :param params: the job parameters, as read via ``cls.get_param``
        :return: True if "ids" is set and at least one of "editor_group", "note"
            or "application_status" is set; False otherwise
        """
        # we definitely need "ids" defined
        if not cls.get_param(params, 'ids'):
            return False

        # we need at least one of, "editor_group" or "note" or "application_status"
        return any(
            cls.get_param(params, name)
            for name in ('editor_group', 'note', 'application_status')
        )

    def run(self):
        """
        Execute the task as specified by the background_job

        Suggestions which cannot be found are skipped with an audit message.
        Validation failures and FormulaicExceptions raised while finalising are
        also recorded as audit messages, and processing continues with the next
        suggestion.

        :raises BackgroundException: if the job parameters are insufficient
        :return:
        """
        job = self.background_job
        params = job.params
        account = models.Account.pull(job.user)

        ids = self.get_param(params, 'ids')
        editor_group = self.get_param(params, 'editor_group')
        note = self.get_param(params, 'note')
        application_status = self.get_param(params, 'application_status')

        if not self._job_parameter_check(params):
            raise BackgroundException("{}.run run without sufficient parameters".format(self.__class__.__name__))

        for suggestion_id in ids:
            updated = False

            s = models.Suggestion.pull(suggestion_id)

            if s is None:
                job.add_audit_message("Suggestion with id {} does not exist, skipping".format(suggestion_id))
                continue

            # load the suggestion into the admin form so the edits below are validated
            formulaic_context = ApplicationFormFactory.context("admin")
            fc = formulaic_context.processor(source=s)

            if editor_group:
                job.add_audit_message(
                    "Setting editor_group to {x} for suggestion {y}".format(x=str(editor_group), y=suggestion_id))

                # set the editor group
                fc.form.editor_group.data = editor_group

                # clear the editor
                fc.form.editor.data = None

                updated = True

            if note:
                job.add_audit_message("Adding note to for suggestion {y}".format(y=suggestion_id))
                fc.form.notes.append_entry(
                    {'date': datetime.now().strftime(app.config['DEFAULT_DATE_FORMAT']), 'note': note}
                )
                updated = True

            if application_status:
                # report the status being applied (previously this logged editor_group by mistake)
                job.add_audit_message(
                    "Setting application_status to {x} for suggestion {y}".format(
                        x=str(application_status), y=suggestion_id))
                fc.form.application_status.data = application_status
                updated = True

            if not updated:
                continue

            if fc.validate():
                try:
                    fc.finalise(account)
                except FormulaicException as e:
                    job.add_audit_message(
                        "Form context exception while bulk editing suggestion {} :\n{}".format(suggestion_id, str(e)))
                continue

            # validation failed: report the errors together with the offending field data
            data_submitted = {}
            for affected_field_name in fc.form.errors:
                affected_field = getattr(fc.form, affected_field_name,
                                         ' Field {} does not exist on form. '.format(affected_field_name))
                if isinstance(affected_field, str):
                    # ideally this should never happen, an error should not be reported
                    # on a field that is not present on the form
                    data_submitted[affected_field_name] = affected_field
                    continue

                data_submitted[affected_field_name] = affected_field.data

            job.add_audit_message(
                "Data validation failed while bulk editing suggestion {} :\n"
                "{}\n\n"
                "The data from the fields with the errors is:\n{}".format(
                    suggestion_id, json.dumps(fc.form.errors), json.dumps(data_submitted)
                )
            )

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task

        Releases the "suggestion" locks on the job's ids for the job's user.

        :return:
        """
        job = self.background_job
        params = job.params
        ids = self.get_param(params, 'ids')
        username = job.user

        lock.batch_unlock("suggestion", ids, username)

    @classmethod
    def resolve_selection_query(cls, selection_query):
        """
        Resolve a selection query into the list of ids of the matching suggestions.

        :param selection_query: query dict; it is deep-copied, so the caller's
            object is not modified
        :return: list of the "_id" values of every matching suggestion
        """
        q = deepcopy(selection_query)
        # only the ids are needed, so ask for results without their source
        q["_source"] = False
        iterator = models.Suggestion.iterate(q=q, page_size=5000, wrap=False)
        return [s['_id'] for s in iterator]

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        Expected keyword arguments: "selection_query" and "ids" (both required),
        plus optional "editor_group", "note" and "application_status".

        :param username: the user the job runs as, and who will hold the locks
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :raises BackgroundException: if "ids" is empty or none of the edit parameters is set
        :return: a BackgroundJob instance representing this task
        """
        # NOTE(review): presumably the parent performs the admin privilege check - confirm
        super(SuggestionBulkEditBackgroundTask, cls).prepare(username, **kwargs)

        # first prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__
        job.reference = {'selection_query': json.dumps(kwargs['selection_query'])}

        params = {}
        cls.set_param(params, 'ids', kwargs['ids'])
        cls.set_param(params, 'editor_group', kwargs.get('editor_group', ''))
        cls.set_param(params, 'note', kwargs.get('note', ''))
        cls.set_param(params, 'application_status', kwargs.get('application_status', ''))

        if not cls._job_parameter_check(params):
            raise BackgroundException("{}.prepare run without sufficient parameters".format(cls.__name__))

        job.params = params

        # now ensure that we have the locks for all the suggestions
        # will raise an exception if this fails
        lock.batch_lock("suggestion", kwargs['ids'], username, timeout=app.config.get("BACKGROUND_TASK_LOCK_TIMEOUT", 3600))

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        The job is saved (blocking) before being scheduled with a 10 second delay.

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save(blocking=True)
        suggestion_bulk_edit.schedule(args=(background_job.id,), delay=10)

203 

204 

@main_queue.task()
@write_required(script=True)
def suggestion_bulk_edit(job_id):
    """
    Queue entry point: load the background job with the given id and execute it
    as a SuggestionBulkEditBackgroundTask.

    :param job_id: id of the BackgroundJob record to run
    :return:
    """
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(SuggestionBulkEditBackgroundTask(background_job))