Coverage for portality / tasks / datalog_journal_added_update.py: 89%
133 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1"""
Background job that automatically populates a Google sheet that shows accepted journals.

For how to set up a Google Sheets API key for testing, please refer to
`Setup google API key for google sheet` in `how-to-setup.md`
7References
8* [Origin](https://github.com/DOAJ/doajPM/issues/2810)
9* [No longer display Seal](https://github.com/DOAJ/doajPM/issues/3829)
10"""
12from __future__ import annotations
14import datetime
15import itertools
16import logging
17import re
18import time
19from typing import Callable, List, Iterable
21import gspread
23from portality import dao, regex
24from portality.background import BackgroundTask
25from portality.core import app
26from portality.lib import dates, gsheet
27from portality.models import Journal, datalog_journal_added
28from portality.models.datalog_journal_added import DatalogJournalAdded, find_last_datalog, DateAddedDescQuery
29from portality.tasks.helpers import background_helper
30from portality.tasks.redis_huey import scheduled_short_queue as queue
32log = logging.getLogger(__name__)
class NewDatalogJournalQuery:
    """Build the ES query selecting journals created strictly after a given
    date, newest first."""

    def __init__(self, latest_date):
        self.latest_date = latest_date

    def query(self):
        # journals whose created_date is strictly after latest_date
        created_after = {
            'range': {
                "created_date": {"gt": dates.format(self.latest_date)},
            }
        }
        return {
            'query': {
                'bool': {'should': [created_after]},
            },
            'sort': [{'created_date': {'order': 'desc'}}],
        }
def find_new_datalog_journals(fetch_date: datetime.datetime) -> Iterable[DatalogJournalAdded]:
    """Lazily yield a DatalogJournalAdded for each journal created after *fetch_date*."""
    es_query = NewDatalogJournalQuery(fetch_date).query()
    for journal in Journal.iterate(es_query):
        yield to_datalog_journal_added(journal)
def to_datalog_journal_added(journal: Journal) -> DatalogJournalAdded:
    """Convert a Journal into a DatalogJournalAdded record.

    Prefers the e-ISSN, falling back to the p-ISSN. Continuation lookup can
    recurse through linked journals; a cyclic history raises RecursionError,
    which is treated as "no continuations".
    """
    bibjson = journal.bibjson()
    issn = bibjson.eissn or bibjson.pissn
    try:
        # fixed: was `any([a + b])` — a single-element list wrapping the
        # concatenation, i.e. just a truthiness test on the combined list
        has_continuations = bool(journal.get_future_continuations()
                                 + journal.get_past_continuations())
    except RecursionError:
        # bad data can make the continuation chain cyclic; degrade gracefully
        has_continuations = False

    return DatalogJournalAdded(title=bibjson.title, issn=issn,
                               date_added=journal.created_timestamp,
                               has_continuations=has_continuations,
                               journal_id=journal.id)
def get_fetch_datalog_date(n_days=30):
    """Return the datetime from which new journals should be fetched.

    Derived from the most recent datalog entry minus *n_days* of overlap;
    falls back to 2024-02-01 when no datalog entry exists yet.
    """
    last = find_last_datalog()
    if last is None:
        return datetime.datetime(2024, 2, 1)
    # subtract n days to avoid missing records around the previous run
    return dates.parse(last.date_added) - datetime.timedelta(days=n_days)
def find_latest_row_index(records: List[List[str]]):
    """Return the index of the first non-blank row below the
    'Journal Title' header row (i.e. the newest data row).
    """
    rows = iter(records)
    idx = 0
    # phase 1: consume everything up to and including the header row
    for row in rows:
        if row[0] == 'Journal Title':
            break
        idx += 1
    # phase 2: first row below the header with any non-empty cell
    for row in rows:
        idx += 1
        if any(cell.strip() for cell in row):
            return idx
    # sheet exhausted without data: point just past the last row
    return idx + 1
def find_latest_issn_list(rows, n=10) -> list[str]:
    """Collect up to roughly *n* ISSNs from *rows*, scanning top-down.

    Stops after the first row that brings the running total to *n* or more,
    so slightly more than *n* values may be returned.
    """
    found = []
    for row in rows:
        found.extend(cell for cell in row if re.match(regex.ISSN, cell))
        if len(found) >= n:
            break
    return found
def find_new_xlsx_rows(latest_issn_list, page_size=400) -> list:
    """Find new datalog records and convert them to xlsx display format.

    Walks datalog records newest-first and stops at the first record whose
    ISSN is already on the sheet.

    Parameters
    ----------
    latest_issn_list
        ISSNs already present at the top of the sheet; acts as the stop marker.
    page_size
        ES scroll page size.

    Returns
    -------
    list of datalog as xlsx display format
    """
    # set for O(1) membership tests instead of scanning the list per record
    known_issns = set(latest_issn_list)
    records = DatalogJournalAdded.iterate(DateAddedDescQuery().query(), page_size=page_size)
    new_records = itertools.takewhile(lambda r: r.issn not in known_issns, records)
    # the comprehension materialises the rows; no intermediate list needed
    return [to_display_data(r) for r in new_records]
def to_display_data(datalog: DatalogJournalAdded) -> list:
    """Render one datalog record as a sheet row:
    [title, issn, formatted date, continuation flag]."""
    date_str = dates.reformat(datalog.date_added, out_format=DatalogJournalAdded.DATE_FMT)
    continuation_flag = 'Yes' if datalog.has_continuations else ''
    return [datalog.title, datalog.issn, date_str, continuation_flag]
def records_new_journals(filename,
                         worksheet_name,
                         google_key_path,
                         logger_fn: Callable[[str], None] = None
                         ):
    """Sync newly added journals to the datalog index, then insert the new
    rows at the top of the Google sheet.

    Parameters
    ----------
    filename
        Name of the Google spreadsheet.
    worksheet_name
        Name of the worksheet (tab) inside the spreadsheet.
    google_key_path
        Path to the Google service-account key file.
    logger_fn
        Callable for progress messages; defaults to print.
    """
    if logger_fn is None:
        logger_fn = print

    sync_datalog_journal_added(logger_fn)

    # save new records to google sheet
    client = gsheet.load_client(google_key_path)
    try:
        sh = client.open(filename)
    except gspread.exceptions.SpreadsheetNotFound:
        # fixed: the message previously printed a literal placeholder instead
        # of the spreadsheet name, making the error impossible to act on
        logger_fn(f'No spreadsheet named "{filename}" found')
        return
    try:
        worksheet = sh.worksheet(worksheet_name)
    except gspread.exceptions.WorksheetNotFound:
        logger_fn(f'No worksheet named "{worksheet_name}" found')
        return

    org_rows = list(worksheet.get_all_values())
    latest_row_idx = find_latest_row_index(org_rows)
    latest_issn_list = find_latest_issn_list(org_rows[latest_row_idx:])
    logger_fn(f'latest_issn_list: {latest_issn_list}')

    new_xlsx_rows = find_new_xlsx_rows(latest_issn_list)
    # insert just below the header block, pushing existing entries down
    worksheet.insert_rows(new_xlsx_rows, latest_row_idx + 1)
    logger_fn(f'inserted rows to google sheet [{len(new_xlsx_rows)}]')
def sync_datalog_journal_added(logger_fn: Callable[[str], None] = None):
    """Find journals created since the last datalog entry and bulk-save them
    to the datalog index, skipping ISSNs that are already recorded."""
    if logger_fn is None:
        logger_fn = print

    fetch_date = get_fetch_datalog_date()
    logger_fn(f'latest_date of Datalog: {fetch_date}')

    candidates = find_new_datalog_journals(fetch_date)
    fresh = (r for r in candidates
             if r.issn and not datalog_journal_added.is_issn_exists(r.issn, r.date_added))
    to_save = [dao.patch_model_for_bulk(r) for r in fresh]

    if not to_save:
        logger_fn('No new records found')
        return

    # save new records to DB
    DatalogJournalAdded.bulk([r.data for r in to_save])
    logger_fn(f'saved new records to datalog [{len(to_save)}]')
    time.sleep(6)  # wait for bulk save to complete
class DatalogJournalAddedUpdate(BackgroundTask):
    """
    ~~DatalogJournalAddedUpdate:Feature->BackgroundTask:Process~~
    """
    __action__ = "datalog_journal_added_update"

    def run(self):
        # audit messages double as the progress log for this job
        params = self.get_bgjob_params()
        records_new_journals(params.get('filename'),
                             params.get('worksheet_name'),
                             params.get('google_key_path'),
                             logger_fn=self.background_job.add_audit_message,
                             )

    def cleanup(self):
        # nothing to clean up for this task
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        params = {}
        for name, default in (('filename', ''),
                              ('worksheet_name', 'Added'),
                              ('google_key_path', '')):
            cls.set_param(params, name, background_helper.get_value_safe(name, default, kwargs))
        return background_helper.create_job(username=username,
                                            action=cls.__action__,
                                            params=params,
                                            queue_id=huey_helper.queue_id)

    @classmethod
    def submit(cls, background_job):
        background_helper.submit_by_background_job(background_job, datalog_journal_added_update)
# module-level helper that wires this task into the scheduled short queue
huey_helper = DatalogJournalAddedUpdate.create_huey_helper(queue)
@huey_helper.register_schedule
def scheduled_datalog_journal_added_update():
    """Scheduled entry point: run the update with spreadsheet settings from app config."""
    huey_helper.scheduled_common(
        filename=app.config.get("DATALOG_JA_FILENAME"),
        worksheet_name=app.config.get("DATALOG_JA_WORKSHEET_NAME"),
        google_key_path=app.config.get("GOOGLE_KEY_PATH"),
    )
@huey_helper.register_execute(is_load_config=False)
def datalog_journal_added_update(job_id):
    """Huey execute entry point: run the background job identified by *job_id*."""
    huey_helper.execute_common(job_id)
if __name__ == '__main__':
    # manual run for local testing, using the same config-driven settings
    # as the scheduled job
    records_new_journals(
        filename=app.config.get("DATALOG_JA_FILENAME"),
        worksheet_name=app.config.get("DATALOG_JA_WORKSHEET_NAME"),
        google_key_path=app.config.get("GOOGLE_KEY_PATH"),
    )