Coverage for portality/tasks/anon_export.py: 78% (150 statements)
import functools
import gzip
import os
import shutil
import uuid
from typing import Callable
from werkzeug.security import generate_password_hash

from portality import models, dao
from portality.background import BackgroundTask
from portality.core import app, es_connection
from portality.lib import dates
from portality.lib.anon import basic_hash, anon_email
from portality.lib.dataobj import DataStructureException
from portality.store import StoreFactory
from portality.tasks.helpers import background_helper
from portality.tasks.redis_huey import scheduled_long_queue as queue

# Module-level state shared by the transform functions: stable per-run email substitutions
# and a single replacement password hash applied to every account
email_subs = {}
email_counter = 0
password = None


def _anonymise_email(record):
    """ Replace the account's email with a stable '<n>@example.com' substitute """
    if record.email not in email_subs:
        global email_counter
        email_counter += 1
        email_subs[record.email] = str(email_counter) + "@example.com"
    record.set_email(email_subs[record.email])
    return record


def _anonymise_admin(record):
    """ Strip the free-text notes from a journal or application, keeping the note ids """
    for note in record.notes[:]:
        record.remove_note(note)
        record.add_note("---note removed for data security---", id=note["id"])

    return record


def _reset_api_key(record):
    """ Generate a fresh API key for any account that has one """
    if record.api_key is not None:
        record.generate_api_key()
    return record


def _reset_password(record):
    """ Overwrite the account password with a single random hash shared across the export """
    global password
    if password is None:
        password = generate_password_hash(uuid.uuid4().hex)
    record.set_password_hash(password)
    return record


# Transform functions - these return the JSON data source, since esprit doesn't understand
# our model classes. Each takes a raw ES source dict and returns anonymised JSON, falling
# back to the unmodified record if it doesn't fit the model (see the usage sketch after the
# registry below).
def anonymise_account(record):
    try:
        a = models.Account(**record)
    except DataStructureException:
        return record

    a = _anonymise_email(a)
    a = _reset_api_key(a)
    a = _reset_password(a)

    return a.data


def anonymise_journal(record):
    try:
        j = models.Journal(**record)
    except DataStructureException:
        return record

    return _anonymise_admin(j).data


def anonymise_application(record):
    try:
        appl = models.Application(**record)
    except DataStructureException:
        return record

    appl = _anonymise_admin(appl)
    return appl.data


def anonymise_background_job(record):
    try:
        bgjob = models.BackgroundJob(**record)
    except DataStructureException:
        return record

    if bgjob.params and 'suggestion_bulk_edit__note' in bgjob.params:
        bgjob.params['suggestion_bulk_edit__note'] = basic_hash(bgjob.params['suggestion_bulk_edit__note'])

    return bgjob.data


anonymisation_procedures = {
    'account': anonymise_account,
    'background_job': anonymise_background_job,
    'journal': anonymise_journal,
    'application': anonymise_application
}
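

# A quick, self-contained illustration of the transform contract above (not part of the
# production flow): each procedure takes the raw ES source dict and returns JSON-ready data.
# The minimal account record below is an illustrative assumption, not a real DOAJ record.
def _example_transform_usage():   # hypothetical helper, safe to delete
    raw = {"id": "abcdef", "email": "editor@example.org", "api_key": "secret"}
    cleaned = anonymise_account(raw)
    # cleaned["email"] is now "<counter>@example.com", the api_key has been regenerated
    # and the password hash replaced with the shared per-run value
    return cleaned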


# Types that should use prefix queries to optimise performance for bulk exporting of very
# large indexes (these are dumped with striped=True, prefix_size=3 in run_anon_export below)
striped = {
    # "application": True,
    "article": True
}

# Types to exclude from the export entirely
skip = []


def _copy_on_complete(path, logger_fn, tmpStore, mainStore, container):
    """ Rollover callback for model.dump(): gzip the completed chunk, move it to the main
    anon-data store, and clean up the temporary copies """
    name = os.path.basename(path)
    raw_size = os.path.getsize(path)
    logger_fn("Compressing temporary file {x} (from {y} bytes)".format(x=path, y=raw_size))
    zipped_name = name + ".gz"
    dir = os.path.dirname(path)
    zipped_path = os.path.join(dir, zipped_name)
    with open(path, "rb") as f_in, gzip.open(zipped_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    zipped_size = os.path.getsize(zipped_path)
    logger_fn("Storing from temporary file {x} ({y} bytes)".format(x=zipped_name, y=zipped_size))
    mainStore.store(container, zipped_name, source_path=zipped_path)
    tmpStore.delete_file(container, name)
    tmpStore.delete_file(container, zipped_name)


def run_anon_export(tmpStore, mainStore, container, clean=False, limit=None, batch_size=100000,
                    logger_fn: Callable[[str], None] = None):
    """ Export each index type to the anon-data store, applying the anonymisation transform
    where one is registered (see the manual invocation sketch after this function) """
    if logger_fn is None:
        logger_fn = print
    if clean:
        mainStore.delete_container(container)

    doaj_types = es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + '*')
    true_names = []
    for t, d in doaj_types.items():
        if "aliases" in d:
            aliases = list(d["aliases"].keys())
            if len(aliases) > 0:
                true_names.append(aliases[0])
            else:
                true_names.append(t)
        else:
            true_names.append(t)

    # aliases = es_connection.indices.get_alias(index=app.config['ELASTIC_SEARCH_DB_PREFIX'] + "*")
    # aliased_types = [item for sublist in [list(v.get("aliases").keys()) for k, v in aliases.items()] for item in sublist]
    # prefixed_types = list(set(doaj_types + aliased_types))
    # type_list = [t[len(app.config['ELASTIC_SEARCH_DB_PREFIX']):] for t in doaj_types] + [a[len(app.config['ELASTIC_SEARCH_DB_PREFIX']):] for a in aliases]
    type_list = [a[len(app.config['ELASTIC_SEARCH_DB_PREFIX']):] for a in true_names]

    for type_ in type_list:
        if type_ in skip:
            logger_fn("skipping " + type_)
            continue
        model = models.lookup_models_by_type(type_, dao.DomainObject)
        if not model:
            logger_fn("unable to locate model for " + type_)
            continue

        filename = type_ + ".bulk"
        output_file = tmpStore.path(container, filename, create_container=True, must_exist=False)
        logger_fn(dates.now_str() + " " + type_ + " => " + output_file + ".*")
        # iter_q = {"query": {"match_all": {}}, "sort": [{"_id": {"order": "asc"}}]}
        transform = None
        if type_ in anonymisation_procedures:
            transform = anonymisation_procedures[type_]

        # Use the model's dump method to write out this type to file
        out_rollover_fn = functools.partial(_copy_on_complete, logger_fn=logger_fn, tmpStore=tmpStore,
                                            mainStore=mainStore, container=container)

        s = striped.get(type_, False)
        _ = model.dump(limit=limit, transform=transform, out_template=output_file, out_batch_sizes=batch_size,
                       out_rollover_callback=out_rollover_fn, es_bulk_fields=["_id"], striped=s, prefix_size=3, logger=logger_fn)

        logger_fn(dates.now_str() + " done\n")

    tmpStore.delete_container(container)
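

# A minimal sketch of running the export by hand (e.g. from a one-off script), mirroring what
# AnonExportBackgroundTask.run() does below. The small limit and batch_size values are
# illustrative assumptions for a local test run, not production settings.
def _example_manual_anon_export():   # hypothetical helper, safe to delete
    tmp_store = StoreFactory.tmp()
    main_store = StoreFactory.get("anon_data")
    container = app.config.get("STORE_ANON_DATA_CONTAINER")
    run_anon_export(tmp_store, main_store, container, clean=False, limit=1000, batch_size=10000,
                    logger_fn=print)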


class AnonExportBackgroundTask(BackgroundTask):
    """
    ~~AnonExport:Feature->BackgroundTask:Process~~
    """
    __action__ = "anon_export"

    def run(self):
        kwargs = self.get_bgjob_params()
        kwargs['logger_fn'] = self.background_job.add_audit_message

        tmpStore = StoreFactory.tmp()
        mainStore = StoreFactory.get("anon_data")
        container = app.config.get("STORE_ANON_DATA_CONTAINER")

        run_anon_export(tmpStore, mainStore, container, **kwargs)
        self.background_job.add_audit_message("Anon export completed")

    def cleanup(self):
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        params = {}
        cls.set_param(params, 'clean', background_helper.get_value_safe('clean', False, kwargs))
        cls.set_param(params, "limit", kwargs.get('limit'))
        cls.set_param(params, "batch_size", background_helper.get_value_safe('batch_size', 100000, kwargs))
        return background_helper.create_job(username=username,
                                            action=cls.__action__,
                                            params=params,
                                            queue_id=huey_helper.queue_id, )

    @classmethod
    def submit(cls, background_job):
        background_job.save()
        anon_export.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))
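

# A minimal sketch of queueing the task programmatically via the standard DOAJ BackgroundTask
# pattern of prepare() then submit(); the username here is an illustrative assumption.
def _example_enqueue_anon_export():   # hypothetical helper, safe to delete
    job = AnonExportBackgroundTask.prepare("system", clean=True, limit=None, batch_size=100000)
    AnonExportBackgroundTask.submit(job)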


huey_helper = AnonExportBackgroundTask.create_huey_helper(queue)


@huey_helper.register_schedule
def scheduled_anon_export():
    background_helper.submit_by_bg_task_type(AnonExportBackgroundTask,
                                             clean=app.config.get("TASKS_ANON_EXPORT_CLEAN", False),
                                             limit=app.config.get("TASKS_ANON_EXPORT_LIMIT", None),
                                             batch_size=app.config.get("TASKS_ANON_EXPORT_BATCH_SIZE", 100000))


@huey_helper.register_execute(is_load_config=False)
def anon_export(job_id):
    background_helper.execute_by_job_id(job_id, AnonExportBackgroundTask)