Coverage for portality/tasks/public_data_dump.py: 93% (154 statements)
coverage.py v6.4.2, created at 2022-07-22 15:59 +0100

from portality import models
from portality.core import app
from portality.lib import dates
from portality.models import cache

from portality.tasks.redis_huey import long_running, schedule
from portality.decorators import write_required

from portality.background import BackgroundTask, BackgroundApi, BackgroundException
from portality.store import StoreFactory
from portality.api.current import DiscoveryApi

import os, tarfile, json


class PublicDataDumpBackgroundTask(BackgroundTask):
    """
    This task allows us to generate the public data dumps for the system. It provides a number of
    configuration options, and it is IMPORTANT to note that in production it MUST only be run with the
    following settings:

        types: all
        clean: False
        prune: True

    If you run this in production with either `journal` or `article` as the type,
    then any existing link to the other data type will no longer be available.

    If you run this with clean set to True, there is a chance that in the event of an error the live
    data will be deleted and not replaced with new data. It is better to prune after the
    new data has been generated instead.
    """
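
    # A minimal usage sketch (comments only): with the recommended production settings
    # the task is created and queued via the class methods defined below, which is
    # exactly what the scheduled_public_data_dump() periodic task at the bottom of this
    # module does:
    #
    #     job = PublicDataDumpBackgroundTask.prepare(username, clean=False, prune=True, types="all")
    #     PublicDataDumpBackgroundTask.submit(job)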

    __action__ = "public_data_dump"

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params

        clean = self.get_param(params, 'clean')
        prune = self.get_param(params, 'prune')
        types = self.get_param(params, 'types')

        tmpStore = StoreFactory.tmp()
        mainStore = StoreFactory.get("public_data_dump")
        container = app.config.get("STORE_PUBLIC_DATA_DUMP_CONTAINER")

        if clean:
            mainStore.delete_container(container)
            job.add_audit_message("Deleted existing data dump files")
            job.save()

        # record today's date; it is used in the dump directory and file names
        day_at_start = dates.today()

        # batch settings for the export: scroll page size and records per output file
        page_size = app.config.get("DISCOVERY_BULK_PAGE_SIZE", 1000)
        records_per_file = app.config.get('DISCOVERY_RECORDS_PER_FILE', 100000)

        if types == 'all':
            types = ['article', 'journal']
        else:
            types = [types]

        urls = {"article": None, "journal": None}
        sizes = {"article": None, "journal": None}

        # Scroll for article and/or journal
        for typ in types:
            job.add_audit_message(dates.now() + ": Starting export of " + typ)
            job.save()

            out_dir = tmpStore.path(container, "doaj_" + typ + "_data_" + day_at_start, create_container=True, must_exist=False)
            out_name = os.path.basename(out_dir)
            zipped_name = out_name + ".tar.gz"
            zip_dir = os.path.dirname(out_dir)
            zipped_path = os.path.join(zip_dir, zipped_name)
            tarball = tarfile.open(zipped_path, "w:gz")

            file_num = 1
            out_file, path, filename = self._start_new_file(tmpStore, container, typ, day_at_start, file_num)

            first_in_file = True
            count = 0
            for result in DiscoveryApi.scroll(typ, None, None, page_size, scan=True):
                if not first_in_file:
                    out_file.write(",\n")
                else:
                    first_in_file = False
                out_file.write(json.dumps(result))
                count += 1

                # roll over to a new batch file once the current one is full
                if count >= records_per_file:
                    file_num += 1
                    self._finish_file(tmpStore, container, filename, path, out_file, tarball)
                    job.save()
                    out_file, path, filename = self._start_new_file(tmpStore, container, typ, day_at_start, file_num)
                    first_in_file = True
                    count = 0

            # close off the final, partially filled batch file
            if count > 0:
                self._finish_file(tmpStore, container, filename, path, out_file, tarball)
                job.save()

            tarball.close()

            # Copy the completed tarball to the main store
            try:
                filesize = self._copy_on_complete(mainStore, tmpStore, container, zipped_path)
                job.save()
            except Exception as e:
                tmpStore.delete_container(container)
                raise BackgroundException("Error copying {0} data on complete {1}\n".format(typ, str(e)))

            store_url = mainStore.url(container, zipped_name)
            urls[typ] = store_url
            sizes[typ] = filesize

        if prune:
            self._prune_container(mainStore, container, day_at_start, types)
            job.save()

        self.background_job.add_audit_message("Removing temp store container {x}".format(x=container))
        tmpStore.delete_container(container)

        # finally update the cache with the URLs and sizes of the new dumps
        cache.Cache.cache_public_data_dump(urls["article"], sizes["article"], urls["journal"], sizes["journal"])

        job.add_audit_message(dates.now() + ": done")
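
    # Note on output layout: for each type the main store container ends up holding a
    # single tarball named by _tarball_name() below ("doaj_" + typ + "_data_" + the day
    # the run started + ".tar.gz"), containing the batch files written by
    # _start_new_file()/_finish_file(); the tarball URLs and sizes are what get passed
    # to cache.Cache.cache_public_data_dump() above.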

    def _finish_file(self, storage, container, filename, path, out_file, tarball):
        out_file.write("]")
        out_file.close()

        self.background_job.add_audit_message("Adding file {filename} to compressed tar".format(filename=filename))
        tarball.add(path, arcname=filename)
        storage.delete_file(container, filename)

    def _start_new_file(self, storage, container, typ, day_at_start, file_num):
        filename = self._filename(typ, day_at_start, file_num)
        output_file = storage.path(container, filename, create_container=True, must_exist=False)
        dn = os.path.dirname(output_file)
        if not os.path.exists(dn):
            os.makedirs(dn)
        self.background_job.add_audit_message("Saving to file {filename}".format(filename=filename))

        out_file = open(output_file, "w", encoding="utf-8")
        out_file.write("[")
        return out_file, output_file, filename

    def _filename(self, typ, day_at_start, file_num):
        return os.path.join("doaj_" + typ + "_data_" + day_at_start, "{typ}_batch_{file_num}.json".format(typ=typ, file_num=file_num))
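
    # For example, with typ="article" and file_num=1 this produces a path of the form
    # "doaj_article_data_" + day_at_start + "/article_batch_1.json", where day_at_start
    # is whatever string dates.today() returned at the start of the run.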

    def _tarball_name(self, typ, day_at_start):
        return "doaj_" + typ + "_data_" + day_at_start + ".tar.gz"

    def _copy_on_complete(self, mainStore, tmpStore, container, zipped_path):
        zipped_size = os.path.getsize(zipped_path)
        zipped_name = os.path.basename(zipped_path)
        self.background_job.add_audit_message("Storing from temporary file {0} ({1} bytes) to container {2}".format(zipped_name, zipped_size, container))
        mainStore.store(container, zipped_name, source_path=zipped_path)
        tmpStore.delete_file(container, zipped_name)
        return zipped_size

    def _prune_container(self, mainStore, container, day_at_start, types):
        # Delete all files and dirs in the container that do not contain today's date
        files_for_today = []
        for typ in types:
            files_for_today.append(self._tarball_name(typ, day_at_start))

        # get the files in storage
        container_files = mainStore.list(container)

        # only delete if today's files exist
        found = 0
        for fn in files_for_today:
            if fn in container_files:
                found += 1

        # only proceed if the files for today are present
        if found != len(files_for_today):
            self.background_job.add_audit_message("Files not pruned. One of {0} is missing".format(",".join(files_for_today)))
            return

        # go through the container files and remove any that are not today's files
        for container_file in container_files:
            if container_file not in files_for_today:
                self.background_job.add_audit_message("Pruning old file {x} from storage container {y}".format(x=container_file, y=container))
                mainStore.delete_file(container, container_file)

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """
        params = {}
        cls.set_param(params, 'clean', False if "clean" not in kwargs else kwargs["clean"] if kwargs["clean"] is not None else False)
        cls.set_param(params, "prune", False if "prune" not in kwargs else kwargs["prune"] if kwargs["prune"] is not None else False)
        cls.set_param(params, "types", "all" if "types" not in kwargs else kwargs["types"] if kwargs["types"] in ["all", "journal", "article"] else "all")
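        # i.e. clean and prune default to False when absent or passed as None, and
        # types falls back to "all" unless it is explicitly "journal" or "article"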

        container = app.config.get("STORE_PUBLIC_DATA_DUMP_CONTAINER")
        if container is None:
            raise BackgroundException("You must set STORE_PUBLIC_DATA_DUMP_CONTAINER in the config")

        # first prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__
        job.params = params
        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        public_data_dump.schedule(args=(background_job.id,), delay=10)


@long_running.periodic_task(schedule("public_data_dump"))
@write_required(script=True)
def scheduled_public_data_dump():
    user = app.config.get("SYSTEM_USERNAME")
    job = PublicDataDumpBackgroundTask.prepare(user, clean=False, prune=True, types="all")
    PublicDataDumpBackgroundTask.submit(job)


@long_running.task()
@write_required(script=True)
def public_data_dump(job_id):
    job = models.BackgroundJob.pull(job_id)
    task = PublicDataDumpBackgroundTask(job)
    BackgroundApi.execute(task)
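
Because each batch file inside a dump tarball is a plain JSON array (each file is opened with "[", the records are comma-separated, and the file is closed with "]"), a downloaded dump can be read with the standard library alone. A minimal sketch of a consumer, assuming a hypothetical local copy of one tarball (the real file name depends on the date the dump ran):

import json
import tarfile

# Hypothetical local copy of one of the tarballs produced by the task above
dump_path = "doaj_journal_data_2022-07-22.tar.gz"

with tarfile.open(dump_path, "r:gz") as tarball:
    for member in tarball.getmembers():
        if not member.name.endswith(".json"):
            continue
        # each batch file is a single JSON array of records
        with tarball.extractfile(member) as handle:
            records = json.load(handle)
        print(member.name, len(records), "records")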