Coverage for portality / tasks / process_event.py: 88%
40 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1from portality.background import BackgroundTask
2from portality.core import app
3from portality.tasks.helpers import background_helper
4from portality.tasks.redis_huey import events_queue
5from portality import models
6from portality.bll import DOAJ
7import json
class ProcessEventBackgroundTask(BackgroundTask):
    """
    Background task that consumes a single serialised event asynchronously.

    The event is stored on the job as the ``serialized_event`` parameter by
    ``prepare`` and handed to the events service by ``run``.
    """
    __action__ = "process_event"

    def post_execute(self):
        """
        Deliberately do nothing after execution.

        This task is itself processing events, so it must not trigger
        BACKGROUND_JOB_FINISHED and thereby emit another event.
        """
        return

    def run(self):
        """
        Execute the task described by ``self.background_job``.

        Reads the ``serialized_event`` job parameter, decodes it from JSON,
        wraps it in a ``models.Event`` and passes it to the events service.

        :raises Exception: if the job has no (or an empty) ``serialized_event`` parameter
        """
        job = self.background_job
        job_params = job.params

        # Guard clause: without a payload there is nothing to process
        payload = self.get_param(job_params, "serialized_event")
        if not payload:
            raise Exception("No serialized event found in job parameters")

        # Rebuild the event object from its JSON form and consume it
        event = models.Event(raw=json.loads(payload))
        DOAJ.eventsService().consume(event)

        # Leave an audit trail entry for the successful run
        job.add_audit_message("Event processed successfully")

    def cleanup(self):
        """
        Hook called after a successful OR failed run; this task has nothing to clean up.
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Build a BackgroundJob carrying the serialised form of the supplied event.

        :param username: the user creating the job
        :param kwargs: must contain ``event``, an object providing ``serialise()``
        :return: the job returned by ``background_helper.create_job``
        :raises Exception: if no ``event`` keyword argument is supplied (or it is falsy)
        """
        event = kwargs.get("event")
        if not event:
            raise Exception("No event provided to ProcessEventBackgroundTask.prepare")

        # The serialised event is the only parameter the job carries
        job_params = cls.create_job_params(serialized_event=event.serialise())

        # Events are routed through their own dedicated task queue
        queue_id = background_helper.get_queue_id_by_task_queue(events_queue)
        return background_helper.create_job(
            username=username,
            action=cls.__action__,
            queue_id=queue_id,
            task_queue=events_queue,
            params=job_params,
        )

    @classmethod
    def submit(cls, background_job):
        """
        Submit the given BackgroundJob to the background queue.

        :param background_job: the BackgroundJob instance to enqueue
        :return: None
        """
        background_helper.submit_by_background_job(background_job, process_event_execute)
# Huey helper for ProcessEventBackgroundTask, created against the events queue
process_event_task_helper = ProcessEventBackgroundTask.create_huey_helper(
    events_queue
)
# Register the execute entry point with the task helper
@process_event_task_helper.register_execute()
def process_event_execute(job_id):
    """
    Run the process_event job identified by ``job_id``.

    Delegates directly to ``process_event_task_helper.execute_common``.

    :param job_id: identifier of the background job to execute
    """
    process_event_task_helper.execute_common(job_id)