Coverage for portality / tasks / harvester.py: 88%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1from datetime import datetime 

2 

3from portality import models 

4from portality.background import BackgroundTask, BackgroundApi, BackgroundException 

5from portality.core import app 

6from portality.lib import dates 

7from portality.models.harvester import HarvesterProgressReport as Report 

8from portality.store import StoreFactory 

9from portality.tasks.harvester_helpers import workflow 

10from portality.tasks.helpers import background_helper 

11from portality.tasks.redis_huey import scheduled_long_queue as queue 

12 

13 

class BGHarvesterLogger(object):
    """
    File-backed logger for a single harvester background job.

    Messages are written to a temporary file named after the job id. On
    close() that file is stored in the "harvester" store and the temporary
    copy is deleted.
    """

    def __init__(self, job):
        """
        Work out the temporary log file path, note it on the job, and open the file for writing.

        :param job: the background job this log belongs to; its ``id`` is used in the log file name
        """
        self._job = job

        self._logFile = "harvest-" + job.id + ".log"
        tempStore = StoreFactory.tmp()
        self._tempFile = tempStore.path(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile, create_container=True, must_exist=False)
        self._job.add_audit_message("Audit messages for this run will be stored in file {x}".format(x=self._tempFile))
        self._job.save()

        # Explicit encoding: without it the encoding is the platform/locale default, so
        # writing a message with characters outside that encoding would raise mid-run.
        self._fh = open(self._tempFile, "w", encoding="utf-8")

    def log(self, msg):
        """
        Append a timestamped line to the log file.

        :param msg: the message to record
        """
        self._fh.write("[{d}] {m}\n".format(d=dates.now_str(), m=msg))

    def close(self):
        """
        Close the log file, store it in the "harvester" store, and remove the temporary copy.

        The URL of the stored file is added to the job as an audit message.
        """
        self._fh.close()

        mainStore = StoreFactory.get("harvester")
        mainStore.store(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile, source_path=self._tempFile)
        url = mainStore.url(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile)
        self._job.add_audit_message("Audit messages file moved to {x}".format(x=url))

        # the temporary copy is only removed once the store call above has returned
        tempStore = StoreFactory.tmp()
        tempStore.delete_file(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile)

40 

41 

class HarvesterBackgroundTask(BackgroundTask):
    """
    ~~Harvester:BackgroundTask~~
    """
    __action__ = "harvest"

    def run(self):
        """
        Execute the task as specified by the background_job

        Processes every account listed in the ``HARVEST_ACCOUNTS`` config value, then
        writes the progress report to the run's log file.

        :raises BackgroundException: if another harvester job is already active
        :return:
        """

        if not self.only_me():
            msg = "Another harvester is currently running, skipping this run"
            self.background_job.add_audit_message(msg)
            raise BackgroundException(msg)

        logger = BGHarvesterLogger(self.background_job)
        try:
            accs = app.config.get("HARVEST_ACCOUNTS", [])
            harvester_workflow = workflow.HarvesterWorkflow(logger)
            for account_id in accs:
                harvester_workflow.process_account(account_id)

            report = Report.write_report()
            logger.log(report)
        finally:
            # Always close the logger, even when an account fails: otherwise the file
            # handle leaks and the log of the failed run never reaches the main store.
            logger.close()

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: the user the job is created for (passed through to ``create_job``)
        :param kwargs: arbitrary keyword arguments pertaining to this task type (not used by this task)
        :return: a BackgroundJob instance representing this task
        """

        # first prepare a job record
        return background_helper.create_job(username, cls.__action__, queue_id=huey_helper.queue_id)

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        harvest.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))
        # fixme: schedule() could raise a huey.exceptions.HueyException and not reach redis- would that be logged?

    def only_me(self):
        """
        Check that no other harvest job is currently active.

        Active jobs are looked up from the cutoff given by the ``HARVESTER_ZOMBIE_AGE``
        config value (NOTE(review): presumably so that older, stalled jobs are ignored -
        confirm against ``BackgroundJob.active``).

        :return: True if there are no active harvest jobs, or the only active one is this job; False otherwise
        """
        age = app.config.get("HARVESTER_ZOMBIE_AGE")
        since = dates.format(dates.before_now(age))
        actives = models.BackgroundJob.active(self.__action__, since=since)
        if self.background_job.id in [a.id for a in actives] and len(actives) == 1:
            return True
        if len(actives) == 0:
            return True
        return False

112 

113 

# Huey helper for the harvester task, built on the scheduled long-running queue.
# It supplies the queue_id used by HarvesterBackgroundTask.prepare() and the
# register_schedule / register_execute decorators applied to the functions below.
huey_helper = HarvesterBackgroundTask.create_huey_helper(queue)

115 

116 

@huey_helper.register_schedule
def scheduled_harvest():
    """Create a harvest job for the configured system user and submit it to the queue."""
    system_user = app.config.get("SYSTEM_USERNAME")
    HarvesterBackgroundTask.submit(HarvesterBackgroundTask.prepare(system_user))

122 

123 

@huey_helper.register_execute(is_load_config=False)
def harvest(job_id):
    """
    Load the background job with the given id and execute the harvester task for it.

    :param job_id: id of the BackgroundJob to run
    """
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(HarvesterBackgroundTask(background_job))

127 task = HarvesterBackgroundTask(job) 

128 BackgroundApi.execute(task)