Coverage for portality / tasks / datalog_journal_added_update.py: 89%

133 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1""" 

2Background job that automatically populates a Google sheet that shows accepted journals. 

3 

4For details of how to set up a Google API key for testing, refer to 

5`Setup google API key for google sheet` in `how-to-setup.md` 

6 

7References 

8* [Origin](https://github.com/DOAJ/doajPM/issues/2810) 

9* [No longer display Seal](https://github.com/DOAJ/doajPM/issues/3829) 

10""" 

11 

12from __future__ import annotations 

13 

14import datetime 

15import itertools 

16import logging 

17import re 

18import time 

19from typing import Callable, List, Iterable 

20 

21import gspread 

22 

23from portality import dao, regex 

24from portality.background import BackgroundTask 

25from portality.core import app 

26from portality.lib import dates, gsheet 

27from portality.models import Journal, datalog_journal_added 

28from portality.models.datalog_journal_added import DatalogJournalAdded, find_last_datalog, DateAddedDescQuery 

29from portality.tasks.helpers import background_helper 

30from portality.tasks.redis_huey import scheduled_short_queue as queue 

31 

# Module-level logger; task progress is reported via the logger_fn callbacks below.
log = logging.getLogger(__name__)

33 

34 

class NewDatalogJournalQuery:
    """
    Elasticsearch query for journals created strictly after a given date,
    sorted newest first.
    """

    def __init__(self, latest_date):
        # lower bound (exclusive) for journal created_date
        self.latest_date = latest_date

    def query(self):
        """Return the ES query body as a dict."""
        created_after = {
            'range': {
                "created_date": {"gt": dates.format(self.latest_date)},
            }
        }
        newest_first = {'created_date': {'order': 'desc'}}
        return {
            'query': {
                'bool': {
                    'should': [created_after],
                },
            },
            'sort': [newest_first],
        }

54 

55 

def find_new_datalog_journals(fetch_date: datetime.datetime) -> Iterable[DatalogJournalAdded]:
    """
    Lazily yield DatalogJournalAdded records for journals created after
    `fetch_date` (the query is issued immediately; conversion is lazy).
    """
    journals = Journal.iterate(NewDatalogJournalQuery(fetch_date).query())
    return map(to_datalog_journal_added, journals)

59 

60 

def to_datalog_journal_added(journal: Journal) -> DatalogJournalAdded:
    """
    Build a DatalogJournalAdded record from a Journal.

    ISSN preference is eissn first, then pissn. A journal "has continuations"
    when it has any past or future continuations.
    """
    bibjson = journal.bibjson()
    try:
        # non-empty combined continuation list => has_continuations
        continuations = journal.get_future_continuations() + journal.get_past_continuations()
        has_continuations = bool(continuations)
    except RecursionError:
        # continuation chains can recurse too deeply; treat as "no continuations"
        has_continuations = False

    return DatalogJournalAdded(title=bibjson.title,
                               issn=bibjson.eissn or bibjson.pissn,
                               date_added=journal.created_timestamp,
                               has_continuations=has_continuations,
                               journal_id=journal.id)

76 

77 

def get_fetch_datalog_date(n_days=30):
    """
    Return the date from which new journals should be fetched.

    Based on the most recent datalog entry, pushed back `n_days` days so
    records created around the last sync are not missed. Falls back to a
    fixed starting date when no datalog entry exists yet.
    """
    last_record = find_last_datalog()
    if last_record is None:
        # no datalog history yet -- use the project's fixed starting point
        return datetime.datetime(2024, 2, 1)
    # subtract n_days to avoid missing records
    return dates.parse(last_record.date_added) - datetime.timedelta(days=n_days)

87 

88 

def find_latest_row_index(records: List[List[str]]):
    """
    Return the index within `records` of the first non-blank row after the
    'Journal Title' header row.

    Rows before the header are counted; the header row itself is not. If the
    rows run out before a non-blank row is found, the index just past the
    last row is returned.
    """
    rows = iter(records)
    index = 0
    for row in rows:
        if row[0] == 'Journal Title':
            break
        index += 1

    for row in rows:
        index += 1
        if any(cell.strip() for cell in row):
            return index
    # exhausted: count one more "virtual" row, matching the sentinel behaviour
    return index + 1

100 

101 

def find_latest_issn_list(rows, n=10) -> list[str]:
    """
    Collect ISSN-shaped cell values from `rows`, stopping once at least `n`
    have been found (the row being scanned is finished first).

    May return fewer than `n` entries when the rows run out.
    """
    found: list[str] = []
    for row in rows:
        found.extend(cell for cell in row if re.match(regex.ISSN, cell))
        if len(found) >= n:
            break
    return found

113 

114 

def find_new_xlsx_rows(latest_issn_list, page_size=400) -> list:
    """
    Find new datalog records and convert them to xlsx display format.

    Records are read newest-first and collected until an ISSN already present
    in `latest_issn_list` is encountered.

    Parameters
    ----------
    latest_issn_list
        ISSNs already present on the sheet; acts as the stop marker.
    page_size
        ES iteration page size.

    Returns
    -------
    list of datalog rows in xlsx display format
    """
    all_records = DatalogJournalAdded.iterate(DateAddedDescQuery().query(), page_size=page_size)
    unseen = itertools.takewhile(lambda rec: rec.issn not in latest_issn_list, all_records)
    return [to_display_data(rec) for rec in unseen]

136 

137 

def to_display_data(datalog: DatalogJournalAdded) -> list:
    """Render a datalog record as a sheet row: [title, issn, date added, continuations flag]."""
    date_str = dates.reformat(datalog.date_added, out_format=DatalogJournalAdded.DATE_FMT)
    continuation_flag = 'Yes' if datalog.has_continuations else ''
    return [datalog.title, datalog.issn, date_str, continuation_flag]

142 

143 

def records_new_journals(filename,
                         worksheet_name,
                         google_key_path,
                         logger_fn: Callable[[str], None] = None
                         ):
    """
    Sync newly-added journals into the datalog, then append any rows not yet
    on the Google sheet.

    :param filename: name of the Google spreadsheet to open
    :param worksheet_name: name of the worksheet within that spreadsheet
    :param google_key_path: path to the Google API key file
    :param logger_fn: progress-message callback; defaults to ``print``
    """
    if logger_fn is None:
        logger_fn = print

    sync_datalog_journal_added(logger_fn)

    # save new records to google sheet
    client = gsheet.load_client(google_key_path)
    try:
        sh = client.open(filename)
    except gspread.exceptions.SpreadsheetNotFound:
        # fix: interpolate the actual spreadsheet name (was a hard-coded
        # "(unknown)" placeholder in a placeholder-less f-string)
        logger_fn(f'No spreadsheet named "{filename}" found')
        return
    try:
        worksheet = sh.worksheet(worksheet_name)
    except gspread.exceptions.WorksheetNotFound:
        logger_fn(f'No worksheet named "{worksheet_name}" found')
        return

    org_rows = list(worksheet.get_all_values())
    latest_row_idx = find_latest_row_index(org_rows)
    # ISSNs already on the sheet -- used as the stop marker for new rows
    latest_issn_list = find_latest_issn_list(org_rows[latest_row_idx:])
    logger_fn(f'latest_issn_list: {latest_issn_list}')

    new_xlsx_rows = find_new_xlsx_rows(latest_issn_list)
    # insert immediately below the latest known row
    worksheet.insert_rows(new_xlsx_rows, latest_row_idx + 1)
    logger_fn(f'inserted rows to google sheet [{len(new_xlsx_rows)}]')

176 

177 

def sync_datalog_journal_added(logger_fn: Callable[[str], None] = None):
    """
    Fetch journals created since the last datalog entry and bulk-save those
    whose ISSN is not already recorded for that date.
    """
    if logger_fn is None:
        logger_fn = print

    fetch_date = get_fetch_datalog_date()
    logger_fn(f'latest_date of Datalog: {fetch_date}')

    new_datalog_list = []
    for record in find_new_datalog_journals(fetch_date):
        # skip records with no ISSN or that are already in the datalog
        if record.issn and not datalog_journal_added.is_issn_exists(record.issn, record.date_added):
            new_datalog_list.append(dao.patch_model_for_bulk(record))

    if not new_datalog_list:
        logger_fn('No new records found')
        return

    # save new records to DB
    DatalogJournalAdded.bulk([r.data for r in new_datalog_list], )
    logger_fn(f'saved new records to datalog [{len(new_datalog_list)}]')
    time.sleep(6)  # wait for bulk save to complete

195 

196 

class DatalogJournalAddedUpdate(BackgroundTask):
    """
    Background task that updates the journals-added datalog and pushes new
    rows to the configured Google sheet.

    ~~DatalogJournalAddedUpdate:Feature->BackgroundTask:Process~~
    """
    __action__ = "datalog_journal_added_update"

    def run(self):
        params = self.get_bgjob_params()
        # route progress messages into the job's audit trail
        params['logger_fn'] = self.background_job.add_audit_message

        records_new_journals(params.get('filename'),
                             params.get('worksheet_name'),
                             params.get('google_key_path'),
                             logger_fn=params.get('logger_fn')
                             )

    def cleanup(self):
        # nothing to clean up for this task
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        params = {}
        for key, default in (("filename", ''),
                             ("worksheet_name", 'Added'),
                             ("google_key_path", '')):
            cls.set_param(params, key, background_helper.get_value_safe(key, default, kwargs))
        return background_helper.create_job(username=username,
                                            action=cls.__action__,
                                            params=params,
                                            queue_id=huey_helper.queue_id, )

    @classmethod
    def submit(cls, background_job):
        background_helper.submit_by_background_job(background_job, datalog_journal_added_update)

230 

231 

# Huey helper binding this task to its queue; created at module level so the
# @register_schedule / @register_execute decorators below can reference it.
huey_helper = DatalogJournalAddedUpdate.create_huey_helper(queue)

233 

234 

@huey_helper.register_schedule
def scheduled_datalog_journal_added_update():
    """Scheduled entry point: run the task with sheet settings from app config."""
    cfg = app.config
    huey_helper.scheduled_common(
        filename=cfg.get("DATALOG_JA_FILENAME"),
        worksheet_name=cfg.get("DATALOG_JA_WORKSHEET_NAME"),
        google_key_path=cfg.get("GOOGLE_KEY_PATH"),
    )

242 

243 

@huey_helper.register_execute(is_load_config=False)
def datalog_journal_added_update(job_id):
    """Huey execute hook: run the prepared background job identified by `job_id`."""
    huey_helper.execute_common(job_id)

247 

248 

if __name__ == '__main__':
    # manual run, using the same configuration as the scheduled job
    cfg = app.config
    records_new_journals(
        filename=cfg.get("DATALOG_JA_FILENAME"),
        worksheet_name=cfg.get("DATALOG_JA_WORKSHEET_NAME"),
        google_key_path=cfg.get("GOOGLE_KEY_PATH"),
    )