Coverage for portality/tasks/anon_export.py: 82%
136 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-11-09 15:10 +0000
import functools
import gzip
import os
import shutil
import uuid
from typing import Callable, NoReturn, Optional

from portality import models, dao
from portality.background import BackgroundTask
from portality.core import app, es_connection
from portality.decorators import write_required
from portality.lib import dates
from portality.lib.anon import basic_hash, anon_email
from portality.lib.dataobj import DataStructureException
from portality.store import StoreFactory
from portality.tasks.helpers import background_helper
from portality.tasks.redis_huey import schedule, long_running
def _anonymise_email(record):
    """Swap the record's email address for its anonymised equivalent."""
    scrambled = anon_email(record.email)
    record.set_email(scrambled)
    return record
def _anonymise_admin(record):
    """Replace the text of every note on the record with a hash, preserving ids."""
    # Snapshot the notes first - we mutate the collection while iterating
    original_notes = list(record.notes)
    for current in original_notes:
        record.remove_note(current)
        hashed_text = basic_hash(current['note'])
        record.add_note(hashed_text, id=current["id"])
    return record
33def _reset_api_key(record):
34 if record.api_key is not None:
35 record.generate_api_key()
36 return record
39def _reset_password(record):
40 record.set_password(uuid.uuid4().hex)
41 return record
# transform functions - return the JSON data source since
# esprit doesn't understand our model classes
def anonymise_account(record):
    """Anonymise an account record: hash the email, rotate the API key and password.

    Records that don't validate as accounts are returned unchanged.
    """
    try:
        account = models.Account(**record)
    except DataStructureException:
        return record

    for scrub in (_anonymise_email, _reset_api_key, _reset_password):
        account = scrub(account)

    return account.data
def anonymise_journal(record):
    """Anonymise a journal record by hashing its admin notes.

    Records that don't validate as journals are returned unchanged.
    """
    try:
        journal = models.Journal(**record)
    except DataStructureException:
        return record
    journal = _anonymise_admin(journal)
    return journal.data
def anonymise_suggestion(record):
    """Anonymise an application (suggestion) record by hashing its admin notes.

    Records that don't validate as suggestions are returned unchanged.
    """
    try:
        application = models.Suggestion(**record)
    except DataStructureException:
        return record
    return _anonymise_admin(application).data
def anonymise_background_job(record):
    """Hash the free-text note parameter carried by suggestion bulk-edit jobs.

    Records that don't validate as background jobs are returned unchanged.
    """
    try:
        job = models.BackgroundJob(**record)
    except DataStructureException:
        return record

    note_key = 'suggestion_bulk_edit__note'
    params = job.params
    if params and note_key in params:
        params[note_key] = basic_hash(params[note_key])

    return job.data
# Dispatch table: index type -> transform applied to each record of that type
# during export. Types not listed here are exported without modification.
anonymisation_procedures = {
    'account': anonymise_account,
    'background_job': anonymise_background_job,
    'journal': anonymise_journal,
    'suggestion': anonymise_suggestion
}
98def _copy_on_complete(path, logger_fn, tmpStore, mainStore, container):
99 name = os.path.basename(path)
100 raw_size = os.path.getsize(path)
101 logger_fn(("Compressing temporary file {x} (from {y} bytes)".format(x=path, y=raw_size)))
102 zipped_name = name + ".gz"
103 dir = os.path.dirname(path)
104 zipped_path = os.path.join(dir, zipped_name)
105 with open(path, "rb") as f_in, gzip.open(zipped_path, "wb") as f_out:
106 shutil.copyfileobj(f_in, f_out)
107 zipped_size = os.path.getsize(zipped_path)
108 logger_fn(("Storing from temporary file {x} ({y} bytes)".format(x=zipped_name, y=zipped_size)))
109 mainStore.store(container, name, source_path=zipped_path)
110 tmpStore.delete_file(container, name)
111 tmpStore.delete_file(container, zipped_name)
def run_anon_export(tmpStore, mainStore, container, clean=False, limit=None, batch_size=100000,
                    logger_fn: Optional[Callable[[str], None]] = None):
    """Dump every known index type to the main store, anonymising the types
    that carry personal data.

    :param tmpStore: temporary store used for in-progress dump files
    :param mainStore: destination store for the compressed dumps
    :param container: container name used in both stores
    :param clean: if True, wipe the destination container before exporting
    :param limit: maximum number of records per type (None = no limit)
    :param batch_size: records per output file before rollover
    :param logger_fn: single-argument progress logger; defaults to print.
        (Annotation fixed: a logger returns None, it is not NoReturn.)
    """
    if logger_fn is None:
        logger_fn = print
    if clean:
        mainStore.delete_container(container)

    # Discover index types from ES itself so newly-added types are picked up
    doaj_types = es_connection.indices.get(app.config['ELASTIC_SEARCH_DB_PREFIX'] + '*')
    type_list = [t[len(app.config['ELASTIC_SEARCH_DB_PREFIX']):] for t in doaj_types]

    for type_ in type_list:
        model = models.lookup_models_by_type(type_, dao.DomainObject)
        if not model:
            logger_fn("unable to locate model for " + type_)
            continue

        filename = type_ + ".bulk"
        output_file = tmpStore.path(container, filename, create_container=True, must_exist=False)
        logger_fn(dates.now() + " " + type_ + " => " + output_file + ".*")
        iter_q = {"query": {"match_all": {}}, "sort": [{"_id": {"order": "asc"}}]}

        # Only some types need anonymising; the rest are dumped verbatim
        transform = anonymisation_procedures.get(type_)

        # Use the model's dump method to write out this type to file
        out_rollover_fn = functools.partial(_copy_on_complete, logger_fn=logger_fn,
                                            tmpStore=tmpStore, mainStore=mainStore, container=container)
        _ = model.dump(q=iter_q, limit=limit, transform=transform, out_template=output_file,
                       out_batch_sizes=batch_size, out_rollover_callback=out_rollover_fn,
                       es_bulk_fields=["_id"], scroll_keepalive='3m')

        logger_fn(dates.now() + " done\n")

    tmpStore.delete_container(container)
def get_value_safe(key, default_v, kwargs, default_cond_fn=None):
    """Fetch ``key`` from ``kwargs``, substituting ``default_v`` when the stored
    value matches the "use the default" condition (by default: value is None).

    :param key: dictionary key to look up
    :param default_v: value to return when the key is absent or the condition matches
    :param kwargs: dictionary to read from
    :param default_cond_fn: predicate deciding whether to fall back to the default
    """
    if default_cond_fn is None:
        default_cond_fn = lambda candidate: candidate is None
    value = kwargs.get(key, default_v)
    return default_v if default_cond_fn(value) else value
class AnonExportBackgroundTask(BackgroundTask):
    """Background task that exports anonymised copies of all index types to
    the anon_data store."""

    __action__ = "anon_export"

    def run(self):
        """Execute the export using the parameters recorded on the job."""
        job_params = self.background_job.params
        kwargs = {}
        for key in ('clean', 'limit', 'batch_size'):
            kwargs[key] = self.get_param(job_params, key)
        kwargs['logger_fn'] = self.background_job.add_audit_message

        temp_store = StoreFactory.tmp()
        main_store = StoreFactory.get("anon_data")
        container = app.config.get("STORE_ANON_DATA_CONTAINER")

        run_anon_export(temp_store, main_store, container, **kwargs)
        self.background_job.add_audit_message("Anon export completed")

    def cleanup(self):
        """No cleanup needed - run() tidies its own temporary storage."""
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """Build a background job carrying the export parameters."""
        params = {}
        cls.set_param(params, 'clean', get_value_safe('clean', False, kwargs))
        cls.set_param(params, "limit", kwargs.get('limit'))
        cls.set_param(params, "batch_size", get_value_safe('batch_size', 100000, kwargs))
        return background_helper.create_job(username=username,
                                            action=cls.__action__,
                                            params=params)

    @classmethod
    def submit(cls, background_job):
        """Save the job and queue it for asynchronous execution."""
        background_job.save()
        anon_export.schedule(args=(background_job.id,), delay=10)
@long_running.periodic_task(schedule(AnonExportBackgroundTask.__action__))
@write_required(script=True)
def scheduled_anon_export():
    """Cron entry point: launch the anon export with settings from app config."""
    cfg = app.config
    background_helper.submit_by_bg_task_type(
        AnonExportBackgroundTask,
        clean=cfg.get("TASKS_ANON_EXPORT_CLEAN", False),
        limit=cfg.get("TASKS_ANON_EXPORT_LIMIT", None),
        batch_size=cfg.get("TASKS_ANON_EXPORT_BATCH_SIZE", 100000))
@long_running.task()
@write_required(script=True)
def anon_export(job_id):
    """Huey entry point: execute the anon export task for the given job id."""
    task_type = AnonExportBackgroundTask
    background_helper.execute_by_job_id(job_id, task_type)