Coverage for portality / tasks / article_duplicate_report.py: 86%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1"""Task to generate a report on duplicated articles in the index""" 

2import csv 

3import json 

4import os 

5import shutil 

6from datetime import datetime 

7 

8from portality import models 

9from portality.app_email import email_archive 

10from portality.background import BackgroundTask, BackgroundApi 

11from portality.bll import exceptions 

12from portality.bll.doaj import DOAJ 

13from portality.core import app, es_connection 

14from portality.lib import dates 

15from portality.tasks.redis_huey import scheduled_long_queue as queue 

16 

17 

class ArticleDuplicateReportBackgroundTask(BackgroundTask):
    """Generate CSV reports on duplicated articles in the index.

    Two reports are written to the output directory:

    * a "global" report listing, for each article, every other article that
      matches it by DOI and/or fulltext URL, and
    * a "noids" report listing articles that carry no identifiers usable for
      deduplication at all.

    Optionally the reports are zipped and emailed when the ``email`` param
    is set (see :meth:`prepare`).
    """

    __action__ = "article_duplicate_report"

    # Keep a cache of ISSNs to owners, populated by _lookup_owner() and read
    # back by _summarise_article(), so each journal is only resolved once.
    owner_cache = {}

    def run(self):
        """Execute the report: dump every article's minimal data to a
        temporary CSV, then re-read it and run duplicate discovery on each
        article, writing results to the report files as we go.
        """
        job = self.background_job
        params = job.params

        # Set up the files we need to run this task - a dir to place the report, and a place to write the article csv
        outdir = self.get_param(params, "outdir", "article_duplicates_" + dates.today())
        job.add_audit_message("Saving reports to " + outdir)
        # exist_ok avoids the check-then-create race of os.path.exists + makedirs
        os.makedirs(outdir, exist_ok=True)

        # Location for our interim CSV file of articles
        tmpdir = self.get_param(params, "tmpdir", 'tmp_article_duplicate_report')
        os.makedirs(tmpdir, exist_ok=True)

        tmp_csvname = self.get_param(params, "article_csv", False)
        tmp_csvpath, total = self._make_csv_dump(tmpdir, tmp_csvname)

        # Initialise our reports.  Context managers ensure both report files
        # are closed even if duplicate discovery raises part-way through
        # (the originals were left open on any error).
        global_reportfile = 'duplicate_articles_global_' + dates.today() + '.csv'
        global_reportpath = os.path.join(outdir, global_reportfile)
        noids_reportfile = 'noids_' + dates.today() + '.csv'
        noids_reportpath = os.path.join(outdir, noids_reportfile)

        with open(global_reportpath, "w", encoding="utf-8") as f, \
                open(noids_reportpath, "w", encoding="utf-8") as g:
            global_report = csv.writer(f)
            header = ["article_id", "article_created", "article_doi", "article_fulltext", "article_owner", "article_issns", "article_in_doaj", "n_matches", "match_type", "match_id", "match_created", "match_doi", "match_fulltext", "match_owner", "match_issns", "match_in_doaj", "owners_match", "titles_match", "article_title", "match_title"]
            global_report.writerow(header)

            noids_report = csv.writer(g)
            header = ["article_id", "article_created", "article_owner", "article_issns", "article_in_doaj"]
            noids_report.writerow(header)

            # Record the sets of duplicated articles.  A set of frozensets
            # keeps the "seen this duplicate group already?" membership test
            # O(1); the original list of sets scanned linearly per article.
            global_matches = set()

            a_count = 0

            articleService = DOAJ.articleService()

            # Read back in the article csv file we created earlier
            with open(tmp_csvpath, 'r', encoding='utf-8') as t:
                article_reader = csv.reader(t)

                start = dates.now()
                estimated_finish = ""
                for a in article_reader:
                    # Every 100 articles, re-estimate the finish time from the
                    # average processing rate so far.
                    if a_count > 1 and a_count % 100 == 0:
                        elapsed = (dates.now() - start).total_seconds()
                        expected_total = (elapsed / a_count) * total
                        estimated_finish = dates.format(dates.seconds_after(start, expected_total))
                    a_count += 1

                    # Rehydrate a lightweight Article from the CSV row (see
                    # _create_article_csv for the column order).
                    article = models.Article(_source={'id': a[0], 'created_date': a[1], 'bibjson': {'identifier': json.loads(a[2]), 'link': json.loads(a[3]), 'title': a[4]}, 'admin': {'in_doaj': json.loads(a[5])}})

                    # Get the global duplicates
                    try:
                        global_duplicates = articleService.discover_duplicates(article, results_per_match_type=10000, include_article=False)
                    except exceptions.DuplicateArticleException:
                        # this means the article did not have any ids that could be used for deduplication
                        owner = self._lookup_owner(article)
                        noids_report.writerow([article.id, article.created_date, owner, ','.join(article.bibjson().issns()), article.is_in_doaj()])
                        continue

                    dupcount = 0
                    if global_duplicates:
                        # Look up an article's owner
                        owner = self._lookup_owner(article)

                        # Deduplicate the DOI and fulltext duplicate lists
                        s = frozenset([article.id] + [d.id for d in global_duplicates.get('doi', []) + global_duplicates.get('fulltext', [])])
                        # remove article's own id from the match count
                        dupcount = len(s) - 1
                        if s not in global_matches:
                            self._write_rows_from_duplicates(article, owner, global_duplicates, global_report)
                            global_matches.add(s)

                    app.logger.debug('{0}/{1} {2} {3} {4} {5}'.format(a_count, total, article.id, dupcount, len(global_matches), estimated_finish))

        job.add_audit_message('{0} articles processed for duplicates. {1} global duplicate sets found.'.format(a_count, len(global_matches)))

        # Delete the transient temporary files.
        shutil.rmtree(tmpdir)

        # Email the reports if that parameter has been set.
        send_email = self.get_param(params, "email", False)
        if send_email:
            archive_name = "article_duplicates_" + dates.today()
            email_archive(outdir, archive_name)
            job.add_audit_message("email alert sent")
        else:
            job.add_audit_message("no email alert sent")

    @classmethod
    def _make_csv_dump(cls, tmpdir, filename):
        """Dump the minimal article data needed for dedup to a CSV in tmpdir.

        :param tmpdir: directory to hold the temporary CSV
        :param filename: file name to use, or a falsey value for a dated default
        :return: (path to the CSV written, number of articles it contains)
        """
        # Connection to the ES index
        conn = es_connection

        if not filename:
            filename = 'tmp_articles_' + dates.today() + '.csv'
        filename = os.path.join(tmpdir, filename)

        with open(filename, 'w', encoding='utf-8') as t:
            count = cls._create_article_csv(conn, t)

        return filename, count

    @classmethod
    def _lookup_owner(cls, article):
        """Resolve an article's owner via its journal, caching every ISSN of
        that journal against the owner for later use by _summarise_article.

        :param article: the article whose owner is wanted
        :return: the owner account, or None if the journal could not be found
        """
        journal = article.get_journal()
        owner = None
        if journal:
            owner = journal.owner
            for issn in journal.bibjson().issns():
                if issn not in cls.owner_cache:
                    cls.owner_cache[issn] = owner
        return owner

    @staticmethod
    def _create_article_csv(connection, file_object):
        """ Create a CSV file with the minimum information we require to find and report duplicates. """

        csv_writer = csv.writer(file_object, quoting=csv.QUOTE_ALL)

        # Scroll through all articles, newest to oldest
        # NOTE(review): the sort key is last_updated, not created_date —
        # confirm "newest" is intended to mean most recently updated.
        scroll_query = {
            "_source": [
                "id",
                "created_date",
                "bibjson.identifier",
                "bibjson.link",
                "bibjson.title",
                "admin.in_doaj"
            ],
            "query": {
                "match_all": {}
            },
            "sort": [
                {"last_updated": {"order": "desc"}}
            ]
        }

        count = 0
        for a in models.Article.iterate(q=scroll_query, page_size=1000, keepalive='1m'):
            # Column order must match the rehydration in run():
            # id, created, identifiers (JSON), links (JSON), title, in_doaj (JSON)
            row = [
                a['id'],
                a['created_date'],
                json.dumps(a['bibjson']['identifier']),
                json.dumps(a['bibjson'].get('link', [])),
                a['bibjson'].get('title', ''),
                json.dumps(a.get('admin', {}).get('in_doaj', ''))
            ]
            csv_writer.writerow(row)
            count += 1

        return count

    def _summarise_article(self, article, owner=None):
        """Flatten an article into the fields used for report rows.

        :param article: the article to summarise
        :param owner: the owner if already known; otherwise the ISSN -> owner
            cache (populated by _lookup_owner) is consulted
        :return: dict with keys created / doi / fulltext / owner / issns /
            title / in_doaj; string fields default to '' when absent
        """
        a_doi = article.bibjson().get_identifiers('doi')
        a_fulltext = article.bibjson().get_urls('fulltext')

        o = owner
        if o is None:
            # Take the first of the article's ISSNs that resolves in the cache.
            for i in article.bibjson().issns():
                o = self.owner_cache.get(i, None)
                if o is not None:
                    break

        return {
            'created': article.created_date,
            'doi': a_doi[0] if len(a_doi) > 0 else '',
            'fulltext': a_fulltext[0] if len(a_fulltext) > 0 else '',
            'owner': o if o is not None else '',
            'issns': ','.join(article.bibjson().issns()),
            'title': article.bibjson().title,
            'in_doaj': article.is_in_doaj()
        }

    def _write_rows_from_duplicates(self, article, owner, duplicates, report):
        """Write one report row per duplicate of an article.

        :param article: the article the duplicates were discovered for
        :param owner: the article's owner (also assumed for its duplicates)
        :param duplicates: dict with optional 'doi' and 'fulltext' lists of
            matching articles, as returned by discover_duplicates
        :param report: csv.writer for the global report
        """
        dups = {}
        for d in duplicates.get('doi', []):
            dups[d.id] = self._summarise_article(d, owner=owner)
            dups[d.id]['match_type'] = 'doi'

        for d in duplicates.get('fulltext', []):
            # An article matching on both criteria gets a combined match type.
            if d.id in dups:
                dups[d.id]['match_type'] = 'doi+fulltext'
            else:
                dups[d.id] = self._summarise_article(d, owner=owner)
                dups[d.id]['match_type'] = 'fulltext'

        # write rows to report
        a_summary = self._summarise_article(article, owner)
        for k, v in dups.items():
            # Titles are only emitted when they differ, to make mismatches
            # easy to eyeball in the CSV.
            row = [article.id,
                   a_summary['created'],
                   a_summary['doi'],
                   a_summary['fulltext'],
                   a_summary['owner'],
                   a_summary['issns'],
                   a_summary['in_doaj'],
                   str(len(dups)),
                   v['match_type'],
                   k,
                   v['created'],
                   v['doi'],
                   v['fulltext'],
                   v['owner'],
                   v['issns'],
                   v['in_doaj'],
                   str(a_summary['owner'] == v['owner']),
                   str(a_summary['title'] == v['title']),
                   a_summary['title'] if a_summary['title'] != v['title'] else '',
                   v['title'] if a_summary['title'] != v['title'] else '']
            report.writerow(row)

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """

        # First prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__

        params = {}
        cls.set_param(params, "outdir", kwargs.get("outdir", "article_duplicates_" + dates.today()))
        cls.set_param(params, "email", kwargs.get("email", False))
        # NOTE(review): this default differs from run()'s fallback
        # ('tmp_article_duplicate_report') — confirm which is intended.
        cls.set_param(params, "tmpdir", kwargs.get("tmpdir", "tmp_article_duplicates_" + dates.today()))
        cls.set_param(params, "article_csv", kwargs.get("article_csv", False))
        job.params = params
        job.queue_id = huey_helper.queue_id

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        article_duplicate_report.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))

287 

288 

huey_helper = ArticleDuplicateReportBackgroundTask.create_huey_helper(queue)

# NOTE(review): scheduled execution of this task is currently disabled.  The
# original file kept the disabled code in a bare triple-quoted string, which
# is evaluated and discarded at import time; a comment block makes the intent
# explicit.  Re-enable with something along these lines:
#
# @long_running.periodic_task(schedule("article_duplicate_report"))
# def scheduled_article_cleanup_sync():
#     user = app.config.get("SYSTEM_USERNAME")
#     job = ArticleDuplicateReportBackgroundTask.prepare(user)
#     ArticleDuplicateReportBackgroundTask.submit(job)

298 

299 

@huey_helper.register_execute(is_load_config=False)
def article_duplicate_report(job_id):
    """Huey entry point: load the background job by id and execute the
    article duplicate report task against it."""
    background_job = models.BackgroundJob.pull(job_id)
    task = ArticleDuplicateReportBackgroundTask(background_job)
    BackgroundApi.execute(task)