Coverage for portality/tasks/article_duplicate_report.py: 85%

157 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-22 15:59 +0100

1"""Task to generate a report on duplicated articles in the index""" 

2 

3from portality.tasks.redis_huey import long_running 

4from portality.app_email import email_archive 

5 

6from portality.background import BackgroundTask, BackgroundApi 

7 

8import os 

9import shutil 

10import json 

11import csv 

12from datetime import datetime 

13from portality import models 

14from portality.lib import dates 

15from portality.core import app, es_connection 

16from portality.bll.doaj import DOAJ 

17from portality.bll import exceptions 

18 

19 

class ArticleDuplicateReportBackgroundTask(BackgroundTask):
    """Background task which reports on duplicated articles across the index.

    The task dumps a minimal CSV of every article, reads it back row by row,
    asks the article service for duplicates of each one, and writes two
    reports into the output directory: one listing the duplicate matches and
    one listing articles for which duplicate discovery raised
    ``DuplicateArticleException``.
    """

    __action__ = "article_duplicate_report"

    # Keep a cache of ISSNs to owners. This is a class attribute, so it is
    # shared by every instance of the task within a process.
    owner_cache = {}

    def run(self):
        """Execute the task as per the parameters on ``self.background_job``.

        Parameters read from the job: ``outdir`` (where the reports go),
        ``tmpdir`` (where the interim article CSV goes; removed at the end of
        a successful run), ``article_csv`` (optional file name for the interim
        CSV) and ``email`` (whether to email the archived reports).
        """
        job = self.background_job
        params = job.params

        # Evaluate the date once so every file name produced by this run agrees,
        # even if the run crosses midnight.
        today = dates.today()

        # Set up the files we need to run this task - a dir to place the report, and a place to write the article csv
        outdir = self.get_param(params, "outdir", "article_duplicates_" + today)
        job.add_audit_message("Saving reports to " + outdir)
        os.makedirs(outdir, exist_ok=True)

        # Location for our interim CSV file of articles
        tmpdir = self.get_param(params, "tmpdir", 'tmp_article_duplicate_report')
        os.makedirs(tmpdir, exist_ok=True)

        tmp_csvname = self.get_param(params, "article_csv", False)
        tmp_csvpath, total = self._make_csv_dump(tmpdir, tmp_csvname)

        global_reportpath = os.path.join(outdir, 'duplicate_articles_global_' + today + '.csv')
        noids_reportpath = os.path.join(outdir, 'noids_' + today + '.csv')

        # Record the sets of duplicated articles. Each entry is a frozenset of
        # article ids, so the "have we reported this group already" test is a
        # hash lookup rather than a scan of everything seen so far.
        global_matches = set()

        a_count = 0

        articleService = DOAJ.articleService()

        # The report files are managed by the with-statement so they are closed
        # even if processing fails part way through. newline="" is what the csv
        # module requires of the files it reads from and writes to.
        with open(global_reportpath, "w", encoding="utf-8", newline="") as f, \
                open(noids_reportpath, "w", encoding="utf-8", newline="") as g:
            # Initialise our reports
            global_report = csv.writer(f)
            global_report.writerow([
                "article_id", "article_created", "article_doi", "article_fulltext", "article_owner",
                "article_issns", "article_in_doaj", "n_matches", "match_type", "match_id",
                "match_created", "match_doi", "match_fulltext", "match_owner", "match_issns",
                "match_in_doaj", "owners_match", "titles_match", "article_title", "match_title",
            ])

            noids_report = csv.writer(g)
            noids_report.writerow(["article_id", "article_created", "article_owner", "article_issns", "article_in_doaj"])

            # Read back in the article csv file we created earlier
            with open(tmp_csvpath, 'r', encoding='utf-8', newline="") as t:
                article_reader = csv.reader(t)

                start = datetime.now()
                estimated_finish = ""
                for a in article_reader:
                    # Refresh the finish estimate every 100 articles
                    if a_count > 1 and a_count % 100 == 0:
                        elapsed = (datetime.now() - start).total_seconds()
                        expected_total = (elapsed / a_count) * total
                        estimated_finish = dates.format(dates.after(start, expected_total))
                    a_count += 1

                    # Rebuild just enough of the article from the CSV row; the
                    # column order is the one written by _create_article_csv
                    article = models.Article(_source={
                        'id': a[0],
                        'created_date': a[1],
                        'bibjson': {
                            'identifier': json.loads(a[2]),
                            'link': json.loads(a[3]),
                            'title': a[4],
                        },
                        'admin': {'in_doaj': json.loads(a[5])},
                    })

                    # Get the global duplicates
                    try:
                        global_duplicates = articleService.discover_duplicates(
                            article, results_per_match_type=10000, include_article=False)
                    except exceptions.DuplicateArticleException:
                        # this means the article did not have any ids that could be used for deduplication
                        owner = self._lookup_owner(article)
                        noids_report.writerow([
                            article.id,
                            article.created_date,
                            owner,
                            ','.join(article.bibjson().issns()),
                            article.is_in_doaj(),
                        ])
                        continue

                    dupcount = 0
                    if global_duplicates:
                        # Look up an article's owner
                        owner = self._lookup_owner(article)

                        # Deduplicate the DOI and fulltext duplicate lists
                        matches = global_duplicates.get('doi', []) + global_duplicates.get('fulltext', [])
                        s = frozenset([article.id] + [d.id for d in matches])
                        # the article's own id is in the set, so do not count it
                        dupcount = len(s) - 1
                        if s not in global_matches:
                            self._write_rows_from_duplicates(article, owner, global_duplicates, global_report)
                            global_matches.add(s)

                    app.logger.debug('{0}/{1} {2} {3} {4} {5}'.format(
                        a_count, total, article.id, dupcount, len(global_matches), estimated_finish))

        job.add_audit_message('{0} articles processed for duplicates. {1} global duplicate sets found.'.format(a_count, len(global_matches)))

        # Delete the transient temporary files.
        shutil.rmtree(tmpdir)

        # Email the reports if that parameter has been set.
        send_email = self.get_param(params, "email", False)
        if send_email:
            archive_name = "article_duplicates_" + today
            email_archive(outdir, archive_name)
            job.add_audit_message("email alert sent")
        else:
            job.add_audit_message("no email alert sent")

    @classmethod
    def _make_csv_dump(cls, tmpdir, filename):
        """Write the interim article CSV into ``tmpdir``.

        :param tmpdir: directory to write the file into (must already exist)
        :param filename: file name to use; if falsy a dated default is used
        :return: tuple of (path to the CSV, number of articles written)
        """
        # Connection to the ES index
        conn = es_connection

        if not filename:
            filename = 'tmp_articles_' + dates.today() + '.csv'
        filename = os.path.join(tmpdir, filename)

        # newline="" as required by the csv module for files it writes to
        with open(filename, 'w', encoding='utf-8', newline="") as t:
            count = cls._create_article_csv(conn, t)

        return filename, count

    @classmethod
    def _lookup_owner(cls, article):
        """Look up an article's owner via its journal, caching the result per ISSN.

        :param article: the article whose owner is wanted
        :return: the journal's owner, or None if the article has no journal
        """
        journal = article.get_journal()
        owner = None
        if journal:
            owner = journal.owner
            for issn in journal.bibjson().issns():
                if issn not in cls.owner_cache:
                    cls.owner_cache[issn] = owner
        return owner

    def _match_owner(self, match):
        """Resolve the owner of a matched article, trying the ISSN cache before a journal lookup."""
        for issn in match.bibjson().issns():
            cached = self.owner_cache.get(issn)
            if cached is not None:
                return cached
        return self._lookup_owner(match)

    @staticmethod
    def _create_article_csv(connection, file_object):
        """ Create a CSV file with the minimum information we require to find and report duplicates.

        :param connection: accepted for interface compatibility; this method does not use it
        :param file_object: writable text file object to write the CSV rows to
        :return: the number of article rows written
        """

        csv_writer = csv.writer(file_object, quoting=csv.QUOTE_ALL)

        # Scroll through all articles, most recently updated first
        scroll_query = {
            "_source": [
                "id",
                "created_date",
                "bibjson.identifier",
                "bibjson.link",
                "bibjson.title",
                "admin.in_doaj"
            ],
            "query": {
                "match_all": {}
            },
            "sort": [
                {"last_updated": {"order": "desc"}}
            ]
        }

        count = 0
        for a in models.Article.iterate(q=scroll_query, page_size=1000, keepalive='1m'):
            # Articles without identifiers must still make it into the dump so
            # that they can be reported, so every bibjson field is read leniently.
            bibjson = a.get('bibjson', {})
            csv_writer.writerow([
                a['id'],
                a['created_date'],
                json.dumps(bibjson.get('identifier', [])),
                json.dumps(bibjson.get('link', [])),
                bibjson.get('title', ''),
                json.dumps(a.get('admin', {}).get('in_doaj', '')),
            ])
            count += 1

        return count

    def _summarise_article(self, article, owner=None):
        """Reduce an article to the fields used in a report row.

        :param article: the article to summarise
        :param owner: the article's owner; if None the ISSN owner cache is consulted
        :return: dict with keys created, doi, fulltext, owner, issns, title, in_doaj
        """
        bibjson = article.bibjson()
        a_doi = bibjson.get_identifiers('doi')
        a_fulltext = bibjson.get_urls('fulltext')
        issns = bibjson.issns()

        o = owner
        if o is None:
            # Fall back to the first cached owner for any of the article's ISSNs
            for i in issns:
                o = self.owner_cache.get(i, None)
                if o is not None:
                    break

        return {
            'created': article.created_date,
            'doi': a_doi[0] if a_doi else '',
            'fulltext': a_fulltext[0] if a_fulltext else '',
            'owner': o if o is not None else '',
            'issns': ','.join(issns),
            'title': bibjson.title,
            'in_doaj': article.is_in_doaj()
        }

    def _write_rows_from_duplicates(self, article, owner, duplicates, report):
        """Write one report row per distinct duplicate of ``article``.

        :param article: the article the duplicates were discovered for
        :param owner: the owner of ``article`` (may be None)
        :param duplicates: dict with optional 'doi' and 'fulltext' lists of matching articles
        :param report: csv writer for the global duplicates report
        """
        dups = {}
        for d in duplicates.get('doi', []):
            # The match's owner is resolved on its own: passing the source
            # article's owner here would make match_owner a copy of
            # article_owner and owners_match trivially True.
            dups[d.id] = self._summarise_article(d, owner=self._match_owner(d))
            dups[d.id]['match_type'] = 'doi'

        for d in duplicates.get('fulltext', []):
            if d.id in dups:
                dups[d.id]['match_type'] = 'doi+fulltext'
            else:
                dups[d.id] = self._summarise_article(d, owner=self._match_owner(d))
                dups[d.id]['match_type'] = 'fulltext'

        # write rows to report
        a_summary = self._summarise_article(article, owner)
        n_matches = str(len(dups))
        for k, v in dups.items():
            titles_match = a_summary['title'] == v['title']
            row = [article.id,
                   a_summary['created'],
                   a_summary['doi'],
                   a_summary['fulltext'],
                   a_summary['owner'],
                   a_summary['issns'],
                   a_summary['in_doaj'],
                   n_matches,
                   v['match_type'],
                   k,
                   v['created'],
                   v['doi'],
                   v['fulltext'],
                   v['owner'],
                   v['issns'],
                   v['in_doaj'],
                   str(a_summary['owner'] == v['owner']),
                   str(titles_match),
                   # the titles are only written out when they differ
                   a_summary['title'] if not titles_match else '',
                   v['title'] if not titles_match else '']
            report.writerow(row)

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task

        Currently does nothing.

        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: the user the job is recorded against
        :param kwargs: arbitrary keyword arguments pertaining to this task type
            (outdir, email, tmpdir and article_csv are read)
        :return: a BackgroundJob instance representing this task
        """

        # First prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__

        params = {}
        cls.set_param(params, "outdir", kwargs.get("outdir", "article_duplicates_" + dates.today()))
        cls.set_param(params, "email", kwargs.get("email", False))
        cls.set_param(params, "tmpdir", kwargs.get("tmpdir", "tmp_article_duplicates_" + dates.today()))
        cls.set_param(params, "article_csv", kwargs.get("article_csv", False))
        job.params = params

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        article_duplicate_report.schedule(args=(background_job.id,), delay=10)

288 

# Disabled periodic schedule for this task. It is kept as an inert module-level
# string, so nothing below is registered with the task queue.
# NOTE(review): the body calls `schedule(...)`, which is not among the imports
# visible at the top of this file - confirm it is available before re-enabling.
'''
@long_running.periodic_task(schedule("article_duplicate_report"))
def scheduled_article_cleanup_sync():
    user = app.config.get("SYSTEM_USERNAME")
    job = ArticleDuplicateReportBackgroundTask.prepare(user)
    ArticleDuplicateReportBackgroundTask.submit(job)
'''

296 

@long_running.task()
def article_duplicate_report(job_id):
    """Queue entry point: load the background job with this id and run the report task for it."""
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(ArticleDuplicateReportBackgroundTask(background_job))