Coverage for portality/tasks/article_duplicate_report.py: 86% (159 statements)
coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1"""Task to generate a report on duplicated articles in the index"""
2import csv
3import json
4import os
5import shutil
6from datetime import datetime
8from portality import models
9from portality.app_email import email_archive
10from portality.background import BackgroundTask, BackgroundApi
11from portality.bll import exceptions
12from portality.bll.doaj import DOAJ
13from portality.core import app, es_connection
14from portality.lib import dates
15from portality.tasks.redis_huey import scheduled_long_queue as queue


class ArticleDuplicateReportBackgroundTask(BackgroundTask):
    __action__ = "article_duplicate_report"

    # Keep a cache of ISSNs to owners
    owner_cache = {}

    def run(self):
        job = self.background_job
        params = job.params

        # Set up the files we need to run this task - a dir to place the report, and a place to write the article csv
        outdir = self.get_param(params, "outdir", "article_duplicates_" + dates.today())
        job.add_audit_message("Saving reports to " + outdir)
        if not os.path.exists(outdir):
            os.makedirs(outdir)

        # Location for our interim CSV file of articles
        tmpdir = self.get_param(params, "tmpdir", 'tmp_article_duplicate_report')
        if not os.path.exists(tmpdir):
            os.makedirs(tmpdir)

        tmp_csvname = self.get_param(params, "article_csv", False)
        tmp_csvpath, total = self._make_csv_dump(tmpdir, tmp_csvname)

        # Initialise our reports
        global_reportfile = 'duplicate_articles_global_' + dates.today() + '.csv'
        global_reportpath = os.path.join(outdir, global_reportfile)
        f = open(global_reportpath, "w", encoding="utf-8")
        global_report = csv.writer(f)
        header = ["article_id", "article_created", "article_doi", "article_fulltext", "article_owner", "article_issns", "article_in_doaj", "n_matches", "match_type", "match_id", "match_created", "match_doi", "match_fulltext", "match_owner", "match_issns", "match_in_doaj", "owners_match", "titles_match", "article_title", "match_title"]
        global_report.writerow(header)

        noids_reportfile = 'noids_' + dates.today() + '.csv'
        noids_reportpath = os.path.join(outdir, noids_reportfile)
        g = open(noids_reportpath, "w", encoding="utf-8")
        noids_report = csv.writer(g)
        header = ["article_id", "article_created", "article_owner", "article_issns", "article_in_doaj"]
        noids_report.writerow(header)

        # Record the sets of duplicated articles
        global_matches = []

        a_count = 0

        articleService = DOAJ.articleService()

        # Read back in the article csv file we created earlier
        with open(tmp_csvpath, 'r', encoding='utf-8') as t:
            article_reader = csv.reader(t)

            start = dates.now()
            estimated_finish = ""
            for a in article_reader:
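                # Progress estimate: extrapolate the elapsed time per article across
                # the whole dump, refreshing the prediction every 100 rows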
                if a_count > 1 and a_count % 100 == 0:
                    n = dates.now()
                    diff = (n - start).total_seconds()
                    expected_total = ((diff / a_count) * total)
                    estimated_finish = dates.format(dates.seconds_after(start, expected_total))
                a_count += 1
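
                # Rebuild a minimal Article from the interim CSV row written by
                # _create_article_csv: [0] id, [1] created_date, [2] identifiers (JSON),
                # [3] links (JSON), [4] title, [5] in_doaj flag (JSON)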
                article = models.Article(_source={'id': a[0], 'created_date': a[1], 'bibjson': {'identifier': json.loads(a[2]), 'link': json.loads(a[3]), 'title': a[4]}, 'admin': {'in_doaj': json.loads(a[5])}})

                # Get the global duplicates
                try:
                    global_duplicates = articleService.discover_duplicates(article, results_per_match_type=10000, include_article=False)
                except exceptions.DuplicateArticleException:
                    # this means the article did not have any ids that could be used for deduplication
                    owner = self._lookup_owner(article)
                    noids_report.writerow([article.id, article.created_date, owner, ','.join(article.bibjson().issns()), article.is_in_doaj()])
                    continue

                dupcount = 0
                if global_duplicates:
                    # Look up an article's owner
                    owner = self._lookup_owner(article)

                    # Deduplicate the DOI and fulltext duplicate lists
                    s = set([article.id] + [d.id for d in global_duplicates.get('doi', []) + global_duplicates.get('fulltext', [])])
                    # Subtract the article's own id from the duplicate count
                    dupcount = len(s) - 1
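                    # Only write out each duplicate cluster once, no matter which of
                    # its member articles we encounter first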
                    if s not in global_matches:
                        self._write_rows_from_duplicates(article, owner, global_duplicates, global_report)
                        global_matches.append(s)

                app.logger.debug('{0}/{1} {2} {3} {4} {5}'.format(a_count, total, article.id, dupcount, len(global_matches), estimated_finish))

        job.add_audit_message('{0} articles processed for duplicates. {1} global duplicate sets found.'.format(a_count, len(global_matches)))
        f.close()
        g.close()

        # Delete the transient temporary files.
        shutil.rmtree(tmpdir)

        # Email the reports if that parameter has been set.
        send_email = self.get_param(params, "email", False)
        if send_email:
            archive_name = "article_duplicates_" + dates.today()
            email_archive(outdir, archive_name)
            job.add_audit_message("email alert sent")
        else:
            job.add_audit_message("no email alert sent")

    @classmethod
    def _make_csv_dump(cls, tmpdir, filename):
        # Connection to the ES index
        conn = es_connection

        if not filename:
            filename = 'tmp_articles_' + dates.today() + '.csv'
        filename = os.path.join(tmpdir, filename)

        with open(filename, 'w', encoding='utf-8') as t:
            count = cls._create_article_csv(conn, t)

        return filename, count

    @classmethod
    def _lookup_owner(cls, article):
        # Look up an article's owner
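        # (as a side effect, primes owner_cache with ISSN -> owner so that
        # _summarise_article can resolve owners for matched articles later)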
        journal = article.get_journal()
        owner = None
        if journal:
            owner = journal.owner
            for issn in journal.bibjson().issns():
                if issn not in cls.owner_cache:
                    cls.owner_cache[issn] = owner
        return owner

    @staticmethod
    def _create_article_csv(connection, file_object):
        """ Create a CSV file with the minimum information we require to find and report duplicates. """

        csv_writer = csv.writer(file_object, quoting=csv.QUOTE_ALL)
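        # QUOTE_ALL: every field is quoted; columns 2, 3 and 5 hold JSON strings
        # with embedded commas and quotes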

        # Scroll through all articles, most recently updated first
        scroll_query = {
            "_source": [
                "id",
                "created_date",
                "bibjson.identifier",
                "bibjson.link",
                "bibjson.title",
                "admin.in_doaj"
            ],
            "query": {
                "match_all": {}
            },
            "sort": [
                {"last_updated": {"order": "desc"}}
            ]
        }

        count = 0
        for a in models.Article.iterate(q=scroll_query, page_size=1000, keepalive='1m'):
            row = [
                a['id'],
                a['created_date'],
                json.dumps(a['bibjson']['identifier']),
                json.dumps(a['bibjson'].get('link', [])),
                a['bibjson'].get('title', ''),
                json.dumps(a.get('admin', {}).get('in_doaj', ''))
            ]
            csv_writer.writerow(row)
            count += 1

        return count

    def _summarise_article(self, article, owner=None):
        a_doi = article.bibjson().get_identifiers('doi')
        a_fulltext = article.bibjson().get_urls('fulltext')

        o = owner
        if o is None:
            for i in article.bibjson().issns():
                o = self.owner_cache.get(i, None)
                if o is not None:
                    break

        return {
            'created': article.created_date,
            'doi': a_doi[0] if len(a_doi) > 0 else '',
            'fulltext': a_fulltext[0] if len(a_fulltext) > 0 else '',
            'owner': o if o is not None else '',
            'issns': ','.join(article.bibjson().issns()),
            'title': article.bibjson().title,
            'in_doaj': article.is_in_doaj()
        }

    def _write_rows_from_duplicates(self, article, owner, duplicates, report):
        dups = {}
        for d in duplicates.get('doi', []):
            dups[d.id] = self._summarise_article(d, owner=owner)
            dups[d.id]['match_type'] = 'doi'

        for d in duplicates.get('fulltext', []):
            if d.id in dups:
                dups[d.id]['match_type'] = 'doi+fulltext'
            else:
                dups[d.id] = self._summarise_article(d, owner=owner)
                dups[d.id]['match_type'] = 'fulltext'

        # write rows to report
        a_summary = self._summarise_article(article, owner)
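        # One report row per match: the source article's fields on the left, the
        # match's on the right; the two titles are only written out when they differ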
        for k, v in dups.items():
            row = [article.id,
                   a_summary['created'],
                   a_summary['doi'],
                   a_summary['fulltext'],
                   a_summary['owner'],
                   a_summary['issns'],
                   a_summary['in_doaj'],
                   str(len(dups)),
                   v['match_type'],
                   k,
                   v['created'],
                   v['doi'],
                   v['fulltext'],
                   v['owner'],
                   v['issns'],
                   v['in_doaj'],
                   str(a_summary['owner'] == v['owner']),
                   str(a_summary['title'] == v['title']),
                   a_summary['title'] if a_summary['title'] != v['title'] else '',
                   v['title'] if a_summary['title'] != v['title'] else '']
            report.writerow(row)

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: the username of the user creating the job
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """

        # First prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__

        params = {}
        cls.set_param(params, "outdir", kwargs.get("outdir", "article_duplicates_" + dates.today()))
        cls.set_param(params, "email", kwargs.get("email", False))
        cls.set_param(params, "tmpdir", kwargs.get("tmpdir", "tmp_article_duplicates_" + dates.today()))
        cls.set_param(params, "article_csv", kwargs.get("article_csv", False))
        job.params = params
        job.queue_id = huey_helper.queue_id

        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        article_duplicate_report.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))
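
# Example (illustrative only, values made up): the task can be prepared and
# submitted manually, e.g. from a script or console session:
#
#     job = ArticleDuplicateReportBackgroundTask.prepare("system", outdir="my_reports", email=True)
#     ArticleDuplicateReportBackgroundTask.submit(job)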

huey_helper = ArticleDuplicateReportBackgroundTask.create_huey_helper(queue)

'''
@long_running.periodic_task(schedule("article_duplicate_report"))
def scheduled_article_cleanup_sync():
    user = app.config.get("SYSTEM_USERNAME")
    job = ArticleDuplicateReportBackgroundTask.prepare(user)
    ArticleDuplicateReportBackgroundTask.submit(job)
'''


@huey_helper.register_execute(is_load_config=False)
def article_duplicate_report(job_id):
    job = models.BackgroundJob.pull(job_id)
    task = ArticleDuplicateReportBackgroundTask(job)
    BackgroundApi.execute(task)