Coverage for portality / tasks / article_bulk_create.py: 96%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1import json 

2from pathlib import Path 

3from typing import List 

4 

5from portality import models 

6from portality.background import BackgroundTask, BackgroundApi 

7from portality.core import app 

8from portality.crosswalks.exceptions import CrosswalkException 

9from portality.lib import dataobj 

10from portality.models.uploads import BulkArticles 

11from portality.tasks.helpers import background_helper, articles_upload_helper 

12from portality.tasks.redis_huey import events_queue as queue 

13 

14 

def get_upload_dir_path() -> Path:
    """Return the directory used for asynchronous upload files.

    The location is read from the ``UPLOAD_ASYNC_DIR`` application config
    entry; when that entry is absent, ``"."`` is used instead.
    """
    configured_dir = app.config.get("UPLOAD_ASYNC_DIR", ".")
    return Path(configured_dir)

17 

18 

def get_upload_path(upload: BulkArticles) -> Path:
    """Return the local file path for the given upload record.

    The upload directory (and any missing parents) is created as a side
    effect, so the returned path can be written to immediately.

    :param upload: upload record whose ``local_filename`` names the file
    :return: ``<upload dir>/<upload.local_filename>``
    """
    upload_dir = get_upload_dir_path()
    # Creating the directory here means callers never have to check for it.
    upload_dir.mkdir(parents=True, exist_ok=True)
    return upload_dir.joinpath(upload.local_filename)

23 

24 

def prep_article(data, account) -> models.Article:
    """Prepare a single incoming article via ``ArticlesCrudApi.prep_article``.

    Data-structure errors raised by the API layer are translated into
    ``CrosswalkException`` so callers only need to handle one exception type.

    :param data: raw article data, passed through to ``ArticlesCrudApi.prep_article``
    :param account: the uploading account, passed through unchanged
    :return: whatever ``ArticlesCrudApi.prep_article`` returns
    :raises CrosswalkException: when the API layer raises
        ``DataStructureException`` or ``ScriptTagFoundException``; the
        original error is attached as ``__cause__``
    """
    # Imported at call time, as in the original code
    # (NOTE(review): presumably to avoid a circular import -- confirm).
    from portality.api.current import ArticlesCrudApi

    try:
        return ArticlesCrudApi.prep_article(data, account)
    except (
        dataobj.DataStructureException,
        dataobj.ScriptTagFoundException,
    ) as e:
        # Chain explicitly so the traceback shows the underlying error as the
        # direct cause rather than as incidental context.
        raise CrosswalkException(message=str(e)) from e

34 

35 

class ArticleBulkCreateBackgroundTask(BackgroundTask):
    """Background task that creates articles from a stored JSON upload.

    ``prepare`` writes the incoming articles to a local JSON file and creates
    the background job; ``run`` later reads that file back and passes it to
    ``articles_upload_helper.upload_process``.
    """

    __action__ = "article_bulk_create"

    def run(self):
        """Process the upload referenced by the job's ``upload_id`` parameter."""
        job = self.background_job
        upload_id = self.get_param(job.params, "upload_id")
        bulk_articles = BulkArticles.pull(upload_id)
        articles_path = get_upload_path(bulk_articles)
        # Falls back to an empty dict when no account is pulled for the job's user.
        uploader = models.Account.pull(job.user) or {}

        def _articles_factory(path):
            # Parse the whole file first, then prepare and wrap items one by one.
            raw_items = json.loads(Path(path).read_text())
            prepared = (prep_article(item, uploader) for item in raw_items)
            return [models.Article(**raw) for raw in prepared]

        articles_upload_helper.upload_process(
            bulk_articles,
            job,
            articles_path,
            _articles_factory,
        )
        bulk_articles.save()

    def cleanup(self):
        """No cleanup is performed for this task."""

    @classmethod
    def prepare(cls, username, incoming_articles: List[dict] = None, **kwargs):
        """Store the incoming articles on disk and create the background job.

        :param username: user the upload record and the job are created for
        :param incoming_articles: article dicts, serialised as JSON to the upload file
        :param kwargs: accepted but not used by this method
        :return: the result of ``background_helper.create_job``
        """
        bulk_articles = BulkArticles()
        bulk_articles.incoming(username)
        bulk_articles.save()

        # Write the articles to a local JSON file that `run` reads back later.
        articles_json_path = get_upload_path(bulk_articles)
        if articles_json_path.exists():
            app.logger.warning(f'bulk_articles file already exist. -- {articles_json_path.as_posix()}')
            articles_json_path.unlink()
        serialised = json.dumps(incoming_articles)
        articles_json_path.write_text(serialised)

        params = {}
        cls.set_param(params, "upload_id", bulk_articles.id)

        return background_helper.create_job(
            username=username,
            action=cls.__action__,
            queue_id=huey_helper.queue_id,
            params=params,
        )

    @classmethod
    def submit(cls, background_job):
        """Submit ``background_job`` for execution by ``article_bulk_create``."""
        background_helper.submit_by_background_job(
            background_job,
            article_bulk_create,
        )

79 

80 

# Helper created from the task class and the events queue. It supplies the
# ``queue_id`` used when the job is prepared and the decorator applied below.
huey_helper = ArticleBulkCreateBackgroundTask.create_huey_helper(queue)


@huey_helper.register_execute(is_load_config=False)
def article_bulk_create(job_id):
    """Execute ``ArticleBulkCreateBackgroundTask`` for the job with ``job_id``."""
    task_class = ArticleBulkCreateBackgroundTask
    background_helper.execute_by_job_id(job_id, task_class)