Coverage for portality/tasks/anon_export.py: 82%

136 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-20 17:11 +0100

1import functools 

2import gzip 

3import os 

4import shutil 

5import uuid 

6from typing import Callable, NoReturn 

7 

8from portality import models, dao 

9from portality.background import BackgroundTask 

10from portality.core import app, es_connection 

11from portality.decorators import write_required 

12from portality.lib import dates 

13from portality.lib.anon import basic_hash, anon_email 

14from portality.lib.dataobj import DataStructureException 

15from portality.store import StoreFactory 

16from portality.tasks.helpers import background_helper 

17from portality.tasks.redis_huey import schedule, long_running 

18 

19 

def _anonymise_email(record):
    """Swap the record's email address for its anonymised form.

    The record is updated in place through ``set_email`` and then returned,
    so calls can be chained with the other anonymisation helpers.
    """
    anonymised = anon_email(record.email)
    record.set_email(anonymised)
    return record

23 

24 

def _anonymise_admin(record):
    """Replace the text of every note on the record with its hash.

    Each note is removed and re-added with the hashed text under the same
    note id. The record is modified in place and returned.
    """
    # Work from a copy: the record's note list is changed while we loop.
    snapshot = record.notes[:]
    for entry in snapshot:
        record.remove_note(entry)
        record.add_note(basic_hash(entry['note']), id=entry["id"])
    return record

31 

32 

def _reset_api_key(record):
    """Generate a fresh API key for records that already have one.

    Records whose ``api_key`` is ``None`` are returned untouched.
    """
    if record.api_key is None:
        return record
    record.generate_api_key()
    return record

37 

38 

def _reset_password(record):
    """Overwrite the record's password with a random UUID4 hex string.

    The record is modified in place and returned.
    """
    throwaway_password = uuid.uuid4().hex
    record.set_password(throwaway_password)
    return record

42 

43 

# Transform functions - each returns the JSON data source (not the model object),
# since esprit doesn't understand our model classes.

def anonymise_account(record):
    """Anonymise a raw account record and return its data.

    The email is anonymised, the API key (if any) is regenerated and the
    password is reset. If the record cannot be loaded into ``models.Account``
    (``DataStructureException``), it is returned unchanged.
    """
    try:
        account = models.Account(**record)
    except DataStructureException:
        return record

    for step in (_anonymise_email, _reset_api_key, _reset_password):
        account = step(account)
    return account.data

57 

58 

def anonymise_journal(record):
    """Anonymise the notes of a raw journal record and return its data.

    If the record cannot be loaded into ``models.Journal``
    (``DataStructureException``), it is returned unchanged.
    """
    try:
        journal = models.Journal(**record)
    except DataStructureException:
        return record

    anonymised = _anonymise_admin(journal)
    return anonymised.data

66 

67 

def anonymise_suggestion(record):
    """Anonymise the notes of a raw suggestion record and return its data.

    If the record cannot be loaded into ``models.Suggestion``
    (``DataStructureException``), it is returned unchanged.
    """
    try:
        suggestion = models.Suggestion(**record)
    except DataStructureException:
        return record

    return _anonymise_admin(suggestion).data

76 

77 

def anonymise_background_job(record):
    """Hash the bulk-edit note parameter of a raw background job record.

    Only the ``suggestion_bulk_edit__note`` parameter is altered, and only
    when it is present. If the record cannot be loaded into
    ``models.BackgroundJob`` (``DataStructureException``), it is returned
    unchanged.
    """
    try:
        job = models.BackgroundJob(**record)
    except DataStructureException:
        return record

    note_key = 'suggestion_bulk_edit__note'
    if not job.params or note_key not in job.params:
        return job.data

    job.params[note_key] = basic_hash(job.params[note_key])
    return job.data

88 

89 

# Maps a type name to the transform function applied to each of its records
# during export; types not listed here are exported without a transform.
anonymisation_procedures = {
    'account': anonymise_account,
    'background_job': anonymise_background_job,
    'journal': anonymise_journal,
    'suggestion': anonymise_suggestion,
}

96 

97 

def _copy_on_complete(path, logger_fn, tmpStore, mainStore, container):
    """Gzip a finished temporary file, store it, and remove the temporaries.

    :param path: filesystem path of the completed temporary file
    :param logger_fn: callable taking a single message string
    :param tmpStore: store the temporary files are deleted from
    :param mainStore: store that receives the compressed file
    :param container: container name used in both stores
    """
    source_dir, name = os.path.split(path)
    raw_size = os.path.getsize(path)
    logger_fn("Compressing temporary file {x} (from {y} bytes)".format(x=path, y=raw_size))

    # The compressed copy is written next to the original file.
    zipped_name = name + ".gz"
    zipped_path = os.path.join(source_dir, zipped_name)
    with open(path, "rb") as raw_file:
        with gzip.open(zipped_path, "wb") as zipped_file:
            shutil.copyfileobj(raw_file, zipped_file)

    zipped_size = os.path.getsize(zipped_path)
    logger_fn("Storing from temporary file {x} ({y} bytes)".format(x=zipped_name, y=zipped_size))

    # NOTE(review): the gzipped content is stored under the uncompressed
    # name (no ".gz" suffix) - presumably what readers expect; confirm.
    mainStore.store(container, name, source_path=zipped_path)

    for leftover in (name, zipped_name):
        tmpStore.delete_file(container, leftover)

112 

113 

def run_anon_export(tmpStore, mainStore, container, clean=False, limit=None, batch_size=100000,
                    logger_fn: Callable[[str], None] = None):
    """Dump every index type to the main store, anonymising where configured.

    The type names are taken from the Elasticsearch indices whose names start
    with ``ELASTIC_SEARCH_DB_PREFIX``. For each type with a matching model,
    the model's ``dump`` method writes batches into ``tmpStore``; every
    completed batch file is handed to ``_copy_on_complete``. Types listed in
    ``anonymisation_procedures`` are passed through their transform.

    :param tmpStore: store used for the intermediate batch files
    :param mainStore: store that receives the finished files
    :param container: container name used in both stores
    :param clean: when truthy, delete the container in ``mainStore`` first
    :param limit: passed through to ``model.dump`` as ``limit``
    :param batch_size: passed through to ``model.dump`` as ``out_batch_sizes``
    :param logger_fn: callable taking one message string; defaults to ``print``
    """
    if logger_fn is None:
        logger_fn = print
    if clean:
        mainStore.delete_container(container)

    # Look the prefix up once; it is needed both to query and to strip names.
    prefix = app.config['ELASTIC_SEARCH_DB_PREFIX']
    doaj_types = es_connection.indices.get(prefix + '*')
    type_list = [t[len(prefix):] for t in doaj_types]

    # The rollover callback does not depend on the type, so build it once.
    out_rollover_fn = functools.partial(_copy_on_complete, logger_fn=logger_fn, tmpStore=tmpStore,
                                        mainStore=mainStore, container=container)

    for type_ in type_list:
        model = models.lookup_models_by_type(type_, dao.DomainObject)
        if not model:
            logger_fn("unable to locate model for " + type_)
            continue

        filename = type_ + ".bulk"
        output_file = tmpStore.path(container, filename, create_container=True, must_exist=False)
        logger_fn((dates.now() + " " + type_ + " => " + output_file + ".*"))
        iter_q = {"query": {"match_all": {}}, "sort": [{"_id": {"order": "asc"}}]}

        # None means "no transform" for types without an anonymisation procedure.
        transform = anonymisation_procedures.get(type_)

        # Use the model's dump method to write out this type to file
        _ = model.dump(q=iter_q, limit=limit, transform=transform, out_template=output_file,
                       out_batch_sizes=batch_size, out_rollover_callback=out_rollover_fn,
                       es_bulk_fields=["_id"], scroll_keepalive='3m')

        logger_fn((dates.now() + " done\n"))

    tmpStore.delete_container(container)

146 

147 

def get_value_safe(key, default_v, kwargs, default_cond_fn=None):
    """Look up ``key`` in ``kwargs``, falling back to ``default_v``.

    The default is used when the key is missing, and also when
    ``default_cond_fn(value)`` is truthy. Without a ``default_cond_fn`` the
    condition is "value is None".

    :param key: key to read from ``kwargs``
    :param default_v: value returned when the default applies
    :param kwargs: mapping to read from
    :param default_cond_fn: optional predicate deciding when to use the default
    :return: the looked-up value or ``default_v``
    """
    value = kwargs.get(key, default_v)
    if not default_cond_fn:
        use_default = value is None
    else:
        use_default = default_cond_fn(value)
    return default_v if use_default else value

154 

155 

class AnonExportBackgroundTask(BackgroundTask):
    """Background task that runs the anonymised export."""

    __action__ = "anon_export"

    def run(self):
        """Read the job parameters and run the export, logging to the job's audit trail."""
        job = self.background_job

        kwargs = {}
        for param_name in ('clean', 'limit', 'batch_size'):
            kwargs[param_name] = self.get_param(job.params, param_name)
        kwargs['logger_fn'] = job.add_audit_message

        tmpStore = StoreFactory.tmp()
        mainStore = StoreFactory.get("anon_data")
        container = app.config.get("STORE_ANON_DATA_CONTAINER")

        run_anon_export(tmpStore, mainStore, container, **kwargs)
        job.add_audit_message("Anon export completed")

    def cleanup(self):
        """Nothing to clean up for this task."""
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """Create a job for this task from ``clean``, ``limit`` and ``batch_size``.

        ``clean`` falls back to ``False`` and ``batch_size`` to ``100000`` when
        missing or ``None``; ``limit`` is passed through as given.
        """
        values = (
            ('clean', get_value_safe('clean', False, kwargs)),
            ("limit", kwargs.get('limit')),
            ("batch_size", get_value_safe('batch_size', 100000, kwargs)),
        )

        params = {}
        for param_name, param_value in values:
            cls.set_param(params, param_name, param_value)

        return background_helper.create_job(username=username,
                                            action=cls.__action__,
                                            params=params)

    @classmethod
    def submit(cls, background_job):
        """Save the job and schedule ``anon_export`` for it with a delay of 10."""
        background_job.save()
        job_args = (background_job.id,)
        anon_export.schedule(args=job_args, delay=10)

188 

189 

@long_running.periodic_task(schedule(AnonExportBackgroundTask.__action__))
@write_required(script=True)
def scheduled_anon_export():
    """Submit an anon export job using the ``TASKS_ANON_EXPORT_*`` config values."""
    config = app.config
    export_options = {
        "clean": config.get("TASKS_ANON_EXPORT_CLEAN", False),
        "limit": config.get("TASKS_ANON_EXPORT_LIMIT", None),
        "batch_size": config.get("TASKS_ANON_EXPORT_BATCH_SIZE", 100000),
    }
    background_helper.submit_by_bg_task_type(AnonExportBackgroundTask, **export_options)

197 

198 

@long_running.task()
@write_required(script=True)
def anon_export(job_id):
    """Execute the anon export background job identified by ``job_id``."""
    task_class = AnonExportBackgroundTask
    background_helper.execute_by_job_id(job_id, task_class)