Coverage for portality/tasks/article_cleanup_sync.py: 88% (144 statements)
1"""
2For each article in the DOAJ index:
3 * Checks that it has a corresponding journal, deletes it otherwise
4 * Ensures that the article in_doaj status is the same as the journal's
5 * Applies the journal's information to the article metadata as needed
6"""
8from datetime import datetime
10from portality import models
11from portality.core import app
12from portality.tasks.redis_huey import long_running, schedule
13from portality.decorators import write_required
14from portality.background import BackgroundTask, BackgroundApi, BackgroundException


class ArticleCleanupSyncBackgroundTask(BackgroundTask):

    __action__ = "article_cleanup_sync"

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params
        prep_all = self.get_param(params, "prepall", False)
        write_changes = self.get_param(params, "write", True)

        batch_size = 100
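        # Cache of ISSN -> minified journal data, so each journal is only looked up in the index once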
        journal_cache = {}
        failed_articles = []

        write_batch = []
        delete_batch = set()

        updated_count = 0
        same_count = 0
        deleted_count = 0

        # Scroll through all articles in the index
        i = 0
        for article_model in models.Article.iterate(q={"query": {"match_all": {}}, "sort": ["_doc"]}, page_size=100, wrap=True, keepalive='5m'):

            # for debugging, just print out the progress
            i += 1
            print(i, article_model.id, len(list(journal_cache.keys())), len(write_batch), len(delete_batch))

            # Try to find journal in our cache
            bibjson = article_model.bibjson()
            allissns = bibjson.issns()

            cache_miss = False
            possibles = {}
            for issn in allissns:
                if issn in journal_cache:
                    inst = models.Journal(**journal_cache[issn])
                    possibles[inst.id] = inst
                else:
                    cache_miss = True
            assoc_journal = None
            if len(list(possibles.keys())) > 0:
                assoc_journal = self._get_best_journal(list(possibles.values()))

            # Cache miss, or no usable cached candidate; look the journal up in the index by the article's ISSNs
            if assoc_journal is None or cache_miss:
                journals = models.Journal.find_by_issn(allissns)
                if len(journals) > 0:
                    assoc_journal = self._get_best_journal(journals)

            # By the time we get to here, we still might not have a Journal, but we tried.
            if assoc_journal is not None:
                # Update the article's metadata, including in_doaj status
                reg = models.Journal()
                reg.set_id(assoc_journal.id)
                changed = article_model.add_journal_metadata(assoc_journal, reg)

                # cache the minified journal register
                for issn in reg.bibjson().issns():
                    if issn not in journal_cache:
                        journal_cache[issn] = reg.data

                if not changed:
                    same_count += 1
                    if prep_all:    # This gets done below, but can override to prep unchanged ones here
                        article_model.prep()
                        write_batch.append(article_model.data)
                else:
                    updated_count += 1
                    if write_changes:
                        article_model.prep()
                        write_batch.append(article_model.data)

            else:
                # This article's journal is no more, or has evaded us; we delete the article.
                deleted_count += 1
                if write_changes:
                    delete_batch.add(article_model.id)

            # When we have reached the batch limit, do some writing or deleting
            if len(write_batch) >= batch_size:
                job.add_audit_message("Writing {x} articles".format(x=len(write_batch)))
                models.Article.bulk(documents=write_batch)
                write_batch = []

            if len(delete_batch) >= batch_size:
                job.add_audit_message("Deleting {x} articles".format(x=len(delete_batch)))
                models.Article.bulk_delete(delete_batch)
                delete_batch.clear()

        # Finish the last part-batches of writes or deletes
        if len(write_batch) > 0:
            job.add_audit_message("Writing {x} articles".format(x=len(write_batch)))
            models.Article.bulk(documents=write_batch)
        if len(delete_batch) > 0:
            job.add_audit_message("Deleting {x} articles".format(x=len(delete_batch)))
            models.Article.bulk_delete(delete_batch)
            delete_batch.clear()

        if write_changes:
            job.add_audit_message("Done. {0} articles updated, {1} remain unchanged, and {2} deleted.".format(updated_count, same_count, deleted_count))
        else:
            job.add_audit_message("Done. Changes not written to index. {0} articles to be updated, {1} to remain unchanged, and {2} to be deleted. Set 'write' to write changes.".format(updated_count, same_count, deleted_count))

        if len(failed_articles) > 0:
            job.add_audit_message("Failed to create models for {x} articles in the index. Something is quite wrong.".format(x=len(failed_articles)))
            job.add_audit_message("Failed article ids: {x}".format(x=", ".join(failed_articles)))
            job.fail()

    def _get_best_journal(self, journals):
        if len(journals) == 1:
            return list(journals)[0]

        # Preference order when more than one journal matches:
        #   * in_doaj
        #   * most recently updated (manual, then automatic)
        #   * both issns match
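        # Bucket the candidates as {in_doaj / not_in_doaj: {last manual update: {last update: journal}}}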
        result = {"in_doaj": {}, "not_in_doaj": {}}
        for j in journals:
            in_doaj = j.is_in_doaj()
            lmu = j.last_manual_update_timestamp
            lu = j.last_updated_timestamp

            context = None
            if in_doaj:
                context = result["in_doaj"]
            else:
                context = result["not_in_doaj"]

            if lmu is None:
                lmu = datetime.utcfromtimestamp(0)
            if lmu not in context:
                context[lmu] = {}
            context[lmu][lu] = j
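
        # Prefer journals that are in DOAJ, if there are any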
        context = None
        if len(list(result["in_doaj"].keys())) > 0:
            context = result["in_doaj"]
        else:
            context = result["not_in_doaj"]
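
        # Within that set, take the most recent manual update, then the most recent automatic update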
        lmus = list(context.keys())
        lmus.sort()
        context = context[lmus.pop()]

        lus = list(context.keys())
        lus.sort()
        best = context[lus.pop()]
        return best

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: the user requesting the task
        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """

        write = kwargs.get("write", True)
        prepall = kwargs.get("prepall", False)

        if not write and prepall:
            raise BackgroundException("'prepall' must be used with the 'write' parameter set to True (why prep but not save?)")

        params = {}
        cls.set_param(params, "write", write)
        cls.set_param(params, "prepall", prepall)

        # first prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__
        job.params = params
        if prepall:
            job.add_audit_message("'prepall' arg set. 'unchanged' articles will also have their indexes refreshed.")
        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        article_cleanup_sync.schedule(args=(background_job.id,), delay=10)
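

# Scheduled entry point: registered with huey as a periodic task on the long_running queue,
# running on the schedule given by schedule("article_cleanup_sync")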
@long_running.periodic_task(schedule("article_cleanup_sync"))
@write_required(script=True)
def scheduled_article_cleanup_sync():
    user = app.config.get("SYSTEM_USERNAME")
    job = ArticleCleanupSyncBackgroundTask.prepare(user)
    ArticleCleanupSyncBackgroundTask.submit(job)
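

# Queue entry point: pulls the BackgroundJob by id and executes the task via the BackgroundApi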
@long_running.task()
@write_required(script=True)
def article_cleanup_sync(job_id):
    job = models.BackgroundJob.pull(job_id)
    task = ArticleCleanupSyncBackgroundTask(job)
    BackgroundApi.execute(task)
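

# Illustrative sketch only: a manual run would use the same prepare/submit flow as the scheduled
# entry point above; the username below is an assumed placeholder, not a value taken from this module.
#
#     job = ArticleCleanupSyncBackgroundTask.prepare("system", write=True, prepall=False)
#     ArticleCleanupSyncBackgroundTask.submit(job)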