Coverage for portality / tasks / article_bulk_create.py: 96%
57 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1import json
2from pathlib import Path
3from typing import List
5from portality import models
6from portality.background import BackgroundTask, BackgroundApi
7from portality.core import app
8from portality.crosswalks.exceptions import CrosswalkException
9from portality.lib import dataobj
10from portality.models.uploads import BulkArticles
11from portality.tasks.helpers import background_helper, articles_upload_helper
12from portality.tasks.redis_huey import events_queue as queue
def get_upload_dir_path() -> Path:
    """Return the directory used for staged bulk-article upload files.

    The location is read from the ``UPLOAD_ASYNC_DIR`` application config
    key; when the key is absent the current directory (``"."``) is used.

    :return: the configured upload directory as a ``Path``
    """
    configured_dir = app.config.get("UPLOAD_ASYNC_DIR", ".")
    return Path(configured_dir)
def get_upload_path(upload: BulkArticles) -> Path:
    """Return the local file path for the given upload record.

    The upload directory is created (including missing parents) as a side
    effect, so the returned path can be written to straight away.

    :param upload: upload record whose ``local_filename`` names the file
    :return: ``<upload dir>/<upload.local_filename>``
    """
    upload_dir = get_upload_dir_path()
    # exist_ok=True: an already-existing directory is not an error.
    upload_dir.mkdir(parents=True, exist_ok=True)
    return upload_dir.joinpath(upload.local_filename)
def prep_article(data, account) -> models.Article:
    """Prepare one incoming article record via ``ArticlesCrudApi.prep_article``.

    ``DataStructureException`` and ``ScriptTagFoundException`` raised by the
    API layer are translated into ``CrosswalkException`` carrying the same
    message, with the original exception attached as the explicit cause.

    :param data: a single incoming article record
    :param account: account passed through to ``ArticlesCrudApi.prep_article``
    :return: the result of ``ArticlesCrudApi.prep_article(data, account)``
    :raises CrosswalkException: when the record fails structure or
        script-tag validation
    """
    # Function-scope import, kept as in the original code
    # (presumably to avoid a circular import -- TODO confirm).
    from portality.api.current import ArticlesCrudApi

    try:
        return ArticlesCrudApi.prep_article(data, account)
    except (
        dataobj.DataStructureException,
        dataobj.ScriptTagFoundException,
    ) as e:
        # Chain explicitly so tracebacks show the validation error as the
        # direct cause rather than as an incidental error during handling.
        raise CrosswalkException(message=str(e)) from e
class ArticleBulkCreateBackgroundTask(BackgroundTask):
    """Background task that bulk-creates articles from a JSON file on local disk.

    ``prepare`` writes the incoming article records to a file and creates the
    background job; ``run`` later reads that file back and passes it to
    ``articles_upload_helper.upload_process``.
    """

    __action__ = "article_bulk_create"

    def run(self):
        """Process the upload referenced by the job's ``upload_id`` parameter."""
        job = self.background_job
        upload_id = self.get_param(job.params, "upload_id")
        bulk_articles = BulkArticles.pull(upload_id)
        articles_path = get_upload_path(bulk_articles)
        # Falls back to an empty dict when Account.pull returns a falsy value.
        uploader = models.Account.pull(job.user) or {}

        def _articles_factory(path):
            # Each record is prepared and wrapped in turn, so a failure on
            # one record stops before any later record is prepared.
            records = json.loads(Path(path).read_text())
            return [models.Article(**prep_article(record, uploader)) for record in records]

        articles_upload_helper.upload_process(bulk_articles, job, articles_path, _articles_factory)
        bulk_articles.save()

    def cleanup(self):
        """No cleanup is performed for this task."""
        return None

    @classmethod
    def prepare(cls, username, incoming_articles: List[dict] = None, **kwargs):
        """Stage the incoming articles on disk and create the background job.

        :param username: user the upload record and job are created for
        :param incoming_articles: article records, serialised to JSON as-is
        :param kwargs: accepted but not used by this method
        :return: the result of ``background_helper.create_job``
        """
        upload_record = BulkArticles()
        upload_record.incoming(username)
        upload_record.save()

        # create articles json file in local for `run`
        articles_json_path = get_upload_path(upload_record)
        if articles_json_path.exists():
            app.logger.warning(f'bulk_articles file already exist. -- {articles_json_path.as_posix()}')
            articles_json_path.unlink()
        serialised = json.dumps(incoming_articles)
        articles_json_path.write_text(serialised)

        job_params = {}
        cls.set_param(job_params, "upload_id", upload_record.id)

        return background_helper.create_job(
            username=username,
            action=cls.__action__,
            queue_id=huey_helper.queue_id,
            params=job_params,
        )

    @classmethod
    def submit(cls, background_job):
        """Submit ``background_job`` for execution by ``article_bulk_create``."""
        background_helper.submit_by_background_job(background_job, article_bulk_create)
# Helper tying this task class to the events queue; it supplies the queue id
# used by `prepare` and the decorator that registers the huey entry point.
huey_helper = ArticleBulkCreateBackgroundTask.create_huey_helper(queue)
@huey_helper.register_execute(is_load_config=False)
def article_bulk_create(job_id):
    """Execute the bulk article creation job identified by ``job_id``.

    Delegates to ``background_helper.execute_by_job_id`` with
    ``ArticleBulkCreateBackgroundTask`` as the task class.
    """
    task_class = ArticleBulkCreateBackgroundTask
    background_helper.execute_by_job_id(job_id, task_class)