Coverage for portality/tasks/article_duplicate_report.py: 85%
157 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
1"""Task to generate a report on duplicated articles in the index"""
3from portality.tasks.redis_huey import long_running
4from portality.app_email import email_archive
6from portality.background import BackgroundTask, BackgroundApi
8import os
9import shutil
10import json
11import csv
12from datetime import datetime
13from portality import models
14from portality.lib import dates
15from portality.core import app, es_connection
16from portality.bll.doaj import DOAJ
17from portality.bll import exceptions
class ArticleDuplicateReportBackgroundTask(BackgroundTask):
    """Background task that generates a report on duplicated articles.

    The task dumps a minimal CSV of every article, reads it back row by row,
    asks the article service for duplicates of each article and writes two
    CSV reports into the output directory: one listing duplicate matches and
    one listing articles for which the duplicate lookup raised
    ``DuplicateArticleException``.
    """

    __action__ = "article_duplicate_report"

    # Keep a cache of ISSNs to owners. This is a class attribute and is
    # written through the classmethod _lookup_owner, so it is shared by all
    # instances of the task.
    owner_cache = {}

    # Column headings of the global duplicates report.
    _GLOBAL_REPORT_HEADER = [
        "article_id", "article_created", "article_doi", "article_fulltext",
        "article_owner", "article_issns", "article_in_doaj", "n_matches",
        "match_type", "match_id", "match_created", "match_doi",
        "match_fulltext", "match_owner", "match_issns", "match_in_doaj",
        "owners_match", "titles_match", "article_title", "match_title",
    ]

    # Column headings of the report of articles without usable identifiers.
    _NOIDS_REPORT_HEADER = [
        "article_id", "article_created", "article_owner", "article_issns",
        "article_in_doaj",
    ]

    def run(self):
        """Execute the task: build the article dump and write both reports.

        Reads the ``outdir``, ``tmpdir``, ``article_csv`` and ``email``
        parameters from the background job. The temporary directory is
        removed once the reports have been written; if ``email`` is set the
        output directory is passed to ``email_archive``.
        """
        job = self.background_job
        params = job.params

        # Set up the files we need to run this task - a dir to place the
        # report, and a place to write the article csv
        outdir = self.get_param(params, "outdir", "article_duplicates_" + dates.today())
        job.add_audit_message("Saving reports to " + outdir)
        os.makedirs(outdir, exist_ok=True)

        # Location for our interim CSV file of articles
        tmpdir = self.get_param(params, "tmpdir", 'tmp_article_duplicate_report')
        os.makedirs(tmpdir, exist_ok=True)

        tmp_csvname = self.get_param(params, "article_csv", False)
        tmp_csvpath, total = self._make_csv_dump(tmpdir, tmp_csvname)

        global_reportpath = os.path.join(outdir, 'duplicate_articles_global_' + dates.today() + '.csv')
        noids_reportpath = os.path.join(outdir, 'noids_' + dates.today() + '.csv')

        # Record the sets of duplicated articles. Frozensets in a set give
        # constant-time "have we already reported this group" checks.
        global_matches = set()

        a_count = 0

        articleService = DOAJ.articleService()

        # All three files are managed by the with-statement so they are
        # closed even if processing fails part-way through. newline="" is
        # what the csv module requires for files it reads or writes.
        with open(global_reportpath, "w", encoding="utf-8", newline="") as f, \
                open(noids_reportpath, "w", encoding="utf-8", newline="") as g, \
                open(tmp_csvpath, 'r', encoding='utf-8', newline="") as t:
            # Initialise our reports
            global_report = csv.writer(f)
            global_report.writerow(self._GLOBAL_REPORT_HEADER)

            noids_report = csv.writer(g)
            noids_report.writerow(self._NOIDS_REPORT_HEADER)

            # Read back in the article csv file we created earlier
            article_reader = csv.reader(t)

            start = datetime.now()
            estimated_finish = ""
            for a in article_reader:
                # Every 100 rows, extrapolate the finish time from the
                # average time per row so far.
                if a_count > 1 and a_count % 100 == 0:
                    diff = (datetime.now() - start).total_seconds()
                    expected_total = (diff / a_count) * total
                    estimated_finish = dates.format(dates.after(start, expected_total))
                a_count += 1

                article = self._article_from_row(a)

                # Get the global duplicates
                try:
                    global_duplicates = articleService.discover_duplicates(
                        article, results_per_match_type=10000, include_article=False)
                except exceptions.DuplicateArticleException:
                    # this means the article did not have any ids that could
                    # be used for deduplication
                    owner = self._lookup_owner(article)
                    noids_report.writerow([
                        article.id,
                        article.created_date,
                        owner,
                        ','.join(article.bibjson().issns()),
                        article.is_in_doaj(),
                    ])
                    continue

                dupcount = 0
                if global_duplicates:
                    # Look up an article's owner
                    owner = self._lookup_owner(article)

                    # Deduplicate the DOI and fulltext duplicate lists
                    matched = global_duplicates.get('doi', []) + global_duplicates.get('fulltext', [])
                    s = frozenset([article.id] + [d.id for d in matched])

                    # remove article's own id from the duplicate count
                    dupcount = len(s) - 1

                    # Only report each group of mutually duplicated ids once
                    if s not in global_matches:
                        self._write_rows_from_duplicates(article, owner, global_duplicates, global_report)
                        global_matches.add(s)

                app.logger.debug('{0}/{1} {2} {3} {4} {5}'.format(
                    a_count, total, article.id, dupcount, len(global_matches), estimated_finish))

        job.add_audit_message('{0} articles processed for duplicates. {1} global duplicate sets found.'.format(
            a_count, len(global_matches)))

        # Delete the transient temporary files.
        shutil.rmtree(tmpdir)

        # Email the reports if that parameter has been set.
        send_email = self.get_param(params, "email", False)
        if send_email:
            archive_name = "article_duplicates_" + dates.today()
            email_archive(outdir, archive_name)
            job.add_audit_message("email alert sent")
        else:
            job.add_audit_message("no email alert sent")

    @staticmethod
    def _article_from_row(row):
        """Rebuild an Article model from one row of the interim article CSV.

        The column order matches the rows written by ``_create_article_csv``:
        id, created_date, identifiers (JSON), links (JSON), title,
        in_doaj (JSON).
        """
        return models.Article(_source={
            'id': row[0],
            'created_date': row[1],
            'bibjson': {
                'identifier': json.loads(row[2]),
                'link': json.loads(row[3]),
                'title': row[4],
            },
            'admin': {'in_doaj': json.loads(row[5])},
        })

    @classmethod
    def _make_csv_dump(cls, tmpdir, filename):
        """Write the interim article CSV and return ``(path, row_count)``.

        :param tmpdir: directory in which to create the file
        :param filename: file name to use; when falsy a dated default name
            is generated
        :return: tuple of the path written and the number of article rows
        """
        # Connection to the ES index
        conn = es_connection

        if not filename:
            filename = 'tmp_articles_' + dates.today() + '.csv'
        filename = os.path.join(tmpdir, filename)

        # newline='' so the csv module controls line endings itself
        with open(filename, 'w', encoding='utf-8', newline='') as t:
            count = cls._create_article_csv(conn, t)

        return filename, count

    @classmethod
    def _lookup_owner(cls, article):
        """Return the owner of the article's journal, or None if no journal.

        As a side effect, records the owner in ``owner_cache`` against each
        of the journal's ISSNs that is not cached yet.
        """
        journal = article.get_journal()
        owner = None
        if journal:
            owner = journal.owner
            for issn in journal.bibjson().issns():
                # setdefault keeps the first owner seen for an ISSN
                cls.owner_cache.setdefault(issn, owner)
        return owner

    @staticmethod
    def _create_article_csv(connection, file_object):
        """ Create a CSV file with the minimum information we require to find and report duplicates.

        :param connection: index connection; not used by this method, kept
            for interface compatibility
        :param file_object: writable text file the rows are written to
        :return: the number of article rows written
        """
        csv_writer = csv.writer(file_object, quoting=csv.QUOTE_ALL)

        # Scroll through all articles, newest to oldest
        scroll_query = {
            "_source": [
                "id",
                "created_date",
                "bibjson.identifier",
                "bibjson.link",
                "bibjson.title",
                "admin.in_doaj"
            ],
            "query": {
                "match_all": {}
            },
            "sort": [
                {"last_updated": {"order": "desc"}}
            ]
        }

        count = 0
        for a in models.Article.iterate(q=scroll_query, page_size=1000, keepalive='1m'):
            # Tolerate records with no bibjson / identifiers: they are dumped
            # with an empty identifier list instead of aborting the export.
            bibjson = a.get('bibjson', {})
            row = [
                a['id'],
                a['created_date'],
                json.dumps(bibjson.get('identifier', [])),
                json.dumps(bibjson.get('link', [])),
                bibjson.get('title', ''),
                json.dumps(a.get('admin', {}).get('in_doaj', ''))
            ]
            csv_writer.writerow(row)
            count += 1

        return count

    def _summarise_article(self, article, owner=None):
        """Return a dict of the article fields used in the report rows.

        :param article: the article to summarise
        :param owner: owner to report; when None the first owner found in
            ``owner_cache`` for any of the article's ISSNs is used
        :return: dict with keys created, doi, fulltext, owner, issns, title
            and in_doaj; missing doi/fulltext/owner become empty strings
        """
        a_doi = article.bibjson().get_identifiers('doi')
        a_fulltext = article.bibjson().get_urls('fulltext')

        o = owner
        if o is None:
            # Fall back to whatever owner was cached for one of the ISSNs
            for i in article.bibjson().issns():
                o = self.owner_cache.get(i, None)
                if o is not None:
                    break

        return {
            'created': article.created_date,
            'doi': a_doi[0] if a_doi else '',
            'fulltext': a_fulltext[0] if a_fulltext else '',
            'owner': o if o is not None else '',
            'issns': ','.join(article.bibjson().issns()),
            'title': article.bibjson().title,
            'in_doaj': article.is_in_doaj()
        }

    def _write_rows_from_duplicates(self, article, owner, duplicates, report):
        """Write one report row per distinct duplicate of the article.

        :param article: the article the duplicates were found for
        :param owner: owner of ``article`` (may be None)
        :param duplicates: dict with optional 'doi' and 'fulltext' lists of
            matching articles
        :param report: csv writer for the global duplicates report
        """
        # NOTE(review): each match is summarised with the *article's* owner,
        # so "match_owner" / "owners_match" only reflect the match's own
        # owner when ``owner`` is None and the ISSN cache is consulted.
        # Confirm whether this is intended before relying on those columns.
        dups = {}
        for d in duplicates.get('doi', []):
            dups[d.id] = self._summarise_article(d, owner=owner)
            dups[d.id]['match_type'] = 'doi'

        for d in duplicates.get('fulltext', []):
            if d.id in dups:
                # Already matched on DOI, so it matched on both
                dups[d.id]['match_type'] = 'doi+fulltext'
            else:
                dups[d.id] = self._summarise_article(d, owner=owner)
                dups[d.id]['match_type'] = 'fulltext'

        # write rows to report
        a_summary = self._summarise_article(article, owner)
        n_matches = str(len(dups))
        for k, v in dups.items():
            titles_match = a_summary['title'] == v['title']
            row = [article.id,
                   a_summary['created'],
                   a_summary['doi'],
                   a_summary['fulltext'],
                   a_summary['owner'],
                   a_summary['issns'],
                   a_summary['in_doaj'],
                   n_matches,
                   v['match_type'],
                   k,
                   v['created'],
                   v['doi'],
                   v['fulltext'],
                   v['owner'],
                   v['issns'],
                   v['in_doaj'],
                   str(a_summary['owner'] == v['owner']),
                   str(titles_match),
                   # Titles are only written out when they differ
                   '' if titles_match else a_summary['title'],
                   '' if titles_match else v['title']]
            report.writerow(row)

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: user the job is recorded against
        :param kwargs: arbitrary keyword arguments pertaining to this task type
            (outdir, email, tmpdir and article_csv are read)
        :return: a BackgroundJob instance representing this task
        """

        # First prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__

        params = {}
        cls.set_param(params, "outdir", kwargs.get("outdir", "article_duplicates_" + dates.today()))
        cls.set_param(params, "email", kwargs.get("email", False))
        cls.set_param(params, "tmpdir", kwargs.get("tmpdir", "tmp_article_duplicates_" + dates.today()))
        cls.set_param(params, "article_csv", kwargs.get("article_csv", False))
        job.params = params

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        article_duplicate_report.schedule(args=(background_job.id,), delay=10)
# The periodic schedule below is disabled: it is kept inside a string literal
# so that it is never executed.
'''
@long_running.periodic_task(schedule("article_duplicate_report"))
def scheduled_article_cleanup_sync():
    user = app.config.get("SYSTEM_USERNAME")
    job = ArticleDuplicateReportBackgroundTask.prepare(user)
    ArticleDuplicateReportBackgroundTask.submit(job)
'''
@long_running.task()
def article_duplicate_report(job_id):
    """Queue entry point: load the background job by id and execute the task."""
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(ArticleDuplicateReportBackgroundTask(background_job))