Coverage for portality/tasks/public_data_dump.py: 93%

154 statements  

coverage.py v6.4.2, created at 2022-07-19 18:38 +0100

from portality import models
from portality.core import app
from portality.lib import dates
from portality.models import cache

from portality.tasks.redis_huey import long_running, schedule
from portality.decorators import write_required

from portality.background import BackgroundTask, BackgroundApi, BackgroundException
from portality.store import StoreFactory
from portality.api.current import DiscoveryApi

import os, tarfile, json


class PublicDataDumpBackgroundTask(BackgroundTask):
    """
    This task allows us to generate the public data dumps for the system. It provides a number of
    configuration options, and it is IMPORTANT to note that in production it MUST only be run with the
    following settings:

        types: all
        clean: False
        prune: True

    If you run this in production with either `journal` or `article` as the type, then any existing
    link to the other data type will no longer be available.

    If you run this with clean set to True, there is a chance that, in the event of an error, the live
    data will be deleted and not replaced with new data. It is better to prune after the new data has
    been generated instead.
    """

    __action__ = "public_data_dump"

    def run(self):
        """
        Execute the task as specified by the background_job
        :return:
        """
        job = self.background_job
        params = job.params

        clean = self.get_param(params, 'clean')
        prune = self.get_param(params, 'prune')
        types = self.get_param(params, 'types')

        tmpStore = StoreFactory.tmp()
        mainStore = StoreFactory.get("public_data_dump")
        container = app.config.get("STORE_PUBLIC_DATA_DUMP_CONTAINER")

        if clean:
            mainStore.delete_container(container)
            job.add_audit_message("Deleted existing data dump files")
            job.save()

        # create dir with today's date
        day_at_start = dates.today()

        # Do the search and save it
        page_size = app.config.get("DISCOVERY_BULK_PAGE_SIZE", 1000)
        records_per_file = app.config.get('DISCOVERY_RECORDS_PER_FILE', 100000)

        if types == 'all':
            types = ['article', 'journal']
        else:
            types = [types]

        urls = {"article": None, "journal": None}
        sizes = {"article": None, "journal": None}

        # Scroll for article and/or journal
        for typ in types:
            job.add_audit_message(dates.now() + ": Starting export of " + typ)
            job.save()

            out_dir = tmpStore.path(container, "doaj_" + typ + "_data_" + day_at_start, create_container=True, must_exist=False)
            out_name = os.path.basename(out_dir)
            zipped_name = out_name + ".tar.gz"
            zip_dir = os.path.dirname(out_dir)
            zipped_path = os.path.join(zip_dir, zipped_name)
            tarball = tarfile.open(zipped_path, "w:gz")

            file_num = 1
            out_file, path, filename = self._start_new_file(tmpStore, container, typ, day_at_start, file_num)

            first_in_file = True
            count = 0
            for result in DiscoveryApi.scroll(typ, None, None, page_size, scan=True):
                if not first_in_file:
                    out_file.write(",\n")
                else:
                    first_in_file = False
                out_file.write(json.dumps(result))
                count += 1

                if count >= records_per_file:
                    file_num += 1
                    self._finish_file(tmpStore, container, filename, path, out_file, tarball)
                    job.save()
                    out_file, path, filename = self._start_new_file(tmpStore, container, typ, day_at_start, file_num)
                    first_in_file = True
                    count = 0

            if count > 0:
                self._finish_file(tmpStore, container, filename, path, out_file, tarball)
                job.save()

            tarball.close()

            # Copy the source directory to main store
            try:
                filesize = self._copy_on_complete(mainStore, tmpStore, container, zipped_path)
                job.save()
            except Exception as e:
                tmpStore.delete_container(container)
                raise BackgroundException("Error copying {0} data on complete {1}\n".format(typ, str(e)))

            store_url = mainStore.url(container, zipped_name)
            urls[typ] = store_url
            sizes[typ] = filesize

        if prune:
            self._prune_container(mainStore, container, day_at_start, types)
            job.save()

        self.background_job.add_audit_message("Removing temp store container {x}".format(x=container))
        tmpStore.delete_container(container)

        # finally update the cache
        cache.Cache.cache_public_data_dump(urls["article"], sizes["article"], urls["journal"], sizes["journal"])

        job.add_audit_message(dates.now() + ": done")

    def _finish_file(self, storage, container, filename, path, out_file, tarball):
        out_file.write("]")
        out_file.close()

        self.background_job.add_audit_message("Adding file {filename} to compressed tar".format(filename=filename))
        tarball.add(path, arcname=filename)
        storage.delete_file(container, filename)

    def _start_new_file(self, storage, container, typ, day_at_start, file_num):
        filename = self._filename(typ, day_at_start, file_num)
        output_file = storage.path(container, filename, create_container=True, must_exist=False)
        dn = os.path.dirname(output_file)
        if not os.path.exists(dn):
            os.makedirs(dn)
        self.background_job.add_audit_message("Saving to file {filename}".format(filename=filename))

        out_file = open(output_file, "w", encoding="utf-8")
        out_file.write("[")
        return out_file, output_file, filename

    def _filename(self, typ, day_at_start, file_num):
        return os.path.join("doaj_" + typ + "_data_" + day_at_start, "{typ}_batch_{file_num}.json".format(typ=typ, file_num=file_num))

    def _tarball_name(self, typ, day_at_start):
        return "doaj_" + typ + "_data_" + day_at_start + ".tar.gz"
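    # Illustrative note, not part of the original source: assuming dates.today() returns a
    # date string such as "2022-07-19", _filename("article", "2022-07-19", 1) produces
    # "doaj_article_data_2022-07-19/article_batch_1.json", and _tarball_name("article", "2022-07-19")
    # produces "doaj_article_data_2022-07-19.tar.gz".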

    def _copy_on_complete(self, mainStore, tmpStore, container, zipped_path):
        zipped_size = os.path.getsize(zipped_path)
        zipped_name = os.path.basename(zipped_path)
        self.background_job.add_audit_message("Storing from temporary file {0} ({1} bytes) to container {2}".format(zipped_name, zipped_size, container))
        mainStore.store(container, zipped_name, source_path=zipped_path)
        tmpStore.delete_file(container, zipped_name)
        return zipped_size

    def _prune_container(self, mainStore, container, day_at_start, types):
        # Delete all files and dirs in the container that do not contain today's date
        files_for_today = []
        for typ in types:
            files_for_today.append(self._tarball_name(typ, day_at_start))

        # get the files in storage
        container_files = mainStore.list(container)

        # only delete if today's files exist
        found = 0
        for fn in files_for_today:
            if fn in container_files:
                found += 1

        # only proceed if the files for today are present
        if found != len(files_for_today):
            self.background_job.add_audit_message("Files not pruned. One of {0} is missing".format(",".join(files_for_today)))
            return

        # go through the container files and remove any that are not today's files
        for container_file in container_files:
            if container_file not in files_for_today:
                self.background_job.add_audit_message("Pruning old file {x} from storage container {y}".format(x=container_file, y=container))
                mainStore.delete_file(container, container_file)

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param kwargs: arbitrary keyword arguments pertaining to this task type
        :return: a BackgroundJob instance representing this task
        """
        params = {}
        cls.set_param(params, 'clean', False if "clean" not in kwargs else kwargs["clean"] if kwargs["clean"] is not None else False)
        cls.set_param(params, "prune", False if "prune" not in kwargs else kwargs["prune"] if kwargs["prune"] is not None else False)
        cls.set_param(params, "types", "all" if "types" not in kwargs else kwargs["types"] if kwargs["types"] in ["all", "journal", "article"] else "all")

        container = app.config.get("STORE_PUBLIC_DATA_DUMP_CONTAINER")
        if container is None:
            raise BackgroundException("You must set STORE_PUBLIC_DATA_DUMP_CONTAINER in the config")

        # first prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__
        job.params = params
        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        public_data_dump.schedule(args=(background_job.id,), delay=10)


@long_running.periodic_task(schedule("public_data_dump"))
@write_required(script=True)
def scheduled_public_data_dump():
    user = app.config.get("SYSTEM_USERNAME")
    job = PublicDataDumpBackgroundTask.prepare(user, clean=False, prune=True, types="all")
    PublicDataDumpBackgroundTask.submit(job)


@long_running.task()
@write_required(script=True)
def public_data_dump(job_id):
    job = models.BackgroundJob.pull(job_id)
    task = PublicDataDumpBackgroundTask(job)
    BackgroundApi.execute(task)
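
For reference, a dump job can also be queued by hand rather than waiting for the periodic schedule. The sketch below is a minimal, illustrative example only (it assumes it runs inside the application context with the task queue available); it drives the prepare() and submit() hooks shown above with the production-safe settings that the class docstring requires, exactly as scheduled_public_data_dump() does.

    from portality.core import app
    from portality.tasks.public_data_dump import PublicDataDumpBackgroundTask

    # Queue a full public data dump using the settings the docstring marks as safe for production
    user = app.config.get("SYSTEM_USERNAME")
    job = PublicDataDumpBackgroundTask.prepare(user, clean=False, prune=True, types="all")
    PublicDataDumpBackgroundTask.submit(job)  # saves the job and schedules public_data_dump(job.id)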