Coverage for portality/tasks/helpers/background_helper.py: 76%
25 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-19 18:38 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-19 18:38 +0100
1""" Collection of wrapper functions to help you create BackgroundTasks
3"""
4from typing import Callable, Type
6from portality import models
7from portality.background import BackgroundApi, BackgroundTask
8from portality.core import app
def create_job(username, action, params=None):
    """Build a ``BackgroundJob`` carrying the given user and action.

    :param username: value assigned to the job's ``user`` attribute
    :param action: value assigned to the job's ``action`` attribute
    :param params: optional value assigned to the job's ``params``
        attribute; left untouched on the job when ``None``
    :return: the newly constructed ``models.BackgroundJob``
    """
    new_job = models.BackgroundJob()
    new_job.user = username
    new_job.action = action

    # Guard clause: nothing more to set when no params were supplied.
    if params is None:
        return new_job

    new_job.params = params
    return new_job
def submit_by_bg_task_type(background_task: Type[BackgroundTask], **prepare_kwargs):
    """Prepare a job from a BackgroundTask class and submit it.

    The job is prepared for the user read from the ``SYSTEM_USERNAME``
    config entry, then handed to ``background_task.submit``.

    :param background_task: BackgroundTask class providing ``prepare``
        and ``submit``
    :param prepare_kwargs: keyword arguments forwarded unchanged to
        ``background_task.prepare``
    """
    system_user = app.config.get("SYSTEM_USERNAME")
    prepared_job = background_task.prepare(system_user, **prepare_kwargs)
    background_task.submit(prepared_job)
def execute_by_job_id(
        job_id,
        task_factory: Callable[[models.BackgroundJob], BackgroundTask],
):
    """Load a BackgroundJob by id, wrap it in a task and execute that task.

    :param job_id: identifier passed to ``models.BackgroundJob.pull``
    :param task_factory: callable that receives the pulled job and returns
        the task to run
    """
    pulled_job = models.BackgroundJob.pull(job_id)
    runnable_task = task_factory(pulled_job)
    BackgroundApi.execute(runnable_task)
def execute_by_bg_task_type(bg_task_type: Type[BackgroundTask], **prepare_kwargs):
    """Prepare a job from a BackgroundTask class and execute it directly.

    The job is prepared for the user read from the ``SYSTEM_USERNAME``
    config entry, a task instance is built from that job, and the task is
    passed to ``BackgroundApi.execute``.

    :param bg_task_type: BackgroundTask class providing ``prepare`` and
        constructible from a job
    :param prepare_kwargs: keyword arguments forwarded unchanged to
        ``bg_task_type.prepare``
    :return: the task instance that was passed to ``BackgroundApi.execute``
    """
    system_user = app.config.get("SYSTEM_USERNAME")
    prepared_job = bg_task_type.prepare(system_user, **prepare_kwargs)
    task_instance = bg_task_type(prepared_job)
    BackgroundApi.execute(task_instance)

    return task_instance