Coverage for portality/tasks/article_bulk_delete.py: 97%

78 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-22 15:59 +0100

1from copy import deepcopy 

2import json 

3 

4from flask_login import current_user 

5 

6from portality import models 

7 

8from portality.tasks.redis_huey import main_queue 

9from portality.decorators import write_required 

10from portality.util import batch_up 

11 

12from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary 

13 

14 

def article_bulk_delete_manage(selection_query, dry_run=True):
    """
    Entry point for bulk-deleting the articles matched by a query, acting as the current user.

    In dry-run mode nothing is queued: the current user's admin privilege is checked and
    an estimated count of matching articles is reported. Otherwise the query is resolved
    to a list of article ids, a background job is prepared for the current user and
    submitted to the queue.

    :param selection_query: the query selecting the articles to delete
    :param dry_run: when True (the default) only estimate the number of affected articles
    :return: a BackgroundSummary; the job id is None for a dry run
    """
    task = ArticleBulkDeleteBackgroundTask

    if not dry_run:
        ids = task.resolve_selection_query(selection_query)
        job = task.prepare(current_user.id, selection_query=selection_query, ids=ids)
        task.submit(job)

        # report how many ids were handed to the job, alongside the job's identifier
        job_id = job.id if job is not None else None
        return BackgroundSummary(job_id, affected={"articles": len(ids)})

    # dry run: privilege check plus a count, no job record is created
    task.check_admin_privilege(current_user.id)
    estimate = task.estimate_delete_counts(selection_query)
    return BackgroundSummary(None, affected={"articles": estimate})

35 

36 

class ArticleBulkDeleteBackgroundTask(AdminBackgroundTask):
    """
    Background task which deletes a pre-resolved list of articles in batches.
    """

    # maximum number of article ids sent in a single delete query
    BATCH_SIZE = 1000

    # action name recorded on the BackgroundJob created by prepare()
    __action__ = "article_bulk_delete"

    @classmethod
    def _job_parameter_check(cls, params):
        """
        Check that the job parameters carry a non-empty "ids" value.

        :param params: the job parameters
        :return: True if "ids" is present and truthy, False otherwise
        """
        ids = cls.get_param(params, 'ids')
        return True if ids else False

    def run(self):
        """
        Delete the articles listed in this task's background job, one batch at a time,
        writing audit messages to the job before, during and after the deletion.

        :raises BackgroundException: if the job has no usable "ids" parameter
        :return:
        """
        job = self.background_job
        params = job.params

        ids = self.get_param(params, 'ids')

        if not self._job_parameter_check(params):
            raise BackgroundException("{}.run run without sufficient parameters".format(self.__class__.__name__))

        total = len(ids)

        # number of batches = full batches plus one more for any remainder
        full_batches, remainder = divmod(total, self.BATCH_SIZE)
        batches_count = full_batches + (1 if remainder != 0 else 0)
        job.add_audit_message("About to delete {} articles in {} batches".format(total, batches_count))

        for position, id_batch in enumerate(batch_up(ids, self.BATCH_SIZE), start=1):
            # one delete query per batch, matching on the batch's article ids
            delete_query = models.Article.make_query(should_terms={'_id': id_batch}, consistent_order=False)
            models.Article.delete_selected(query=delete_query, snapshot=True)
            job.add_audit_message(
                "Deleted {} articles in batch {} of {}".format(len(id_batch), position, batches_count)
            )

        job.add_audit_message("Deleted {} articles".format(total))

    def cleanup(self):
        """
        Hook called after the task has run, whether it succeeded or failed.
        This task has nothing to clean up.

        :return:
        """

    @classmethod
    def estimate_delete_counts(cls, selection_query):
        """
        Count the articles matched by the selection query without deleting anything.

        :param selection_query: the query selecting articles; it is copied, not modified
        :return: the total hit count reported in the query response
        """
        response = models.Article.query(q=deepcopy(selection_query))
        return response['hits']['total']['value']

    @classmethod
    def resolve_selection_query(cls, selection_query):
        """
        Turn the selection query into the list of ids of the articles it matches.

        :param selection_query: the query selecting articles; it is copied, not modified
        :return: list of the "_id" values of all matching records
        """
        id_only_query = deepcopy(selection_query)
        # only the ids are needed, so ask for no document source
        id_only_query["_source"] = False
        return [
            hit['_id']
            for hit in models.Article.iterate(q=id_only_query, page_size=5000, wrap=False)
        ]

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Build (but do not save) the BackgroundJob describing a bulk article delete.

        :param username: the user the job will be recorded against
        :param kwargs: must contain "selection_query" (stored as a JSON reference on the job)
            and "ids" (stored as the job's "ids" parameter)
        :raises BackgroundException: if "ids" is empty
        :return: a BackgroundJob instance representing this task
        """
        super().prepare(username, **kwargs)

        # the job record itself
        new_job = models.BackgroundJob()
        new_job.user = username
        new_job.action = cls.__action__

        # keep the originating query on the job for reference
        references = {}
        cls.set_reference(references, "selection_query", json.dumps(kwargs['selection_query']))
        new_job.reference = references

        # the ids are what run() actually operates on
        job_params = {}
        cls.set_param(job_params, 'ids', kwargs['ids'])

        if not cls._job_parameter_check(job_params):
            raise BackgroundException("{}.prepare run without sufficient parameters".format(cls.__name__))

        new_job.params = job_params
        return new_job

    @classmethod
    def submit(cls, background_job):
        """
        Save the given BackgroundJob and schedule it on the background queue.

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save(blocking=True)
        # NOTE(review): delay is presumably in seconds (huey schedule API) - confirm
        task_args = (background_job.id,)
        article_bulk_delete.schedule(args=task_args, delay=10)

131 

132 

@main_queue.task()
@write_required(script=True)
def article_bulk_delete(job_id):
    """
    Queue task: load the background job with the given id and execute it as an
    article bulk delete.

    :param job_id: id of the BackgroundJob to pull and run
    :return:
    """
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(ArticleBulkDeleteBackgroundTask(background_job))