Coverage for portality/tasks/article_cleanup_sync.py: 88%

1""" 

2For each article in the DOAJ index: 

3 * Checks that it has a corresponding journal, deletes it otherwise 

4 * Ensures that the article in_doaj status is the same as the journal's 

5 * Applies the journal's information to the article metadata as needed 

6""" 

from datetime import datetime

from portality import models
from portality.core import app
from portality.tasks.redis_huey import long_running, schedule
from portality.decorators import write_required
from portality.background import BackgroundTask, BackgroundApi, BackgroundException


class ArticleCleanupSyncBackgroundTask(BackgroundTask):
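    """
    Clean up and synchronise articles with their owning journals: articles whose
    journal cannot be found are deleted; the rest have their in_doaj status and
    journal-level metadata aligned with the journal's record.
    """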

    __action__ = "article_cleanup_sync"

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params
        prep_all = self.get_param(params, "prepall", False)
        write_changes = self.get_param(params, "write", True)
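        # NB: "prepall" re-preps even unchanged articles (it requires write=True, see
        # prepare() below); "write" controls whether changes are committed to the
        # index, so write=False gives a dry run that only reports the counts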

        batch_size = 100
        journal_cache = {}
        failed_articles = []

        write_batch = []
        delete_batch = set()

        updated_count = 0
        same_count = 0
        deleted_count = 0

        # Scroll through all articles in the index
        i = 0
        for article_model in models.Article.iterate(q={"query": {"match_all": {}}, "sort": ["_doc"]}, page_size=100, wrap=True, keepalive='5m'):

            # for debugging, just print out the progress
            i += 1
            print(i, article_model.id, len(list(journal_cache.keys())), len(write_batch), len(delete_batch))
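            # journal_cache maps each ISSN to the minified journal record built further
            # below, saving an index lookup for every article from the same journal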

            # Try to find journal in our cache
            bibjson = article_model.bibjson()
            allissns = bibjson.issns()

            cache_miss = False
            possibles = {}
            for issn in allissns:
                if issn in journal_cache:
                    inst = models.Journal(**journal_cache[issn])
                    possibles[inst.id] = inst
                else:
                    cache_miss = True
            assoc_journal = None
            if len(list(possibles.keys())) > 0:
                assoc_journal = self._get_best_journal(list(possibles.values()))

            # Cache miss; look up the article's journal in the index by its ISSNs
            if assoc_journal is None or cache_miss:
                journals = models.Journal.find_by_issn(allissns)
                if len(journals) > 0:
                    assoc_journal = self._get_best_journal(journals)

            # By the time we get to here, we still might not have a Journal, but we tried.
            if assoc_journal is not None:
                # Update the article's metadata, including in_doaj status
                reg = models.Journal()
                reg.set_id(assoc_journal.id)
                changed = article_model.add_journal_metadata(assoc_journal, reg)
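                # reg collects the minified journal register: add_journal_metadata()
                # writes the journal fields it applies into reg, and that is what we
                # cache against each ISSN below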

                # cache the minified journal register
                for issn in reg.bibjson().issns():
                    if issn not in journal_cache:
                        journal_cache[issn] = reg.data

                if not changed:
                    same_count += 1
                    if prep_all:  # This gets done below, but can override to prep unchanged ones here
                        article_model.prep()
                        write_batch.append(article_model.data)
                else:
                    updated_count += 1
                    if write_changes:
                        article_model.prep()
                        write_batch.append(article_model.data)

            else:
                # This article's Journal is no more, or has evaded us; we delete the article.
                deleted_count += 1
                if write_changes:
                    delete_batch.add(article_model.id)

            # When we have reached the batch limit, do some writing or deleting
            if len(write_batch) >= batch_size:
                job.add_audit_message("Writing {x} articles".format(x=len(write_batch)))
                models.Article.bulk(documents=write_batch)
                write_batch = []

            if len(delete_batch) >= batch_size:
                job.add_audit_message("Deleting {x} articles".format(x=len(delete_batch)))
                models.Article.bulk_delete(delete_batch)
                delete_batch.clear()

        # Finish the last part-batches of writes or deletes
        if len(write_batch) > 0:
            job.add_audit_message("Writing {x} articles".format(x=len(write_batch)))
            models.Article.bulk(documents=write_batch)
        if len(delete_batch) > 0:
            job.add_audit_message("Deleting {x} articles".format(x=len(delete_batch)))
            models.Article.bulk_delete(delete_batch)
            delete_batch.clear()

        if write_changes:
            job.add_audit_message("Done. {0} articles updated, {1} remain unchanged, and {2} deleted.".format(updated_count, same_count, deleted_count))
        else:
            job.add_audit_message("Done. Changes not written to index. {0} articles to be updated, {1} to remain unchanged, and {2} to be deleted. Set 'write' to write changes.".format(updated_count, same_count, deleted_count))

        if len(failed_articles) > 0:
            job.add_audit_message("Failed to create models for {x} articles in the index. Something is quite wrong.".format(x=len(failed_articles)))
            job.add_audit_message("Failed article ids: {x}".format(x=", ".join(failed_articles)))
            job.fail()

    def _get_best_journal(self, journals):
        if len(journals) == 1:
            return list(journals)[0]

        # Preference order:
        #  * in_doaj
        #  * most recently updated (manual, then automatic)
        #  * both issns match
        result = {"in_doaj": {}, "not_in_doaj": {}}
        for j in journals:
            in_doaj = j.is_in_doaj()
            lmu = j.last_manual_update_timestamp
            lu = j.last_updated_timestamp

            context = None
            if in_doaj:
                context = result["in_doaj"]
            else:
                context = result["not_in_doaj"]

            if lmu is None:
                lmu = datetime.utcfromtimestamp(0)
            if lmu not in context:
                context[lmu] = {}
            context[lmu][lu] = j
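            # the nested dict buckets journals first by last manual update, then by
            # last update time, within their in_doaj / not_in_doaj group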

        context = None
        if len(list(result["in_doaj"].keys())) > 0:
            context = result["in_doaj"]
        else:
            context = result["not_in_doaj"]
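        # pick the most recent bucket at each level: sort() leaves the greatest
        # (latest) timestamp last and pop() takes it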

        lmus = list(context.keys())
        lmus.sort()
        context = context[lmus.pop()]

        lus = list(context.keys())
        lus.sort()
        best = context[lus.pop()]
        return best

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """

        write = kwargs.get("write", True)
        prepall = kwargs.get("prepall", False)

        if not write and prepall:
            raise BackgroundException("'prepall' must be used with the 'write' parameter set to True (why prep but not save?)")

        params = {}
        cls.set_param(params, "write", write)
        cls.set_param(params, "prepall", prepall)

        # first prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__
        job.params = params
        if prepall:
            job.add_audit_message("'prepall' arg set. 'unchanged' articles will also have their indexes refreshed.")
        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        article_cleanup_sync.schedule(args=(background_job.id,), delay=10)
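        # NB: delay=10 defers execution briefly; presumably so the freshly saved job
        # record is indexed before the worker pulls it (assumption)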


@long_running.periodic_task(schedule("article_cleanup_sync"))
@write_required(script=True)
def scheduled_article_cleanup_sync():
    user = app.config.get("SYSTEM_USERNAME")
    job = ArticleCleanupSyncBackgroundTask.prepare(user)
    ArticleCleanupSyncBackgroundTask.submit(job)


@long_running.task()
@write_required(script=True)
def article_cleanup_sync(job_id):
    job = models.BackgroundJob.pull(job_id)
    task = ArticleCleanupSyncBackgroundTask(job)
    BackgroundApi.execute(task)
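

# A hypothetical manual invocation, following the prepare/submit flow above; the
# username and the write=False dry run are illustrative, not part of this module:
#
#     job = ArticleCleanupSyncBackgroundTask.prepare("system", write=False)
#     ArticleCleanupSyncBackgroundTask.submit(job)   # queues the job on the huey queue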