Coverage for portality/tasks/suggestion_bulk_edit.py: 88%
115 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
1from copy import deepcopy
2import json
3from datetime import datetime
5from flask_login import current_user
7from portality import models, lock
8from portality.core import app
10from portality.tasks.redis_huey import main_queue
11from portality.decorators import write_required
13from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary
14from portality.forms.application_forms import ApplicationFormFactory
15from portality.lib.formulaic import FormulaicException
18def suggestion_manage(selection_query, dry_run=True, editor_group='', note='', application_status=''):
19 ids = SuggestionBulkEditBackgroundTask.resolve_selection_query(selection_query)
20 if dry_run:
21 SuggestionBulkEditBackgroundTask.check_admin_privilege(current_user.id)
22 return BackgroundSummary(None, affected={"applications" : len(ids)})
23 # return len(ids)
25 job = SuggestionBulkEditBackgroundTask.prepare(
26 current_user.id,
27 selection_query=selection_query,
28 editor_group=editor_group,
29 note=note,
30 application_status=application_status,
31 ids=ids
32 )
33 SuggestionBulkEditBackgroundTask.submit(job)
35 affected = len(ids)
36 job_id = None
37 if job is not None:
38 job_id = job.id
39 return BackgroundSummary(job_id, affected={"applications" : affected})
41 # return len(ids)
44class SuggestionBulkEditBackgroundTask(AdminBackgroundTask):
45 __action__ = "suggestion_bulk_edit"
47 @classmethod
48 def _job_parameter_check(cls, params):
49 # we definitely need "ids" defined
50 # we need at least one of, "editor_group" or "note" or "application_status"
51 return bool(
52 cls.get_param(params, 'ids') and \
53 (cls.get_param(params, 'editor_group') or cls.get_param(params, 'note') or cls.get_param(params, 'application_status'))
54 )
56 def run(self):
57 """
58 Execute the task as specified by the background_job
59 :return:
60 """
61 job = self.background_job
62 params = job.params
63 account = models.Account.pull(job.user)
65 ids = self.get_param(params, 'ids')
66 editor_group = self.get_param(params, 'editor_group')
67 note = self.get_param(params, 'note')
68 application_status = self.get_param(params, 'application_status')
70 if not self._job_parameter_check(params):
71 raise BackgroundException("{}.run run without sufficient parameters".format(self.__class__.__name__))
73 for suggestion_id in ids:
74 updated = False
76 s = models.Suggestion.pull(suggestion_id)
78 if s is None:
79 job.add_audit_message("Suggestion with id {} does not exist, skipping".format(suggestion_id))
80 continue
82 formulaic_context = ApplicationFormFactory.context("admin")
83 fc = formulaic_context.processor(source=s)
84 # fc = formcontext.ApplicationFormFactory.get_form_context(role="admin", source=s)
86 if editor_group:
87 job.add_audit_message(
88 "Setting editor_group to {x} for suggestion {y}".format(x=str(editor_group), y=suggestion_id))
90 # set the editor group
91 f = fc.form.editor_group
92 f.data = editor_group
94 # clear the editor
95 ed = fc.form.editor
96 ed.data = None
98 updated = True
100 if note:
101 job.add_audit_message("Adding note to for suggestion {y}".format(y=suggestion_id))
102 fc.form.notes.append_entry(
103 {'date': datetime.now().strftime(app.config['DEFAULT_DATE_FORMAT']), 'note': note}
104 )
105 updated = True
107 if application_status:
108 job.add_audit_message(
109 "Setting application_status to {x} for suggestion {y}".format(x=str(editor_group), y=suggestion_id))
110 f = fc.form.application_status
111 f.data = application_status
112 updated = True
114 if updated:
115 if fc.validate():
116 try:
117 fc.finalise(account)
118 except FormulaicException as e:
119 job.add_audit_message(
120 "Form context exception while bulk editing suggestion {} :\n{}".format(suggestion_id, str(e)))
121 else:
122 data_submitted = {}
123 for affected_field_name in list(fc.form.errors.keys()):
124 affected_field = getattr(fc.form, affected_field_name,
125 ' Field {} does not exist on form. '.format(affected_field_name))
126 if isinstance(affected_field, str): # ideally this should never happen, an error should not be reported on a field that is not present on the form
127 data_submitted[affected_field_name] = affected_field
128 continue
130 data_submitted[affected_field_name] = affected_field.data
131 job.add_audit_message(
132 "Data validation failed while bulk editing suggestion {} :\n"
133 "{}\n\n"
134 "The data from the fields with the errors is:\n{}".format(
135 suggestion_id, json.dumps(fc.form.errors), json.dumps(data_submitted)
136 )
137 )
139 def cleanup(self):
140 """
141 Cleanup after a successful OR failed run of the task
142 :return:
143 """
144 job = self.background_job
145 params = job.params
146 ids = self.get_param(params, 'ids')
147 username = job.user
149 lock.batch_unlock("suggestion", ids, username)
151 @classmethod
152 def resolve_selection_query(cls, selection_query):
153 q = deepcopy(selection_query)
154 q["_source"] = False
155 iterator = models.Suggestion.iterate(q=q, page_size=5000, wrap=False)
156 return [s['_id'] for s in iterator]
158 @classmethod
159 def prepare(cls, username, **kwargs):
160 """
161 Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
162 or fail with a suitable exception
164 :param kwargs: arbitrary keyword arguments pertaining to this task type
165 :return: a BackgroundJob instance representing this task
166 """
168 super(SuggestionBulkEditBackgroundTask, cls).prepare(username, **kwargs)
170 # first prepare a job record
171 job = models.BackgroundJob()
172 job.user = username
173 job.action = cls.__action__
174 job.reference = {'selection_query': json.dumps(kwargs['selection_query'])}
176 params = {}
177 cls.set_param(params, 'ids', kwargs['ids'])
178 cls.set_param(params, 'editor_group', kwargs.get('editor_group', ''))
179 cls.set_param(params, 'note', kwargs.get('note', ''))
180 cls.set_param(params, 'application_status', kwargs.get('application_status', ''))
182 if not cls._job_parameter_check(params):
183 raise BackgroundException("{}.prepare run without sufficient parameters".format(cls.__name__))
185 job.params = params
187 # now ensure that we have the locks for all the suggestions
188 # will raise an exception if this fails
189 lock.batch_lock("suggestion", kwargs['ids'], username, timeout=app.config.get("BACKGROUND_TASK_LOCK_TIMEOUT", 3600))
191 return job
193 @classmethod
194 def submit(cls, background_job):
195 """
196 Submit the specified BackgroundJob to the background queue
198 :param background_job: the BackgroundJob instance
199 :return:
200 """
201 background_job.save(blocking=True)
202 suggestion_bulk_edit.schedule(args=(background_job.id,), delay=10)
205@main_queue.task()
206@write_required(script=True)
207def suggestion_bulk_edit(job_id):
208 job = models.BackgroundJob.pull(job_id)
209 task = SuggestionBulkEditBackgroundTask(job)
210 BackgroundApi.execute(task)