Coverage for portality/tasks/harvester.py: 88%

77 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-19 18:38 +0100

1from portality import models 

2from portality.background import BackgroundTask, BackgroundApi, BackgroundException 

3 

4from portality.tasks.harvester_helpers import workflow 

5from portality.core import app 

6from portality.models.harvester import HarvesterProgressReport as Report 

7from portality.tasks.redis_huey import schedule, long_running 

8from portality.decorators import write_required 

9from portality.lib import dates 

10from portality.store import StoreFactory 

11 

12from datetime import datetime 

13 

14 

class BGHarvesterLogger(object):
    """
    File-backed logger for a single harvester background job.

    Messages are written to a log file in the temporary store while the job runs.
    When close() is called the file is copied to the "harvester" store, its URL is
    recorded on the job as an audit message, and the temporary copy is deleted.
    """

    def __init__(self, job):
        """
        Create the temporary log file for this job and open it for writing.

        An audit message naming the temporary file is added to the job, and the
        job is saved, before the file is opened.

        :param job: the background job being logged; must expose ``id``,
            ``add_audit_message`` and ``save``
        """
        self._job = job

        container = app.config.get("STORE_HARVESTER_CONTAINER")
        self._logFile = "harvest-" + job.id + ".log"
        tempStore = StoreFactory.tmp()
        self._tempFile = tempStore.path(container, self._logFile, create_container=True, must_exist=False)
        self._job.add_audit_message("Audit messages for this run will be stored in file {x}".format(x=self._tempFile))
        self._job.save()

        # Explicit encoding: without it the platform default is used, and harvested
        # metadata containing non-ASCII characters can raise UnicodeEncodeError on
        # systems whose default encoding is not UTF-8.
        self._fh = open(self._tempFile, "w", encoding="utf-8")

    def log(self, msg):
        """
        Append a timestamped line to the log file.

        :param msg: the message to record
        """
        self._fh.write("[{d}] {m}\n".format(d=dates.now(), m=msg))

    def close(self):
        """
        Close the log file and move it from the temporary store to the "harvester" store.

        The URL of the stored file is added to the job as an audit message. This
        method does not save the job itself.
        """
        self._fh.close()

        container = app.config.get("STORE_HARVESTER_CONTAINER")

        mainStore = StoreFactory.get("harvester")
        mainStore.store(container, self._logFile, source_path=self._tempFile)
        url = mainStore.url(container, self._logFile)
        self._job.add_audit_message("Audit messages file moved to {x}".format(x=url))

        # the file now lives in the main store, so the temporary copy can go
        tempStore = StoreFactory.tmp()
        tempStore.delete_file(container, self._logFile)

41 

42 

class HarvesterBackgroundTask(BackgroundTask):
    """
    ~~Harvester:BackgroundTask~~

    Background task which runs the harvester workflow over every account listed
    in the ``HARVEST_ACCOUNTS`` configuration option.
    """
    __action__ = "harvest"

    def run(self):
        """
        Execute the task as specified by the background_job

        Processes each configured account in turn, then writes the progress
        report to the harvester log.

        :return:
        :raises BackgroundException: if only_me() reports that another harvester job is active
        """

        if not self.only_me():
            msg = "Another harvester is currently running, skipping this run"
            self.background_job.add_audit_message(msg)
            raise BackgroundException(msg)

        logger = BGHarvesterLogger(self.background_job)
        try:
            accs = app.config.get("HARVEST_ACCOUNTS", [])
            harvester_workflow = workflow.HarvesterWorkflow(logger)
            for account_id in accs:
                harvester_workflow.process_account(account_id)

            report = Report.write_report()
            logger.log(report)
        finally:
            # Always close the logger, even when an account fails part-way through,
            # so the file handle is released and the partial log is still moved to
            # the main store where it can be inspected.
            logger.close()

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: the user the job is recorded against
        :param kwargs: arbitrary keyword arguments pertaining to this task type (currently unused)
        :return: a BackgroundJob instance representing this task
        """

        # first prepare a job record
        job = models.BackgroundJob()
        job.user = username
        job.action = cls.__action__
        return job

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        The job is saved first, then the ``harvest`` task is scheduled with the
        job's id and a delay of 10.

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        harvest.schedule(args=(background_job.id,), delay=10)
        # fixme: schedule() could raise a huey.exceptions.HueyException and not reach redis- would that be logged?

    def only_me(self):
        """
        Determine whether this job is the only active harvester job.

        Looks up the harvest jobs reported by ``BackgroundJob.active`` since a cutoff
        computed from the ``HARVESTER_ZOMBIE_AGE`` configuration option.
        NOTE(review): presumably jobs older than the cutoff are treated as dead and
        ignored - confirm against ``BackgroundJob.active``.

        :return: True if there are no such active jobs, or if the single active job
            is this one; False otherwise
        """
        age = app.config.get("HARVESTER_ZOMBIE_AGE")
        since = dates.format(dates.before(datetime.utcnow(), age))
        actives = models.BackgroundJob.active(self.__action__, since=since)
        if self.background_job.id in [a.id for a in actives] and len(actives) == 1:
            return True
        if len(actives) == 0:
            return True
        return False

116 

117 

@long_running.periodic_task(schedule("harvest"))
@write_required(script=True)
def scheduled_harvest():
    """
    Periodic entry point: create a harvester job for the configured system user
    (``SYSTEM_USERNAME``) and submit it to the background queue.
    """
    system_user = app.config.get("SYSTEM_USERNAME")
    HarvesterBackgroundTask.submit(HarvesterBackgroundTask.prepare(system_user))

124 

@long_running.task()
@write_required(script=True)
def harvest(job_id):
    """
    Queue entry point: load the background job with the given id and execute it
    as a HarvesterBackgroundTask.

    :param job_id: id of the BackgroundJob to run
    """
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(HarvesterBackgroundTask(background_job))