Coverage for portality/tasks/article_cleanup_sync.py: 87% (144 statements)

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1""" 

2For each article in the DOAJ index: 

3 * Checks that it has a corresponding journal, deletes it otherwise 

4 * Ensures that the article in_doaj status is the same as the journal's 

5 * Applies the journal's information to the article metadata as needed 

6""" 

7 

8from datetime import datetime 

9 

10from portality import models 

11from portality.background import BackgroundTask, BackgroundApi, BackgroundException 

12from portality.core import app 

13from portality.tasks.helpers import background_helper 

14from portality.tasks.redis_huey import scheduled_long_queue as queue 

15 

class ArticleCleanupSyncBackgroundTask(BackgroundTask):
    """Reconcile every article in the index with its owning journal.

    Articles whose ISSNs match no journal are deleted; the remainder have
    their in_doaj status and journal-derived metadata refreshed from the
    best matching journal record.
    """

    __action__ = "article_cleanup_sync"

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params
        # prepall: also re-prep (and re-write) articles whose metadata is unchanged
        prep_all = self.get_param(params, "prepall", False)
        # write: when False this is a dry run - counts are reported but nothing is saved
        write_changes = self.get_param(params, "write", True)

        batch_size = 100        # flush writes/deletes to the index in groups of this size
        journal_cache = {}      # issn -> minified journal data, avoids repeated index lookups
        # NOTE(review): nothing in this method appends to failed_articles, so the
        # failure branch at the end looks unreachable - confirm whether
        # iterall_unstable(wrap=True) was expected to report failures here
        failed_articles = []

        write_batch = []
        delete_batch = set()

        updated_count = 0
        same_count = 0
        deleted_count = 0

        # Scroll though all articles in the index within the update range
        i = 0
        page_size = 1000
        for article_model in models.Article.iterall_unstable(
                page_size=page_size,
                striped=True,
                prefix_size=3,
                wrap=True,
                logger=self.background_job.add_audit_message):

            # for debugging, just print out the progress
            i += 1
            if i % page_size == 0:
                job.add_audit_message("Progress: Write batch/total: {x}/{a}; delete batch/total: {y}/{b}".format(x=len(write_batch), y=len(delete_batch), a=updated_count, b=deleted_count))

            # Try to find journal in our cache
            bibjson = article_model.bibjson()
            allissns = bibjson.issns()

            cache_miss = False
            possibles = {}      # journal id -> Journal; keyed by id so duplicates collapse
            for issn in allissns:
                if issn in journal_cache:
                    inst = models.Journal(**journal_cache[issn])
                    possibles[inst.id] = inst
                else:
                    cache_miss = True
            assoc_journal = None
            if len(list(possibles.keys())) > 0:
                assoc_journal = self._get_best_journal(list(possibles.values()))

            # Cache miss; ask the article model to try to find its journal
            # (also re-queried when only SOME issns were cached, so no candidate is missed)
            if assoc_journal is None or cache_miss:
                journals = models.Journal.find_by_issn(allissns)
                if len(journals) > 0:
                    assoc_journal = self._get_best_journal(journals)

            # By the time we get to here, we still might not have a Journal, but we tried.
            if assoc_journal is not None:
                # Update the article's metadata, including in_doaj status
                reg = models.Journal()
                reg.set_id(assoc_journal.id)
                changed = article_model.add_journal_metadata(assoc_journal, reg)

                # cache the minified journal register
                for issn in reg.bibjson().issns():
                    if issn not in journal_cache:
                        journal_cache[issn] = reg.data

                if not changed:
                    same_count += 1
                    if prep_all:  # This gets done below, but can override to prep unchanged ones here
                        article_model.prep()
                        write_batch.append(article_model.data)
                else:
                    updated_count += 1
                    if write_changes:
                        article_model.prep()
                        write_batch.append(article_model.data)

            else:
                # This article's Journal is no-more, or has evaded us; we delete the article.
                deleted_count += 1
                if write_changes:
                    delete_batch.add(article_model.id)

            # When we have reached the batch limit, do some writing or deleting
            if len(write_batch) >= batch_size:
                job.add_audit_message("Writing {x} articles".format(x=len(write_batch)))
                models.Article.bulk(documents=write_batch)
                write_batch = []

            if len(delete_batch) >= batch_size:
                job.add_audit_message("Deleting {x} articles".format(x=len(delete_batch)))
                models.Article.bulk_delete(delete_batch)
                delete_batch.clear()

        # Finish the last part-batches of writes or deletes
        if len(write_batch) > 0:
            job.add_audit_message("Writing {x} articles".format(x=len(write_batch)))
            models.Article.bulk(documents=write_batch)
        if len(delete_batch) > 0:
            job.add_audit_message("Deleting {x} articles".format(x=len(delete_batch)))
            models.Article.bulk_delete(delete_batch)
            delete_batch.clear()

        if write_changes:
            job.add_audit_message("Done. {0} articles updated, {1} remain unchanged, and {2} deleted.".format(updated_count, same_count, deleted_count))
        else:
            job.add_audit_message("Done. Changes not written to index. {0} articles to be updated, {1} to remain unchanged, and {2} to be deleted. Set 'write' to write changes.".format(updated_count, same_count, deleted_count))

        if len(failed_articles) > 0:
            job.add_audit_message("Failed to create models for {x} articles in the index. Something is quite wrong.".format(x=len(failed_articles)))
            job.add_audit_message("Failed article ids: {x}".format(x=", ".join(failed_articles)))
            job.fail()

    def _get_best_journal(self, journals):
        """Choose the preferred journal from a set of candidates.

        Preference: journals that are in DOAJ beat those that are not;
        within a group, the most recent last_manual_update wins,
        tie-broken by the most recent last_updated.
        """
        if len(journals) == 1:
            # only one candidate - nothing to choose between
            return list(journals)[0]

        # in_doaj
        # most recently updated (manual, then automatic)
        # both issns match
        # result maps: group -> {last_manual_update -> {last_updated -> journal}}
        result = { "in_doaj" : {}, "not_in_doaj" : {}}
        for j in journals:
            in_doaj = j.is_in_doaj()
            lmu = j.last_manual_update_timestamp
            lu = j.last_updated_timestamp

            context = None
            if in_doaj:
                context = result["in_doaj"]
            else:
                context = result["not_in_doaj"]

            if lmu is None:
                # never manually updated: substitute the epoch so these sort oldest
                # NOTE(review): datetime.utcfromtimestamp is deprecated in Python 3.12+;
                # this assumes the model timestamps are naive datetimes - confirm
                lmu = datetime.utcfromtimestamp(0)
            if lmu not in context:
                context[lmu] = {}
            context[lmu][lu] = j

        # prefer the in_doaj group whenever it is non-empty
        context = None
        if len(list(result["in_doaj"].keys())) > 0:
            context = result["in_doaj"]
        else:
            context = result["not_in_doaj"]

        # take the bucket with the newest manual update...
        lmus = list(context.keys())
        lmus.sort()
        context = context[lmus.pop()]

        # ...then the newest automatic update within that bucket
        lus = list(context.keys())
        lus.sort()
        best = context[lus.pop()]
        return best

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: user the job will run as
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        :raises BackgroundException: if 'prepall' is requested without 'write'
        """

        write = kwargs.get("write", True)
        prepall = kwargs.get("prepall", False)
        all_time = kwargs.get("all_time", True)

        # prepping without writing would do the work and then discard it
        if not write and prepall:
            raise BackgroundException("'prepall' must be used with the 'write' parameter set to True (why prep but not save?)")

        params = {}
        cls.set_param(params, "write", write)
        cls.set_param(params, "prepall", prepall)
        cls.set_param(params, "all_time", all_time)

        # first prepare a job record
        # (huey_helper is assigned at module level below; it exists by the time this runs)
        job = background_helper.create_job(username=username,
                                           action=cls.__action__,
                                           params=params,
                                           queue_id=huey_helper.queue_id, )
        if prepall:
            job.add_audit_message("'prepall' arg set. 'unchanged' articles will also have their indexes refreshed.")
        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        # NOTE(review): the delay presumably lets the saved job become visible
        # before the worker pulls it - confirm against HUEY_ASYNC_DELAY usage
        article_cleanup_sync.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))

225 

226 

# Huey integration helper for this task: supplies the queue id used in prepare()
# and the register_schedule / register_execute decorators below
huey_helper = ArticleCleanupSyncBackgroundTask.create_huey_helper(queue)

228 

229 

@huey_helper.register_schedule
def scheduled_article_cleanup_sync():
    """Scheduled entry point: prepare and enqueue this task as the system user."""
    task_cls = ArticleCleanupSyncBackgroundTask
    background_job = task_cls.prepare(app.config.get("SYSTEM_USERNAME"))
    task_cls.submit(background_job)

235 

@huey_helper.register_execute(is_load_config=False)
def article_cleanup_sync(job_id):
    """Huey execute hook: load the stored job record and run the task."""
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(ArticleCleanupSyncBackgroundTask(background_job))