Coverage for portality/bll/services/public_data_dump.py: 84%

223 statements  

coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

from portality.models import DataDump
from portality.core import app
from portality.store import StoreFactory
from portality import models, constants
from portality.lib import dates
from portality.api.current import DiscoveryApi
from portality.bll import exceptions

import os
import tarfile
import json

class PublicDataDumpService:
    ARTICLE = "article"
    JOURNAL = "journal"
    ALL = [ARTICLE, JOURNAL]

    def __init__(self, logger=None):
        self.logger = logger if logger is not None else lambda x: None

    def remove_pdd_container(self, store=None):
        """
        Empty the public data dump container.
        """
        container = app.config.get("STORE_PUBLIC_DATA_DUMP_CONTAINER")

        if store is None:
            store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)

        store.delete_container(container)

    def dump_type(self, type, dump_start_time=None, store=None):
        if dump_start_time is None:
            dump_start_time = dates.now()
        dump_date = dates.format(dump_start_time, dates.FMT_DATE_STD)

        container = app.config.get("STORE_PUBLIC_DATA_DUMP_CONTAINER")
        tmpStore = StoreFactory.tmp()

        out_dir = tmpStore.path(container,
                                "doaj_" + type + "_data_" + dump_date,
                                create_container=True,
                                must_exist=False)

        out_name = os.path.basename(out_dir)
        zipped_name = out_name + ".tar.gz"
        zip_dir = os.path.dirname(out_dir)
        zipped_path = os.path.join(zip_dir, zipped_name)
        tarball = tarfile.open(zipped_path, "w:gz")

        file_num = 1
        out_file, path, filename = self._start_new_file(tmpStore, container, type, dump_date, file_num)

        page_size = app.config.get("DISCOVERY_BULK_PAGE_SIZE", 1000)
        records_per_file = app.config.get('DISCOVERY_RECORDS_PER_FILE', 100000)

        first_in_file = True
        count = 0
        for result in DiscoveryApi.scroll(type, None, None, page_size, scan=True):
            if not first_in_file:
                out_file.write(",\n")
            else:
                first_in_file = False
            out_file.write(json.dumps(result))
            count += 1

            if count >= records_per_file:
                file_num += 1
                self._finish_file(tmpStore, container, filename, path, out_file, tarball)
                out_file, path, filename = self._start_new_file(tmpStore, container, type, dump_date, file_num)
                first_in_file = True
                count = 0

        if count > 0:
            self._finish_file(tmpStore, container, filename, path, out_file, tarball)

        tarball.close()

        if store is None:
            store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)

        # Copy the source directory to main store
        try:
            filesize = self._copy_on_complete(store, tmpStore, container, zipped_path)
        except Exception as e:
            raise exceptions.SaveException("Error copying {0} data on complete {1}\n".format(type, str(e)))
        finally:
            tmpStore.delete_container(container)

        store_url = store.url(container, zipped_name)
        return container, zipped_name, filesize, store_url

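    # Illustrative note (not part of the original source): with the defaults
    # above, an article dump started on 2026-05-04 would write batch files such
    # as doaj_article_data_2026-05-04/article_batch_1.json, one file per
    # DISCOVERY_RECORDS_PER_FILE records (100,000 by default), and pack them
    # into doaj_article_data_2026-05-04.tar.gz before copying that tarball to
    # the public data dump container. The date is only an example.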

    def dump(self, types=None, store=None, prune=True):
        if store is None:
            store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)

        dump_start_time = dates.now()
        if types is None:
            types = self.ALL

        dd = models.DataDump()
        dd.dump_date = dump_start_time

        for typ in types:
            self.logger("Starting export of " + typ)
            result = self.dump_type(typ, dump_start_time, store=store)

            match typ:
                case self.ARTICLE:
                    dd.set_article_dump(*result)
                case self.JOURNAL:
                    dd.set_journal_dump(*result)

        dd.save()

        if prune:
            self.prune(store=store, ignore=[dd.article_filename, dd.journal_filename])

        return dd

    def get_premium_dump(self):
        # Get the latest data dump
        return models.DataDump.find_latest()

    def get_free_dump(self, cutoff=None):
        if cutoff is None:
            cutoff_seconds = app.config.get("NON_PREMIUM_DELAY_SECONDS", 2592000) + 86400

            # if we are in the phase-in period, cap the delay to the phase in date
            if app.config.get("PREMIUM_PHASE_IN", False):
                phase_in_start = app.config.get("PREMIUM_PHASE_IN_START")
                if phase_in_start is not None:
                    max_delay = dates.now() - phase_in_start
                    if max_delay.total_seconds() < cutoff_seconds:
                        cutoff_seconds = max_delay.total_seconds()

            cutoff = dates.before_now(cutoff_seconds)

        # get the first dump after the cutoff
        option = models.DataDump.first_dump_after(cutoff=cutoff)
        if option is not None:
            return option

        # if there was no such dump, just return the latest
        return models.DataDump.find_latest()

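    # Illustrative note (not part of the original source): with the defaults
    # above, the free dump lags behind by NON_PREMIUM_DELAY_SECONDS (2592000 s,
    # 30 days) plus 86400 s (1 day), i.e. a cutoff of 2678400 s, 31 days before
    # now; configuration may override these values.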

    def get_temporary_url(self, data_dump: models.DataDump, type):
        container, filename = None, None
        match type:
            case self.ARTICLE:
                container = data_dump.article_container
                filename = data_dump.article_filename
            case self.JOURNAL:
                container = data_dump.journal_container
                filename = data_dump.journal_filename

        if container is None or filename is None:
            raise exceptions.NoSuchPropertyException("Cannot find container and filename for {type} data dump".format(type=type))

        main_store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)
        store_url = main_store.temporary_url(container, filename,
                                             timeout=app.config.get("PUBLIC_DATA_DUMP_URL_TIMEOUT", 3600))
        return store_url

    def prune(self, store=None, ignore=None):
        if store is None:
            store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)

        if ignore is None:
            ignore = []

        # First we're going to remove all the files for data dump records which are too old to keep
        total = models.DataDump.count()
        old_dds = models.DataDump.all_dumps_before(dates.before_now(app.config.get("NON_PREMIUM_DELAY_SECONDS") + 86400))

        # if removing the old_dds would leave us without any data dump records, then don't do anything
        if total <= len(old_dds):
            self.logger("Not removing any old data dump records, as this would leave us with none")
        else:
            for dd in old_dds:
                ac = dd.article_container
                af = dd.article_filename
                try:
                    store.delete_file(ac, af)
                except:
                    pass

                jc = dd.journal_container
                jf = dd.journal_filename
                try:
                    store.delete_file(jc, jf)
                except:
                    pass

                dd.delete()

        # Second we're going to check the container for files which don't have index records, and
        # clean them up

        # get the files in storage
        container = app.config.get("STORE_PUBLIC_DATA_DUMP_CONTAINER")

        try:
            container_files = store.list(container)
        except:
            container_files = []

        # if the filename doesn't match anything, remove the file
        for cf in container_files:
            if cf in ignore:
                continue
            dd = models.DataDump.find_by_filename(cf)
            if dd is None or len(dd) == 0:
                self.logger("No related index record; Deleting file {x} from storage container {y}".format(x=cf, y=container))
                try:
                    store.delete_file(container, cf)
                except:
                    pass

        # Finally, we check all the records in the index and confirm their files exist, and if not
        # remove the record
        for dd in DataDump.iterate_unstable():
            article_missing = False
            journal_missing = False
            if dd.article_container is not None and dd.article_filename is not None:
                try:
                    container_files = store.list(dd.article_container)
                    if dd.article_filename not in container_files:
                        self.logger("File {x} in container {y} does not exist".format(x=dd.article_filename,
                                                                                      y=dd.article_container))
                        article_missing = True
                except:
                    pass

            if dd.journal_container is not None and dd.journal_filename is not None:
                try:
                    container_files = store.list(dd.journal_container)
                    if dd.journal_filename not in container_files:
                        self.logger("File {x} in container {y} does not exist".format(x=dd.journal_filename, y=dd.journal_container))
                        journal_missing = True
                except:
                    pass

            if article_missing and journal_missing:
                self.logger("Both files missing for {x}".format(x=dd.id))
                dd.delete()

            elif article_missing:
                dd.remove_article_dump()
                dd.save()

            elif journal_missing:
                dd.remove_journal_dump()
                dd.save()

    def delete_pdd(self, id):
        dd = models.DataDump.pull(id)
        if dd is None:
            return False

        store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)

        ac = dd.article_container
        af = dd.article_filename
        try:
            store.delete_file(ac, af)
        except:
            pass

        jc = dd.journal_container
        jf = dd.journal_filename
        try:
            store.delete_file(jc, jf)
        except:
            pass

        dd.delete()
        return True

    def _start_new_file(self, storage, container, typ, day_at_start, file_num):
        filename = self._filename(typ, day_at_start, file_num)
        output_file = storage.path(container, filename, create_container=True, must_exist=False)
        dn = os.path.dirname(output_file)
        if not os.path.exists(dn):
            os.makedirs(dn)
        self.logger("Saving to file {filename}".format(filename=filename))

        out_file = open(output_file, "w", encoding="utf-8")
        out_file.write("[")
        return out_file, output_file, filename

    def _filename(self, typ, day_at_start, file_num):
        return os.path.join("doaj_" + typ + "_data_" + day_at_start, "{typ}_batch_{file_num}.json".format(typ=typ, file_num=file_num))

    def _finish_file(self, storage, container, filename, path, out_file, tarball):
        out_file.write("]")
        out_file.close()

        self.logger("Adding file {filename} to compressed tar".format(filename=filename))
        tarball.add(path, arcname=filename)
        storage.delete_file(container, filename)

    def _copy_on_complete(self, mainStore, tmpStore, container, zipped_path):
        zipped_size = os.path.getsize(zipped_path)
        zipped_name = os.path.basename(zipped_path)
        self.logger("Storing from temporary file {0} ({1} bytes) to container {2}".format(zipped_name, zipped_size, container))
        mainStore.store(container, zipped_name, source_path=zipped_path)
        tmpStore.delete_file(container, zipped_name)
        return zipped_size
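
A minimal usage sketch, assuming an initialised portality application context and a configured public data dump store; the print-based logger is illustrative and the snippet is not part of the measured module:

    from portality.bll.services.public_data_dump import PublicDataDumpService

    # Export both journals and articles, then prune superseded dumps.
    service = PublicDataDumpService(logger=print)
    data_dump = service.dump()

    # The returned DataDump record points at the stored tarballs.
    print(data_dump.article_filename, data_dump.journal_filename)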