Coverage for portality / tasks / old_data_cleanup.py: 82%

56 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1import datetime 

2from typing import Type 

3 

4from portality.background import BackgroundTask 

5from portality.core import app 

6from portality.dao import DomainObject 

7from portality.lib import dates 

8from portality.lib.es_queries import ES_DATETIME_FMT 

9from portality.models import Notification, BackgroundJob, AdminAlert 

10from portality.tasks.helpers import background_helper 

11from portality.tasks.redis_huey import scheduled_short_queue as queue 

12 

13 

class RetentionQuery:
    """Build an Elasticsearch query selecting records at or before a cutoff date.

    :param last_retention_date: inclusive upper bound; documents whose
        ``datetime_field`` is less than or equal to this moment match.
    :param datetime_field: name of the document field compared to the cutoff.
    """

    def __init__(self,
                 last_retention_date: datetime.datetime,
                 datetime_field='created_date'):
        self.datetime_field = datetime_field
        self.last_retention_date = last_retention_date

    def query(self):
        """Return the query dict: a ``range`` clause with an inclusive ``lte`` bound.

        The cutoff is serialised with ``ES_DATETIME_FMT`` so it matches the
        format the index stores dates in.
        """
        cutoff = self.last_retention_date.strftime(ES_DATETIME_FMT)
        range_clause = {self.datetime_field: {'lte': cutoff}}
        return {'query': {'range': range_clause}}

32 

33 

def _clean_old_data(domain_class: Type[DomainObject],
                    datetime_field='created_date',
                    logger_fn=None):
    """Delete records of ``domain_class`` older than its configured retention period.

    The retention period (in days) is read from the ``TASK_DATA_RETENTION_DAYS``
    config mapping, keyed by ``domain_class.__type__``. It defaults to 180 days
    when the type has no entry. A ``None``, zero or negative value skips cleanup
    for that type.

    :param domain_class: model class to clean; must expose ``__type__``,
        ``hit_count`` and ``delete_by_query``.
    :param datetime_field: record field compared against the retention cutoff.
    :param logger_fn: callable receiving progress messages; defaults to ``print``.
    """
    if logger_fn is None:
        logger_fn = print

    # ``or {}`` also covers the config key being present but explicitly set to
    # None, which would otherwise raise AttributeError on the ``.get`` below.
    retention_days_by_type = app.config.get("TASK_DATA_RETENTION_DAYS") or {}
    n_retention_day = retention_days_by_type.get(domain_class.__type__, 180)
    if not n_retention_day or n_retention_day <= 0:
        logger_fn(f'stop cleanup for invalid retention_day [{domain_class.__type__}][{n_retention_day}]')
        return

    logger_fn(f'working for clean_old_data [{domain_class.__type__}][{n_retention_day}]')

    # Everything with datetime_field <= (now - retention period) is removed.
    last_retention_date = dates.now() - datetime.timedelta(days=n_retention_day)
    retention_query = RetentionQuery(last_retention_date, datetime_field=datetime_field).query()
    num_record = domain_class.hit_count(retention_query)
    logger_fn(f'remove [{domain_class.__name__}] -- {datetime_field} <= {last_retention_date}')
    logger_fn(f'number of [{domain_class.__name__}][{num_record}] to be removed.')
    domain_class.delete_by_query(retention_query)

53 

54 

def clean_all_old_data(logger_fn=None):
    """Apply the retention cleanup to every model that has a retention policy.

    Notification, BackgroundJob and AdminAlert records are cleaned in that
    order, and then a completion message is logged.

    :param logger_fn: callable receiving progress messages; defaults to ``print``.
    """
    log = print if logger_fn is None else logger_fn
    for model_cls in (Notification, BackgroundJob, AdminAlert):
        _clean_old_data(model_cls, logger_fn=log)
    log("old data cleanup completed")

62 

63 

class OldDataCleanupBackgroundTask(BackgroundTask):
    """Background task that purges expired records via ``clean_all_old_data``."""

    __action__ = "old_data_cleanup"

    def run(self):
        """Run the cleanup and record each progress message on the job's audit log."""
        audit = self.background_job.add_audit_message
        clean_all_old_data(logger_fn=audit)

    def cleanup(self):
        """No post-run cleanup is required for this task."""

    @classmethod
    def prepare(cls, username, **kwargs):
        """Create the background job for ``username``; extra kwargs are ignored."""
        return background_helper.create_job(
            username=username,
            action=cls.__action__,
        )

    @classmethod
    def submit(cls, background_job):
        """Hand ``background_job`` to the ``old_data_cleanup`` task function."""
        background_helper.submit_by_background_job(background_job, old_data_cleanup)

84 

85 

# Helper binding this task class to the scheduled short queue. It supplies the
# decorators that register the periodic trigger and the job executor below.
huey_helper = OldDataCleanupBackgroundTask.create_huey_helper(queue)


@huey_helper.register_schedule
def scheduled_old_data_cleanup():
    """Periodic entry point; delegates to the helper's scheduling routine."""
    huey_helper.scheduled_common()


@huey_helper.register_execute(is_load_config=False)
def old_data_cleanup(job_id):
    """Execute the background job ``job_id`` submitted by ``OldDataCleanupBackgroundTask.submit``."""
    # Previously this called ``scheduled_common(job_id)``, the scheduling entry
    # point used by ``scheduled_old_data_cleanup``, so the submitted job was
    # never run. NOTE(review): assumes the helper's execute entry point is
    # ``execute_common(job_id)`` as in the other portality tasks — confirm
    # against portality.tasks.helpers.background_helper.
    huey_helper.execute_common(job_id)