Coverage for portality / tasks / article_cleanup_sync.py: 87%
144 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1"""
2For each article in the DOAJ index:
3 * Checks that it has a corresponding journal, deletes it otherwise
4 * Ensures that the article in_doaj status is the same as the journal's
5 * Applies the journal's information to the article metadata as needed
6"""
8from datetime import datetime
10from portality import models
11from portality.background import BackgroundTask, BackgroundApi, BackgroundException
12from portality.core import app
13from portality.tasks.helpers import background_helper
14from portality.tasks.redis_huey import scheduled_long_queue as queue
class ArticleCleanupSyncBackgroundTask(BackgroundTask):
    """Background task that reconciles every article in the index with its journal.

    For each article:
      * deletes it if no corresponding journal can be found,
      * syncs its in_doaj status with the journal's,
      * applies the journal's metadata to the article as needed.
    """

    __action__ = "article_cleanup_sync"

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params
        # prepall: also re-prep articles whose metadata is unchanged (refresh their index data)
        prep_all = self.get_param(params, "prepall", False)
        # write: when False this is a dry run - changes are counted but nothing is written
        write_changes = self.get_param(params, "write", True)

        batch_size = 100    # flush writes/deletes to the index in batches of this size
        journal_cache = {}  # issn -> minified journal data, to avoid repeated index lookups
        failed_articles = []
        # NOTE(review): nothing in this method ever appends to failed_articles, so the
        # failure branch at the bottom appears unreachable - confirm whether
        # iterall_unstable is expected to report model-construction failures another way.

        write_batch = []
        delete_batch = set()

        updated_count = 0
        same_count = 0
        deleted_count = 0

        # Scroll through all articles in the index within the update range
        i = 0
        page_size = 1000
        for article_model in models.Article.iterall_unstable(
                page_size=page_size,
                striped=True,
                prefix_size=3,
                wrap=True,
                logger=self.background_job.add_audit_message):

            # for debugging, just print out the progress
            i += 1
            if i % page_size == 0:
                job.add_audit_message("Progress: Write batch/total: {x}/{a}; delete batch/total: {y}/{b}".format(x=len(write_batch), y=len(delete_batch), a=updated_count, b=deleted_count))

            # Try to find journal in our cache
            bibjson = article_model.bibjson()
            allissns = bibjson.issns()

            cache_miss = False
            possibles = {}
            for issn in allissns:
                if issn in journal_cache:
                    inst = models.Journal(**journal_cache[issn])
                    possibles[inst.id] = inst
                else:
                    cache_miss = True
            assoc_journal = None
            if len(list(possibles.keys())) > 0:
                assoc_journal = self._get_best_journal(list(possibles.values()))

            # Cache miss; ask the article model to try to find its journal
            if assoc_journal is None or cache_miss:
                journals = models.Journal.find_by_issn(allissns)
                if len(journals) > 0:
                    assoc_journal = self._get_best_journal(journals)

            # By the time we get to here, we still might not have a Journal, but we tried.
            if assoc_journal is not None:
                # Update the article's metadata, including in_doaj status
                reg = models.Journal()
                reg.set_id(assoc_journal.id)
                changed = article_model.add_journal_metadata(assoc_journal, reg)

                # cache the minified journal register
                for issn in reg.bibjson().issns():
                    if issn not in journal_cache:
                        journal_cache[issn] = reg.data

                if not changed:
                    same_count += 1
                    if prep_all:  # This gets done below, but can override to prep unchanged ones here
                        article_model.prep()
                        write_batch.append(article_model.data)
                else:
                    updated_count += 1
                    if write_changes:
                        article_model.prep()
                        write_batch.append(article_model.data)

            else:
                # This article's Journal is no-more, or has evaded us; we delete the article.
                deleted_count += 1
                if write_changes:
                    delete_batch.add(article_model.id)

            # When we have reached the batch limit, do some writing or deleting
            if len(write_batch) >= batch_size:
                job.add_audit_message("Writing {x} articles".format(x=len(write_batch)))
                models.Article.bulk(documents=write_batch)
                write_batch = []

            if len(delete_batch) >= batch_size:
                job.add_audit_message("Deleting {x} articles".format(x=len(delete_batch)))
                models.Article.bulk_delete(delete_batch)
                delete_batch.clear()

        # Finish the last part-batches of writes or deletes
        if len(write_batch) > 0:
            job.add_audit_message("Writing {x} articles".format(x=len(write_batch)))
            models.Article.bulk(documents=write_batch)
        if len(delete_batch) > 0:
            job.add_audit_message("Deleting {x} articles".format(x=len(delete_batch)))
            models.Article.bulk_delete(delete_batch)
            delete_batch.clear()

        if write_changes:
            job.add_audit_message("Done. {0} articles updated, {1} remain unchanged, and {2} deleted.".format(updated_count, same_count, deleted_count))
        else:
            job.add_audit_message("Done. Changes not written to index. {0} articles to be updated, {1} to remain unchanged, and {2} to be deleted. Set 'write' to write changes.".format(updated_count, same_count, deleted_count))

        if len(failed_articles) > 0:
            job.add_audit_message("Failed to create models for {x} articles in the index. Something is quite wrong.".format(x=len(failed_articles)))
            job.add_audit_message("Failed article ids: {x}".format(x=", ".join(failed_articles)))
            job.fail()

    def _get_best_journal(self, journals):
        """Pick the single best journal from a non-empty collection of candidates.

        Preference order (established by the bucketing and sorts below):
          1. journals currently in DOAJ over those that are not,
          2. the most recent last manual update (None treated as the epoch),
          3. the most recent last (automatic) update.

        :param journals: iterable of Journal objects; must contain at least one
        :return: the preferred Journal
        """
        if len(journals) == 1:
            return list(journals)[0]

        # in_doaj
        # most recently updated (manual, then automatic)
        # both issns match
        result = { "in_doaj" : {}, "not_in_doaj" : {}}
        for j in journals:
            in_doaj = j.is_in_doaj()
            lmu = j.last_manual_update_timestamp
            lu = j.last_updated_timestamp

            # bucket by in_doaj status first
            context = None
            if in_doaj:
                context = result["in_doaj"]
            else:
                context = result["not_in_doaj"]

            # journals never manually updated sort as the epoch, i.e. lowest priority
            if lmu is None:
                lmu = datetime.utcfromtimestamp(0)
            if lmu not in context:
                context[lmu] = {}
            context[lmu][lu] = j

        # prefer the in_doaj bucket whenever it has any entries at all
        context = None
        if len(list(result["in_doaj"].keys())) > 0:
            context = result["in_doaj"]
        else:
            context = result["not_in_doaj"]

        # newest manual update wins ...
        lmus = list(context.keys())
        lmus.sort()
        context = context[lmus.pop()]

        # ... then the newest automatic update breaks ties
        lus = list(context.keys())
        lus.sort()
        best = context[lus.pop()]
        return best

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: user requesting this task
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """
        write = kwargs.get("write", True)
        prepall = kwargs.get("prepall", False)
        all_time = kwargs.get("all_time", True)

        # prepping without writing would be wasted work, so refuse the combination
        if not write and prepall:
            raise BackgroundException("'prepall' must be used with the 'write' parameter set to True (why prep but not save?)")

        params = {}
        cls.set_param(params, "write", write)
        cls.set_param(params, "prepall", prepall)
        cls.set_param(params, "all_time", all_time)

        # first prepare a job record
        job = background_helper.create_job(username=username,
                                           action=cls.__action__,
                                           params=params,
                                           queue_id=huey_helper.queue_id, )
        if prepall:
            job.add_audit_message("'prepall' arg set. 'unchanged' articles will also have their indexes refreshed.")
        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        article_cleanup_sync.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))
227huey_helper = ArticleCleanupSyncBackgroundTask.create_huey_helper(queue)
@huey_helper.register_schedule
def scheduled_article_cleanup_sync():
    # Scheduled entry point: prepare and submit a cleanup job as the system user.
    system_user = app.config.get("SYSTEM_USERNAME")
    cleanup_job = ArticleCleanupSyncBackgroundTask.prepare(system_user)
    ArticleCleanupSyncBackgroundTask.submit(cleanup_job)
@huey_helper.register_execute(is_load_config=False)
def article_cleanup_sync(job_id):
    """Queue entry point: pull the stored job record and execute the task."""
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(ArticleCleanupSyncBackgroundTask(background_job))