Coverage for portality / tasks / suggestion_bulk_edit.py: 88%
115 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1import json
2from copy import deepcopy
4from flask_login import current_user
6from portality import models, lock
7from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary
8from portality.core import app
9from portality.forms.application_forms import ApplicationFormFactory
10from portality.lib import dates
11from portality.lib.formulaic import FormulaicException
12from portality.tasks.redis_huey import events_queue as queue
def suggestion_manage(selection_query, dry_run=True, editor_group='', note='', application_status=''):
    """
    Entry point for a bulk edit of the suggestions matched by a query.

    The selection query is resolved to a list of suggestion ids first. On a dry run
    only the admin privilege of the current user is checked and the number of matched
    applications is reported; otherwise a background job is prepared for the current
    user and submitted to the queue.

    :param selection_query: query dict selecting the suggestions to edit
    :param dry_run: if True, report how many applications would be affected without creating a job
    :param editor_group: editor group to assign to each suggestion (optional)
    :param note: note text to add to each suggestion (optional)
    :param application_status: application status to set on each suggestion (optional)
    :return: a BackgroundSummary carrying the job id (None on a dry run) and the affected count
    """
    task = SuggestionBulkEditBackgroundTask
    ids = task.resolve_selection_query(selection_query)

    if dry_run:
        # nothing is created on a dry run, so the privilege check is done explicitly here
        task.check_admin_privilege(current_user.id)
        return BackgroundSummary(None, affected={"applications": len(ids)})

    job = task.prepare(
        current_user.id,
        selection_query=selection_query,
        editor_group=editor_group,
        note=note,
        application_status=application_status,
        ids=ids
    )
    task.submit(job)

    job_id = None if job is None else job.id
    return BackgroundSummary(job_id, affected={"applications": len(ids)})
class SuggestionBulkEditBackgroundTask(AdminBackgroundTask):
    """
    Background task that applies the same edit to a batch of suggestions (applications).

    For each suggestion id held in the job parameters the task can set the editor group
    (which also clears the assigned editor), append a note and/or set the application
    status. The changes are applied through the "admin" application form processor, so
    they are validated and finalised in the same way as a form submission.
    """
    __action__ = "suggestion_bulk_edit"

    @classmethod
    def _job_parameter_check(cls, params):
        """
        Check that the job parameters describe a meaningful bulk edit.

        :param params: the job's parameter dict
        :return: True if "ids" is set and at least one of "editor_group", "note" or
            "application_status" is set; False otherwise
        """
        # we definitely need "ids" defined
        # we need at least one of, "editor_group" or "note" or "application_status"
        return bool(
            cls.get_param(params, 'ids')
            and (
                cls.get_param(params, 'editor_group')
                or cls.get_param(params, 'note')
                or cls.get_param(params, 'application_status')
            )
        )

    def run(self):
        """
        Execute the task as specified by the background_job

        Suggestions that cannot be found are skipped with an audit message. Validation
        failures and FormulaicExceptions raised while finalising are recorded on the
        job's audit trail and do not stop the remaining suggestions being processed.

        :raises BackgroundException: if the job parameters fail _job_parameter_check
        :return:
        """
        job = self.background_job
        params = job.params
        account = models.Account.pull(job.user)

        ids = self.get_param(params, 'ids')
        editor_group = self.get_param(params, 'editor_group')
        note = self.get_param(params, 'note')
        application_status = self.get_param(params, 'application_status')

        if not self._job_parameter_check(params):
            raise BackgroundException("{}.run run without sufficient parameters".format(self.__class__.__name__))

        for suggestion_id in ids:
            updated = False

            s = models.Suggestion.pull(suggestion_id)

            if s is None:
                job.add_audit_message("Suggestion with id {} does not exist, skipping".format(suggestion_id))
                continue

            formulaic_context = ApplicationFormFactory.context("admin")
            fc = formulaic_context.processor(source=s)

            if editor_group:
                job.add_audit_message(
                    "Setting editor_group to {x} for suggestion {y}".format(x=str(editor_group), y=suggestion_id))

                # set the editor group
                fc.form.editor_group.data = editor_group

                # clear the editor
                fc.form.editor.data = None

                updated = True

            if note:
                job.add_audit_message("Adding note to for suggestion {y}".format(y=suggestion_id))
                fc.form.notes.append_entry(
                    {'date': dates.now_str(), 'note': note}
                )
                updated = True

            if application_status:
                # report the status being applied (this message previously printed the editor group)
                job.add_audit_message(
                    "Setting application_status to {x} for suggestion {y}".format(x=str(application_status),
                                                                                  y=suggestion_id))
                fc.form.application_status.data = application_status
                updated = True

            if not updated:
                continue

            if fc.validate():
                try:
                    fc.finalise(account)
                except FormulaicException as e:
                    job.add_audit_message(
                        "Form context exception while bulk editing suggestion {} :\n{}".format(suggestion_id,
                                                                                              str(e)))
            else:
                self._report_validation_failure(job, suggestion_id, fc.form)

    @staticmethod
    def _report_validation_failure(job, suggestion_id, form):
        """
        Record a form validation failure on the job's audit trail.

        The message contains the form's errors and the data held by each field that
        reported an error.

        :param job: the background job to add the audit message to
        :param suggestion_id: id of the suggestion whose form failed validation
        :param form: the form that failed validation
        :return:
        """
        errors = form.errors
        data_submitted = {}
        for affected_field_name in list(errors.keys()):
            affected_field = getattr(form, affected_field_name,
                                     ' Field {} does not exist on form. '.format(affected_field_name))
            # ideally this should never happen, an error should not be reported on a field that is not
            # present on the form; when it does, the placeholder string is reported in place of the data
            if isinstance(affected_field, str):
                data_submitted[affected_field_name] = affected_field
                continue

            data_submitted[affected_field_name] = affected_field.data

        job.add_audit_message(
            "Data validation failed while bulk editing suggestion {} :\n"
            "{}\n\n"
            "The data from the fields with the errors is:\n{}".format(
                suggestion_id, json.dumps(errors), json.dumps(data_submitted)
            )
        )

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task

        Releases the "suggestion" locks held for the job's ids on behalf of the job's user.

        :return:
        """
        job = self.background_job
        params = job.params
        ids = self.get_param(params, 'ids')
        username = job.user

        lock.batch_unlock("suggestion", ids, username)

    @classmethod
    def resolve_selection_query(cls, selection_query):
        """
        Resolve a selection query to the list of ids of the suggestions it matches.

        :param selection_query: query dict selecting suggestions; it is deep-copied, so the
            caller's object is not modified
        :return: list of the '_id' values of the matching records
        """
        q = deepcopy(selection_query)
        # only the ids are needed, so ask for no document source
        q["_source"] = False
        iterator = models.Suggestion.iterate(q=q, page_size=5000, wrap=False)
        return [s['_id'] for s in iterator]

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        The keyword arguments read here are 'selection_query' and 'ids' (both required) and
        'editor_group', 'note' and 'application_status' (each defaulting to '').

        :param username: the user the job is created for, who will also hold the locks
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :raises BackgroundException: if the resulting parameters fail _job_parameter_check
        :return: a BackgroundJob instance representing this task
        """

        # NOTE(review): presumably the parent performs the admin privilege check - confirm
        super(SuggestionBulkEditBackgroundTask, cls).prepare(username, **kwargs)

        # first prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__
        job.reference = {'selection_query': json.dumps(kwargs['selection_query'])}

        params = {}
        cls.set_param(params, 'ids', kwargs['ids'])
        cls.set_param(params, 'editor_group', kwargs.get('editor_group', ''))
        cls.set_param(params, 'note', kwargs.get('note', ''))
        cls.set_param(params, 'application_status', kwargs.get('application_status', ''))

        if not cls._job_parameter_check(params):
            raise BackgroundException("{}.prepare run without sufficient parameters".format(cls.__name__))

        job.params = params
        job.queue_id = huey_helper.queue_id

        # now ensure that we have the locks for all the suggestions
        # will raise an exception if this fails
        lock.batch_lock("suggestion", kwargs['ids'], username,
                        timeout=app.config.get("BACKGROUND_TASK_LOCK_TIMEOUT", 3600))

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        The job is saved with blocking=True and then scheduled with the delay configured
        as HUEY_ASYNC_DELAY (10 if not configured).

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save(blocking=True)
        suggestion_bulk_edit.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))
# helper tying this task class to the events queue; it supplies the queue id used when
# preparing jobs and the decorator that registers the entry point below
huey_helper = SuggestionBulkEditBackgroundTask.create_huey_helper(queue)


@huey_helper.register_execute(is_load_config=False)
def suggestion_bulk_edit(job_id):
    """
    Queue entry point: load the background job with the given id and execute the bulk edit task for it

    :param job_id: id of the BackgroundJob to run
    :return:
    """
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(SuggestionBulkEditBackgroundTask(background_job))