Coverage for portality / tasks / process_event.py: 88%

40 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1from portality.background import BackgroundTask 

2from portality.core import app 

3from portality.tasks.helpers import background_helper 

4from portality.tasks.redis_huey import events_queue 

5from portality import models 

6from portality.bll import DOAJ 

7import json 

8 

9 

class ProcessEventBackgroundTask(BackgroundTask):
    """
    Background task that processes a single event asynchronously.

    The event travels through the job parameters in serialised form: ``prepare``
    stores the output of ``event.serialise()`` and ``run`` decodes it again
    before passing it to the events service.
    """
    __action__ = "process_event"

    def post_execute(self):
        """
        Deliberately a no-op override of the default post-execution step.

        This task is itself the consumer of events, so it must not emit a
        BACKGROUND_JOB_FINISHED event of its own once it completes.
        """
        return None

    def run(self):
        """
        Execute the task described by ``self.background_job``.

        Reads the ``serialized_event`` job parameter, decodes it from JSON,
        wraps the result in a ``models.Event`` and passes that to the events
        service for consumption. An audit message is added to the job once
        the event has been consumed.

        :raises Exception: if the job carries no ``serialized_event`` parameter
            (or it is empty). Errors from JSON decoding are not caught here.
        """
        background_job = self.background_job

        # The event was stored on the job in serialised form by prepare()
        payload = self.get_param(background_job.params, "serialized_event")
        if not payload:
            raise Exception("No serialized event found in job parameters")

        # Rebuild the event object before asking for the service, so that a
        # malformed payload fails before the service is touched
        event = models.Event(raw=json.loads(payload))
        DOAJ.eventsService().consume(event)

        # Only reached when consumption did not raise
        background_job.add_audit_message("Event processed successfully")

    def cleanup(self):
        """
        Cleanup hook, called after a successful OR failed run of the task.

        This task has nothing to clean up.
        """
        return None

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Build the BackgroundJob that will carry the supplied event.

        :param username: the user creating the job
        :param kwargs: keyword arguments for this task type; must include
            ``event``, an object exposing ``serialise()``
        :return: the job produced by ``background_helper.create_job``
        :raises Exception: if no (truthy) ``event`` keyword argument is supplied
        """
        source_event = kwargs.get("event")
        if not source_event:
            raise Exception("No event provided to ProcessEventBackgroundTask.prepare")

        # Store the event on the job in its serialised form
        job_params = cls.create_job_params(serialized_event=source_event.serialise())

        # The job is created against the events queue
        target_queue_id = background_helper.get_queue_id_by_task_queue(events_queue)
        return background_helper.create_job(
            username=username,
            action=cls.__action__,
            queue_id=target_queue_id,
            task_queue=events_queue,
            params=job_params,
        )

    @classmethod
    def submit(cls, background_job):
        """
        Submit the given BackgroundJob to the background queue.

        Hands the job to ``background_helper.submit_by_background_job`` together
        with this module's ``process_event_execute`` function.

        :param background_job: the BackgroundJob instance to submit
        :return: None
        """
        background_helper.submit_by_background_job(background_job, process_event_execute)

98 

99 

# Huey helper for ProcessEventBackgroundTask, created against the events queue.
# It is used below to register and run the task's execute function.
process_event_task_helper = ProcessEventBackgroundTask.create_huey_helper(
    events_queue
)

102 

103 

# Execute function for the process_event task, registered through the helper
@process_event_task_helper.register_execute()
def process_event_execute(job_id):
    """
    Run the process_event task for one background job.

    Delegates entirely to ``process_event_task_helper.execute_common``.

    :param job_id: passed straight through to ``execute_common``; presumably
        the id of the BackgroundJob to run (confirm against the helper)
    :return: None
    """
    process_event_task_helper.execute_common(job_id)