Coverage for portality / tasks / old_data_cleanup.py: 82%
56 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1import datetime
2from typing import Type
4from portality.background import BackgroundTask
5from portality.core import app
6from portality.dao import DomainObject
7from portality.lib import dates
8from portality.lib.es_queries import ES_DATETIME_FMT
9from portality.models import Notification, BackgroundJob, AdminAlert
10from portality.tasks.helpers import background_helper
11from portality.tasks.redis_huey import scheduled_short_queue as queue
class RetentionQuery:
    """Build an Elasticsearch ``range`` query matching records at or before a cutoff.

    :param last_retention_date: cutoff datetime; a record matches when its
        ``datetime_field`` value is less than or equal to this (``lte``).
    :param datetime_field: name of the indexed date field to compare against;
        defaults to ``'created_date'``.
    """

    def __init__(self,
                 last_retention_date: datetime.datetime,
                 datetime_field='created_date'):
        self.datetime_field = datetime_field
        self.last_retention_date = last_retention_date

    def query(self):
        """Return the query dict.

        The cutoff is serialised with ``ES_DATETIME_FMT`` and used as an
        inclusive upper bound on ``self.datetime_field``.
        """
        cutoff_text = self.last_retention_date.strftime(ES_DATETIME_FMT)
        field_bounds = {'lte': cutoff_text}
        range_clause = {self.datetime_field: field_bounds}
        return {'query': {'range': range_clause}}
def _clean_old_data(domain_class: Type[DomainObject],
                    datetime_field='created_date',
                    logger_fn=None, ):
    """Delete records of ``domain_class`` older than its configured retention period.

    The retention period in days is read from
    ``app.config["TASK_DATA_RETENTION_DAYS"]`` keyed by ``domain_class.__type__``,
    falling back to 180 when the key is absent. A falsy (e.g. ``None``/``0``) or
    negative value stops the cleanup for this class without deleting anything.

    :param domain_class: model class; its ``hit_count`` and ``delete_by_query``
        are called with the generated retention query.
    :param datetime_field: date field compared against the cutoff.
    :param logger_fn: callable that receives progress messages; ``print`` if None.
    """
    log = print if logger_fn is None else logger_fn
    type_name = domain_class.__type__

    retention_days = app.config.get("TASK_DATA_RETENTION_DAYS", {}).get(type_name, 180)
    # Refuse to run with a missing/zero/negative retention: it would otherwise
    # delete everything up to (or beyond) "now".
    if not retention_days or retention_days <= 0:
        log(f'stop cleanup for invalid retention_day [{type_name}][{retention_days}]')
        return

    log(f'working for clean_old_data [{type_name}][{retention_days}]')

    cutoff = dates.now() - datetime.timedelta(days=retention_days)
    query = RetentionQuery(cutoff, datetime_field=datetime_field).query()
    # Count first purely for the log; the delete uses the same query.
    doomed = domain_class.hit_count(query)
    class_name = domain_class.__name__
    log(f'remove [{class_name}] -- {datetime_field} <= {cutoff}')
    log(f'number of [{class_name}][{doomed}] to be removed.')
    domain_class.delete_by_query(query)
def clean_all_old_data(logger_fn=None):
    """Run retention cleanup for each time-limited model, then log completion.

    Cleans ``Notification``, ``BackgroundJob`` and ``AdminAlert`` in that order,
    each on the default ``created_date`` field.

    :param logger_fn: callable that receives progress messages; ``print`` if None.
    """
    log = logger_fn if logger_fn is not None else print
    models_to_clean = (Notification, BackgroundJob, AdminAlert)
    for model in models_to_clean:
        _clean_old_data(model, logger_fn=log)
    log("old data cleanup completed")
class OldDataCleanupBackgroundTask(BackgroundTask):
    """Background task wrapping ``clean_all_old_data``."""

    __action__ = "old_data_cleanup"

    def run(self):
        """Run the cleanup, recording each progress message as a job audit message."""
        audit = self.background_job.add_audit_message
        clean_all_old_data(logger_fn=audit)

    def cleanup(self):
        """No post-run cleanup is performed."""
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """Create a job owned by ``username`` for this task's action.

        Extra keyword arguments are accepted but not used.
        """
        job = background_helper.create_job(username=username, action=cls.__action__)
        return job

    @classmethod
    def submit(cls, background_job):
        """Hand ``background_job`` to the module-level ``old_data_cleanup`` task function."""
        background_helper.submit_by_background_job(background_job, old_data_cleanup)
# Helper that binds OldDataCleanupBackgroundTask to the scheduled short queue
# (imported as ``queue``); the scheduling and execution functions below are
# registered through its decorators.
huey_helper = OldDataCleanupBackgroundTask.create_huey_helper(queue)
@huey_helper.register_schedule
def scheduled_old_data_cleanup():
    """Periodic entry point registered via ``huey_helper.register_schedule``.

    Delegates to ``huey_helper.scheduled_common()``, which presumably prepares
    and submits a new OldDataCleanupBackgroundTask job (via ``prepare`` /
    ``submit``). NOTE(review): the schedule itself is not defined here — confirm
    where it is configured.
    """
    huey_helper.scheduled_common()
@huey_helper.register_execute(is_load_config=False)
def old_data_cleanup(job_id):
    """Worker entry point: execute the already-created background job ``job_id``.

    ``OldDataCleanupBackgroundTask.submit`` enqueues this function for an
    existing job, so it must run that job. It must not go back through
    ``scheduled_common``, the scheduling path used by
    ``scheduled_old_data_cleanup``. Doing so would at best try to create and
    submit yet another job instead of performing the cleanup.
    """
    huey_helper.execute_common(job_id)