Coverage for portality/tasks/anon_export.py: 81%

150 statements  

coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

import functools
import gzip
import os
import shutil
import uuid
from typing import Callable
from werkzeug.security import generate_password_hash

from portality import models, dao
from portality.background import BackgroundTask
from portality.core import app, es_connection
from portality.lib import dates
from portality.lib.anon import basic_hash, anon_email
from portality.lib.dataobj import DataStructureException
from portality.store import StoreFactory
from portality.tasks.helpers import background_helper
from portality.tasks.redis_huey import scheduled_long_queue as queue

# module-level state shared across the transform functions below
email_subs = {}
email_counter = 0
password = None


def _anonymise_email(record):
    if record.email not in email_subs:
        global email_counter
        email_counter += 1
        email_subs[record.email] = str(email_counter) + "@example.com"
    record.set_email(email_subs[record.email])
    return record
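# A minimal sketch of the substitution behaviour (hypothetical addresses):
# records sharing an address receive the same numbered substitute, so
# cross-record relationships survive anonymisation.
#
#   _anonymise_email(a1)  # a1.email "editor@example.org" -> "1@example.com"
#   _anonymise_email(a2)  # a2.email "editor@example.org" -> "1@example.com"
#   _anonymise_email(a3)  # a3.email "reader@example.org" -> "2@example.com"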

def _anonymise_admin(record):
    # replace every note with a placeholder, preserving the note ids
    for note in record.notes[:]:
        record.remove_note(note)
        record.add_note("---note removed for data security---", id=note["id"])

    return record


def _reset_api_key(record):
    if record.api_key is not None:
        record.generate_api_key()
    return record


def _reset_password(record):
    # all accounts share a single random password hash, generated once per run
    global password
    if password is None:
        password = generate_password_hash(uuid.uuid4().hex)
    record.set_password_hash(password)
    return record

# transform functions - return the JSON data source since
# esprit doesn't understand our model classes
def anonymise_account(record):
    try:
        a = models.Account(**record)
    except DataStructureException:
        return record

    a = _anonymise_email(a)
    a = _reset_api_key(a)
    a = _reset_password(a)

    return a.data


def anonymise_journal(record):
    try:
        j = models.Journal(**record)
    except DataStructureException:
        return record

    return _anonymise_admin(j).data


def anonymise_application(record):
    try:
        appl = models.Application(**record)
    except DataStructureException:
        return record

    appl = _anonymise_admin(appl)
    return appl.data


def anonymise_background_job(record):
    try:
        bgjob = models.BackgroundJob(**record)
    except DataStructureException:
        return record

    if bgjob.params and 'suggestion_bulk_edit__note' in bgjob.params:
        bgjob.params['suggestion_bulk_edit__note'] = basic_hash(bgjob.params['suggestion_bulk_edit__note'])

    return bgjob.data
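# Behaviour sketch (the record dict is hypothetical): records that fail model
# validation pass through untouched, so malformed documents are still exported
# rather than dropped.
#
#   anonymise_journal({"unexpected": "shape"})  # -> {"unexpected": "shape"}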

anonymisation_procedures = {
    'account': anonymise_account,
    'background_job': anonymise_background_job,
    'journal': anonymise_journal,
    'application': anonymise_application
}

# types that should use prefix queries to optimise performance for bulk exporting
striped = {
    # "application": True,
    "article": True
}

# types to leave out of the export entirely
skip = []
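# Dispatch sketch (the record dict is illustrative): run_anon_export() looks the
# bare type name up in anonymisation_procedures and applies that transform to
# each record of the type; types without an entry are exported unmodified.
#
#   transform = anonymisation_procedures.get("account")
#   safe = transform({"id": "a1", "email": "editor@example.org"})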

def _copy_on_complete(path, logger_fn, tmpStore, mainStore, container):
    # gzip the finished chunk next to the original, then hand it to the main store
    name = os.path.basename(path)
    raw_size = os.path.getsize(path)
    logger_fn("Compressing temporary file {x} (from {y} bytes)".format(x=path, y=raw_size))
    zipped_name = name + ".gz"
    dir = os.path.dirname(path)
    zipped_path = os.path.join(dir, zipped_name)
    with open(path, "rb") as f_in, gzip.open(zipped_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    zipped_size = os.path.getsize(zipped_path)
    logger_fn("Storing from temporary file {x} ({y} bytes)".format(x=zipped_name, y=zipped_size))
    mainStore.store(container, name, source_path=zipped_path)
    tmpStore.delete_file(container, name)
    tmpStore.delete_file(container, zipped_name)
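# Rollover lifecycle sketch (paths are illustrative): for a dump chunk written to
#   <tmp>/<container>/article.bulk.1
# the callback gzips it in place as article.bulk.1.gz, stores the compressed copy
# in the main store under the original chunk name, then deletes both temp files.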

def run_anon_export(tmpStore, mainStore, container, clean=False, limit=None, batch_size=100000,
                    logger_fn: Callable[[str], None] = None):
    if logger_fn is None:
        logger_fn = print
    if clean:
        mainStore.delete_container(container)

    # resolve each concrete index name to its alias, where one exists
    doaj_types = es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + '*')
    true_names = []
    for t, d in doaj_types.items():
        if "aliases" in d:
            aliases = list(d["aliases"].keys())
            if len(aliases) > 0:
                true_names.append(aliases[0])
            else:
                true_names.append(t)
        else:
            true_names.append(t)

    # aliases = es_connection.indices.get_alias(index=app.config['ELASTIC_SEARCH_DB_PREFIX'] + "*")
    # aliased_types = [item for sublist in [list(v.get("aliases").keys()) for k, v in aliases.items()] for item in sublist]
    # prefixed_types = list(set(doaj_types + aliased_types))
    # type_list = [t[len(app.config['ELASTIC_SEARCH_DB_PREFIX']):] for t in doaj_types] + [a[len(app.config['ELASTIC_SEARCH_DB_PREFIX']):] for a in aliases]

    # strip the index prefix to recover the bare type name, e.g. "<prefix>journal" -> "journal"
    type_list = [a[len(app.config['ELASTIC_SEARCH_DB_PREFIX']):] for a in true_names]

    for type_ in type_list:
        if type_ in skip:
            logger_fn("skipping " + type_)
            continue
        model = models.lookup_models_by_type(type_, dao.DomainObject)
        if not model:
            logger_fn("unable to locate model for " + type_)
            continue

        filename = type_ + ".bulk"
        output_file = tmpStore.path(container, filename, create_container=True, must_exist=False)
        logger_fn(dates.now_str() + " " + type_ + " => " + output_file + ".*")
        # iter_q = {"query": {"match_all": {}}, "sort": [{"_id": {"order": "asc"}}]}
        transform = None
        if type_ in anonymisation_procedures:
            transform = anonymisation_procedures[type_]

        # Use the model's dump method to write out this type to file
        out_rollover_fn = functools.partial(_copy_on_complete, logger_fn=logger_fn, tmpStore=tmpStore,
                                            mainStore=mainStore, container=container)

        s = striped.get(type_, False)
        _ = model.dump(limit=limit, transform=transform, out_template=output_file, out_batch_sizes=batch_size,
                       out_rollover_callback=out_rollover_fn, es_bulk_fields=["_id"], striped=s, prefix_size=3,
                       logger=logger_fn)

        logger_fn(dates.now_str() + " done\n")

    tmpStore.delete_container(container)
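# A minimal invocation sketch, assuming locally configured storage backends;
# the container name here is illustrative (the task reads it from config):
#
#   tmp = StoreFactory.tmp()
#   main = StoreFactory.get("anon_data")
#   run_anon_export(tmp, main, "anon-data", clean=True, limit=1000)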

class AnonExportBackgroundTask(BackgroundTask):
    """
    ~~AnonExport:Feature->BackgroundTask:Process~~
    """
    __action__ = "anon_export"

    def run(self):
        kwargs = self.get_bgjob_params()
        kwargs['logger_fn'] = self.background_job.add_audit_message

        tmpStore = StoreFactory.tmp()
        mainStore = StoreFactory.get("anon_data")
        container = app.config.get("STORE_ANON_DATA_CONTAINER")

        run_anon_export(tmpStore, mainStore, container, **kwargs)
        self.background_job.add_audit_message("Anon export completed")

    def cleanup(self):
        # nothing to do - run_anon_export tidies its own temporary container
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        params = {}
        cls.set_param(params, 'clean', background_helper.get_value_safe('clean', False, kwargs))
        cls.set_param(params, "limit", kwargs.get('limit'))
        cls.set_param(params, "batch_size", background_helper.get_value_safe('batch_size', 100000, kwargs))
        return background_helper.create_job(username=username,
                                            action=cls.__action__,
                                            params=params,
                                            queue_id=huey_helper.queue_id)

    @classmethod
    def submit(cls, background_job):
        background_job.save()
        anon_export.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))
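# Typical lifecycle sketch (the username is illustrative): prepare() builds a job
# with validated params, submit() saves it and schedules execution via huey.
#
#   job = AnonExportBackgroundTask.prepare("system", clean=True)
#   AnonExportBackgroundTask.submit(job)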

huey_helper = AnonExportBackgroundTask.create_huey_helper(queue)


@huey_helper.register_schedule
def scheduled_anon_export():
    background_helper.submit_by_bg_task_type(AnonExportBackgroundTask,
                                             clean=app.config.get("TASKS_ANON_EXPORT_CLEAN", False),
                                             limit=app.config.get("TASKS_ANON_EXPORT_LIMIT", None),
                                             batch_size=app.config.get("TASKS_ANON_EXPORT_BATCH_SIZE", 100000))


@huey_helper.register_execute(is_load_config=False)
def anon_export(job_id):
    background_helper.execute_by_job_id(job_id, AnonExportBackgroundTask)