Coverage for portality / tasks / article_bulk_delete.py: 97%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1import json 

2from copy import deepcopy 

3 

4from flask_login import current_user 

5 

6from portality import models 

7from portality.core import app 

8from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary 

9from portality.tasks.redis_huey import events_queue as queue 

10from portality.util import batch_up 

11 

12 

13def article_bulk_delete_manage(selection_query, dry_run=True): 

14 if dry_run: 

15 ArticleBulkDeleteBackgroundTask.check_admin_privilege(current_user.id) 

16 estimate = ArticleBulkDeleteBackgroundTask.estimate_delete_counts(selection_query) 

17 return BackgroundSummary(None, affected={"articles": estimate}) 

18 

19 ids = ArticleBulkDeleteBackgroundTask.resolve_selection_query(selection_query) 

20 job = ArticleBulkDeleteBackgroundTask.prepare( 

21 current_user.id, 

22 selection_query=selection_query, 

23 ids=ids 

24 ) 

25 ArticleBulkDeleteBackgroundTask.submit(job) 

26 

27 affected = len(ids) 

28 job_id = None 

29 if job is not None: 

30 job_id = job.id 

31 return BackgroundSummary(job_id, affected={"articles": affected}) 

32 

33 

34class ArticleBulkDeleteBackgroundTask(AdminBackgroundTask): 

35 BATCH_SIZE = 1000 

36 

37 __action__ = "article_bulk_delete" 

38 

39 @classmethod 

40 def _job_parameter_check(cls, params): 

41 # we definitely need "ids" defined 

42 return bool(cls.get_param(params, 'ids')) 

43 

44 def run(self): 

45 """ 

46 Execute the task as specified by the background_job 

47 :return: 

48 """ 

49 job = self.background_job 

50 params = job.params 

51 

52 ids = self.get_param(params, 'ids') 

53 

54 if not self._job_parameter_check(params): 

55 raise BackgroundException("{}.run run without sufficient parameters".format(self.__class__.__name__)) 

56 

57 batches_count = len(ids) // self.BATCH_SIZE + (0 if len(ids) % self.BATCH_SIZE == 0 else 1) 

58 job.add_audit_message("About to delete {} articles in {} batches".format(len(ids), batches_count)) 

59 

60 for batch_num, batch in enumerate(batch_up(ids, self.BATCH_SIZE), start=1): 

61 article_delete_q_by_ids = models.Article.make_query(should_terms={'_id': batch}, consistent_order=False) 

62 models.Article.delete_selected(query=article_delete_q_by_ids, snapshot=True) 

63 job.add_audit_message("Deleted {} articles in batch {} of {}".format(len(batch), batch_num, batches_count)) 

64 

65 job.add_audit_message("Deleted {} articles".format(len(ids))) 

66 

67 def cleanup(self): 

68 """ 

69 Cleanup after a successful OR failed run of the task 

70 :return: 

71 """ 

72 pass 

73 

74 @classmethod 

75 def estimate_delete_counts(cls, selection_query): 

76 q = deepcopy(selection_query) 

77 res = models.Article.query(q=q) 

78 return res['hits']['total']['value'] 

79 

80 @classmethod 

81 def resolve_selection_query(cls, selection_query): 

82 q = deepcopy(selection_query) 

83 q["_source"] = False 

84 iterator = models.Article.iterate(q=q, page_size=5000, wrap=False) 

85 return [j['_id'] for j in iterator] 

86 

87 @classmethod 

88 def prepare(cls, username, **kwargs): 

89 """ 

90 Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob, 

91 or fail with a suitable exception 

92 

93 :param kwargs: arbitrary keyword arguments pertaining to this task type 

94 :return: a BackgroundJob instance representing this task 

95 """ 

96 

97 super(ArticleBulkDeleteBackgroundTask, cls).prepare(username, **kwargs) 

98 

99 # first prepare a job record 

100 job = models.BackgroundJob() 

101 job.user = username 

102 job.action = cls.__action__ 

103 refs = {} 

104 cls.set_reference(refs, "selection_query", json.dumps(kwargs['selection_query'])) 

105 job.reference = refs 

106 

107 params = {} 

108 cls.set_param(params, 'ids', kwargs['ids']) 

109 

110 if not cls._job_parameter_check(params): 

111 raise BackgroundException("{}.prepare run without sufficient parameters".format(cls.__name__)) 

112 

113 job.params = params 

114 job.queue_id = huey_helper.queue_id 

115 

116 return job 

117 

118 @classmethod 

119 def submit(cls, background_job): 

120 """ 

121 Submit the specified BackgroundJob to the background queue 

122 

123 :param background_job: the BackgroundJob instance 

124 :return: 

125 """ 

126 background_job.save(blocking=True) 

127 article_bulk_delete.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10)) 

128 

129 

130huey_helper = ArticleBulkDeleteBackgroundTask.create_huey_helper(queue) 

131 

132 

133@huey_helper.register_execute(is_load_config=False) 

134def article_bulk_delete(job_id): 

135 job = models.BackgroundJob.pull(job_id) 

136 task = ArticleBulkDeleteBackgroundTask(job) 

137 BackgroundApi.execute(task)