Coverage for portality/bll/services/public_data_dump.py: 84% (223 statements)
coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
from portality.models import DataDump
from portality.core import app
from portality.store import StoreFactory
from portality import models, constants
from portality.lib import dates
from portality.api.current import DiscoveryApi
from portality.bll import exceptions

import os
import tarfile
import json


class PublicDataDumpService:
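    """
    Business logic for generating, publishing and maintaining the public data dump
    files (articles and journals) in the configured storage.
    """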
    ARTICLE = "article"
    JOURNAL = "journal"
    ALL = [ARTICLE, JOURNAL]

    def __init__(self, logger=None):
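        """
        :param logger: optional callable which accepts a single log message; defaults to a no-op
        """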
        self.logger = logger if logger is not None else lambda x: None

    def remove_pdd_container(self, store=None):
        """
        Empty the public data dump container.
        """
        container = app.config.get("STORE_PUBLIC_DATA_DUMP_CONTAINER")

        if store is None:
            store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)

        store.delete_container(container)

    def dump_type(self, type, dump_start_time=None, store=None):
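        """
        Export all records of the given type via the Discovery API, batch them into JSON
        files, compress the batches into a single .tar.gz in temporary storage, then copy
        the archive to the main public data dump store.

        :param type: one of self.ALL ("article" or "journal")
        :param dump_start_time: timestamp used to date-stamp the output; defaults to now
        :param store: storage implementation to copy the archive to; defaults to the public data dump store
        :return: a tuple of (container, zipped_name, filesize, store_url)
        """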
        if dump_start_time is None:
            dump_start_time = dates.now()
        dump_date = dates.format(dump_start_time, dates.FMT_DATE_STD)

        container = app.config.get("STORE_PUBLIC_DATA_DUMP_CONTAINER")
        tmpStore = StoreFactory.tmp()

        out_dir = tmpStore.path(container,
                                "doaj_" + type + "_data_" + dump_date,
                                create_container=True,
                                must_exist=False)

        out_name = os.path.basename(out_dir)
        zipped_name = out_name + ".tar.gz"
        zip_dir = os.path.dirname(out_dir)
        zipped_path = os.path.join(zip_dir, zipped_name)
        tarball = tarfile.open(zipped_path, "w:gz")

        file_num = 1
        out_file, path, filename = self._start_new_file(tmpStore, container, type, dump_date, file_num)

        page_size = app.config.get("DISCOVERY_BULK_PAGE_SIZE", 1000)
        records_per_file = app.config.get('DISCOVERY_RECORDS_PER_FILE', 100000)

        first_in_file = True
        count = 0
        for result in DiscoveryApi.scroll(type, None, None, page_size, scan=True):
            if not first_in_file:
                out_file.write(",\n")
            else:
                first_in_file = False
            out_file.write(json.dumps(result))
            count += 1

            if count >= records_per_file:
                file_num += 1
                self._finish_file(tmpStore, container, filename, path, out_file, tarball)
                out_file, path, filename = self._start_new_file(tmpStore, container, type, dump_date, file_num)
                first_in_file = True
                count = 0

        if count > 0:
            self._finish_file(tmpStore, container, filename, path, out_file, tarball)

        tarball.close()

        if store is None:
            store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)

        # Copy the source directory to main store
        try:
            filesize = self._copy_on_complete(store, tmpStore, container, zipped_path)
        except Exception as e:
            raise exceptions.SaveException("Error copying {0} data on complete {1}\n".format(type, str(e)))
        finally:
            tmpStore.delete_container(container)

        store_url = store.url(container, zipped_name)
        return container, zipped_name, filesize, store_url

    def dump(self, types=None, store=None, prune=True):
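        """
        Run a full public data dump for the requested types, record the results in a
        DataDump index record, and optionally prune old dumps afterwards.

        :param types: list of types to dump; defaults to self.ALL
        :param store: storage implementation to write to; defaults to the public data dump store
        :param prune: whether to clean up old dump files and stale records after the dump
        :return: the saved DataDump record
        """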
        if store is None:
            store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)

        dump_start_time = dates.now()
        if types is None:
            types = self.ALL

        dd = models.DataDump()
        dd.dump_date = dump_start_time

        for typ in types:
            self.logger("Starting export of " + typ)
            result = self.dump_type(typ, dump_start_time, store=store)

            match typ:
                case self.ARTICLE:
                    dd.set_article_dump(*result)
                case self.JOURNAL:
                    dd.set_journal_dump(*result)

        dd.save()

        if prune:
            self.prune(store=store, ignore=[dd.article_filename, dd.journal_filename])

        return dd

    def get_premium_dump(self):
        # Get the latest data dump
        return models.DataDump.find_latest()

    def get_free_dump(self, cutoff=None):
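        """
        Get the dump available to non-premium users: the first dump created after the
        non-premium delay cutoff (capped to the premium phase-in start date if that is
        configured), falling back to the latest dump if none matches.
        """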
        if cutoff is None:
            cutoff_seconds = app.config.get("NON_PREMIUM_DELAY_SECONDS", 2592000) + 86400

            # if we are in the phase-in period, cap the delay to the phase in date
            if app.config.get("PREMIUM_PHASE_IN", False):
                phase_in_start = app.config.get("PREMIUM_PHASE_IN_START")
                if phase_in_start is not None:
                    max_delay = dates.now() - phase_in_start
                    if max_delay.total_seconds() < cutoff_seconds:
                        cutoff_seconds = max_delay.total_seconds()

            cutoff = dates.before_now(cutoff_seconds)

        # get the first dump after the cutoff
        option = models.DataDump.first_dump_after(cutoff=cutoff)
        if option is not None:
            return option

        # if there was no such dump, just return the latest
        return models.DataDump.find_latest()

    def get_temporary_url(self, data_dump: models.DataDump, type):
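        """
        Generate a time-limited download URL for the article or journal file of the given
        data dump. Raises NoSuchPropertyException if the dump has no file of that type.
        """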
        container, filename = None, None
        match type:
            case self.ARTICLE:
                container = data_dump.article_container
                filename = data_dump.article_filename
            case self.JOURNAL:
                container = data_dump.journal_container
                filename = data_dump.journal_filename

        if container is None or filename is None:
            raise exceptions.NoSuchPropertyException("Cannot find container and filename for {type} data dump".format(type=type))

        main_store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)
        store_url = main_store.temporary_url(container, filename,
                                             timeout=app.config.get("PUBLIC_DATA_DUMP_URL_TIMEOUT", 3600))
        return store_url

    def prune(self, store=None, ignore=None):
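        """
        Clean up the public data dump storage and index:
        1. delete files and records for dumps older than the non-premium retention window
           (unless that would leave no data dump records at all)
        2. delete files in the container which have no matching DataDump record
        3. delete or trim DataDump records whose files are missing from storage

        :param store: storage implementation to operate on; defaults to the public data dump store
        :param ignore: filenames to leave untouched (e.g. the files from the dump that has just been written)
        """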
        if store is None:
            store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)

        if ignore is None:
            ignore = []

        # First we're going to remove all the files for data dump records which are too old to keep
        total = models.DataDump.count()
        old_dds = models.DataDump.all_dumps_before(dates.before_now(app.config.get("NON_PREMIUM_DELAY_SECONDS") + 86400))

        # if removing the old_dds would leave us without any data dump records, then don't do anything
        if total <= len(old_dds):
            self.logger("Not removing any old data dump records, as this would leave us with none")
        else:
            for dd in old_dds:
                ac = dd.article_container
                af = dd.article_filename
                try:
                    store.delete_file(ac, af)
                except:
                    pass

                jc = dd.journal_container
                jf = dd.journal_filename
                try:
                    store.delete_file(jc, jf)
                except:
                    pass

                dd.delete()

        # Second we're going to check the container for files which don't have index records, and
        # clean them up

        # get the files in storage
        container = app.config.get("STORE_PUBLIC_DATA_DUMP_CONTAINER")

        try:
            container_files = store.list(container)
        except:
            container_files = []

        # if the filename doesn't match anything, remove the file
        for cf in container_files:
            if cf in ignore:
                continue
            dd = models.DataDump.find_by_filename(cf)
            if dd is None or len(dd) == 0:
                self.logger("No related index record; Deleting file {x} from storage container {y}".format(x=cf, y=container))
                try:
                    store.delete_file(container, cf)
                except:
                    pass

        # Finally, we check all the records in the index and confirm their files exist, and if not
        # remove the record
        for dd in DataDump.iterate_unstable():
            article_missing = False
            journal_missing = False
            if dd.article_container is not None and dd.article_filename is not None:
                try:
                    container_files = store.list(dd.article_container)
                    if dd.article_filename not in container_files:
                        self.logger("File {x} in container {y} does not exist".format(x=dd.article_filename,
                                                                                      y=dd.article_container))
                        article_missing = True
                except:
                    pass

            if dd.journal_container is not None and dd.journal_filename is not None:
                try:
                    container_files = store.list(dd.journal_container)
                    if dd.journal_filename not in container_files:
                        self.logger("File {x} in container {y} does not exist".format(x=dd.journal_filename, y=dd.journal_container))
                        journal_missing = True
                except:
                    pass

            if article_missing and journal_missing:
                self.logger("Both files missing for {x}".format(x=dd.id))
                dd.delete()

            elif article_missing:
                dd.remove_article_dump()
                dd.save()

            elif journal_missing:
                dd.remove_journal_dump()
                dd.save()

    def delete_pdd(self, id):
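        """
        Delete the DataDump record with the given id along with its article and journal
        files in storage.

        :return: True if the record was found and deleted, False if no such record exists
        """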
        dd = models.DataDump.pull(id)
        if dd is None:
            return False

        store = StoreFactory.get(constants.STORE__SCOPE__PUBLIC_DATA_DUMP)

        ac = dd.article_container
        af = dd.article_filename
        try:
            store.delete_file(ac, af)
        except:
            pass

        jc = dd.journal_container
        jf = dd.journal_filename
        try:
            store.delete_file(jc, jf)
        except:
            pass

        dd.delete()
        return True

    def _start_new_file(self, storage, container, typ, day_at_start, file_num):
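        """
        Open a new batch file in temporary storage and write the opening JSON array bracket.

        :return: a tuple of (open file handle, full path on disk, relative filename)
        """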
        filename = self._filename(typ, day_at_start, file_num)
        output_file = storage.path(container, filename, create_container=True, must_exist=False)
        dn = os.path.dirname(output_file)
        if not os.path.exists(dn):
            os.makedirs(dn)
        self.logger("Saving to file {filename}".format(filename=filename))

        out_file = open(output_file, "w", encoding="utf-8")
        out_file.write("[")
        return out_file, output_file, filename

    def _filename(self, typ, day_at_start, file_num):
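        """Construct the relative path for a batch file, e.g. doaj_article_data_<date>/article_batch_1.json"""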
        return os.path.join("doaj_" + typ + "_data_" + day_at_start, "{typ}_batch_{file_num}.json".format(typ=typ, file_num=file_num))

    def _finish_file(self, storage, container, filename, path, out_file, tarball):
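        """Close the JSON array, add the completed batch file to the tarball, and remove it from temporary storage."""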
        out_file.write("]")
        out_file.close()

        self.logger("Adding file {filename} to compressed tar".format(filename=filename))
        tarball.add(path, arcname=filename)
        storage.delete_file(container, filename)

    def _copy_on_complete(self, mainStore, tmpStore, container, zipped_path):
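        """
        Copy the finished .tar.gz from temporary storage to the main store and delete the
        temporary copy.

        :return: the size of the archive in bytes
        """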
        zipped_size = os.path.getsize(zipped_path)
        zipped_name = os.path.basename(zipped_path)
        self.logger("Storing from temporary file {0} ({1} bytes) to container {2}".format(zipped_name, zipped_size, container))
        mainStore.store(container, zipped_name, source_path=zipped_path)
        tmpStore.delete_file(container, zipped_name)
        return zipped_size
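
# Example usage (a minimal sketch, not part of the service itself; it assumes the public data
# dump storage scope and the Discovery API are configured and reachable in this environment):
#
#   svc = PublicDataDumpService(logger=print)
#   dd = svc.dump(types=[PublicDataDumpService.ARTICLE], prune=False)
#   url = svc.get_temporary_url(dd, PublicDataDumpService.ARTICLE)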