Coverage for portality/tasks/helpers/background_helper.py: 76%

25 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-19 18:38 +0100

1""" Collection of wrapper functions that help you create and run a BackgroundTask.

2 

3""" 

4from typing import Callable, Type 

5 

6from portality import models 

7from portality.background import BackgroundApi, BackgroundTask 

8from portality.core import app 

9 

10 

def create_job(username, action, params=None):
    """Build a ``models.BackgroundJob`` carrying the given user and action.

    :param username: value stored on the job's ``user`` attribute
    :param action: value stored on the job's ``action`` attribute
    :param params: optional value stored on the job's ``params`` attribute;
        when ``None`` the attribute is left untouched
    :return: the newly constructed job
    """
    new_job = models.BackgroundJob()
    new_job.user, new_job.action = username, action
    if params is None:
        # Nothing to attach: keep whatever params a fresh job starts with.
        return new_job
    new_job.params = params
    return new_job

20 

21 

def submit_by_bg_task_type(background_task: Type[BackgroundTask], **prepare_kwargs):
    """Prepare a job with the given BackgroundTask class and submit it.

    The job is prepared on behalf of the user named by the
    ``SYSTEM_USERNAME`` entry of ``app.config``.

    :param background_task: BackgroundTask class whose ``prepare`` and
        ``submit`` are called
    :param prepare_kwargs: keyword arguments forwarded to ``prepare``
    """
    system_user = app.config.get("SYSTEM_USERNAME")
    background_task.submit(background_task.prepare(system_user, **prepare_kwargs))

29 

30 

def execute_by_job_id(job_id,
                      task_factory: Callable[[models.BackgroundJob], BackgroundTask]):
    """Run the task built for the job identified by ``job_id``.

    The job is fetched with ``models.BackgroundJob.pull``, handed to
    ``task_factory`` to build a task, and that task is passed to
    ``BackgroundApi.execute``.

    :param job_id: identifier passed to ``models.BackgroundJob.pull``
    :param task_factory: callable that turns the pulled job into a task

    NOTE(review): whatever ``pull`` returns for an unknown id is forwarded to
    ``task_factory`` unchanged -- confirm callers only pass existing ids.
    """
    pulled_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(task_factory(pulled_job))

38 

39 

def execute_by_bg_task_type(bg_task_type: Type[BackgroundTask], **prepare_kwargs):
    """Prepare a job for the given BackgroundTask class and execute it directly.

    The job is prepared on behalf of the user named by the
    ``SYSTEM_USERNAME`` entry of ``app.config``; the task is then built from
    that job and passed to ``BackgroundApi.execute``.

    :param bg_task_type: BackgroundTask class used both to prepare the job
        and to instantiate the task
    :param prepare_kwargs: keyword arguments forwarded to ``prepare``
    :return: the task instance that was executed
    """
    prepared_job = bg_task_type.prepare(app.config.get("SYSTEM_USERNAME"),
                                        **prepare_kwargs)
    bg_task = bg_task_type(prepared_job)
    BackgroundApi.execute(bg_task)
    return bg_task