Coverage for portality/tasks/harvester.py: 88%
77 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-19 18:38 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-19 18:38 +0100
1from portality import models
2from portality.background import BackgroundTask, BackgroundApi, BackgroundException
4from portality.tasks.harvester_helpers import workflow
5from portality.core import app
6from portality.models.harvester import HarvesterProgressReport as Report
7from portality.tasks.redis_huey import schedule, long_running
8from portality.decorators import write_required
9from portality.lib import dates
10from portality.store import StoreFactory
12from datetime import datetime
15class BGHarvesterLogger(object):
16 def __init__(self, job):
17 self._job = job
19 self._logFile = "harvest-" + job.id + ".log"
20 tempStore = StoreFactory.tmp()
21 self._tempFile = tempStore.path(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile, create_container=True, must_exist=False)
22 self._job.add_audit_message("Audit messages for this run will be stored in file {x}".format(x=self._tempFile))
23 self._job.save()
25 self._fh = open(self._tempFile, "w")
27 def log(self, msg):
28 # self._job.add_audit_message(msg)
29 self._fh.write("[{d}] {m}\n".format(d=dates.now(), m=msg))
31 def close(self):
32 self._fh.close()
34 mainStore = StoreFactory.get("harvester")
35 mainStore.store(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile, source_path=self._tempFile)
36 url = mainStore.url(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile)
37 self._job.add_audit_message("Audit messages file moved to {x}".format(x=url))
39 tempStore = StoreFactory.tmp()
40 tempStore.delete_file(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile)
43class HarvesterBackgroundTask(BackgroundTask):
44 """
45 ~~Harvester:BackgroundTask~~
46 """
47 __action__ = "harvest"
49 def run(self):
50 """
51 Execute the task as specified by the background_job
52 :return:
53 """
55 if not self.only_me():
56 msg = "Another harvester is currently running, skipping this run"
57 self.background_job.add_audit_message(msg)
58 raise BackgroundException(msg)
60 logger = BGHarvesterLogger(self.background_job)
61 accs = app.config.get("HARVEST_ACCOUNTS", [])
62 harvester_workflow = workflow.HarvesterWorkflow(logger)
63 for account_id in accs:
64 harvester_workflow.process_account(account_id)
66 report = Report.write_report()
67 logger.log(report)
68 logger.close()
70 # self.background_job.add_audit_message(report)
72 def cleanup(self):
73 """
74 Cleanup after a successful OR failed run of the task
75 :return:
76 """
77 pass
79 @classmethod
80 def prepare(cls, username, **kwargs):
81 """
82 Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
83 or fail with a suitable exception
85 :param kwargs: arbitrary keyword arguments pertaining to this task type
86 :return: a BackgroundJob instance representing this task
87 """
89 # first prepare a job record
90 job = models.BackgroundJob()
91 job.user = username
92 job.action = cls.__action__
93 return job
95 @classmethod
96 def submit(cls, background_job):
97 """
98 Submit the specified BackgroundJob to the background queue
100 :param background_job: the BackgroundJob instance
101 :return:
102 """
103 background_job.save()
104 harvest.schedule(args=(background_job.id,), delay=10)
105 # fixme: schedule() could raise a huey.exceptions.HueyException and not reach redis- would that be logged?
107 def only_me(self):
108 age = app.config.get("HARVESTER_ZOMBIE_AGE")
109 since = dates.format(dates.before(datetime.utcnow(), age))
110 actives = models.BackgroundJob.active(self.__action__, since=since)
111 if self.background_job.id in [a.id for a in actives] and len(actives) == 1:
112 return True
113 if len(actives) == 0:
114 return True
115 return False
118@long_running.periodic_task(schedule("harvest"))
119@write_required(script=True)
120def scheduled_harvest():
121 user = app.config.get("SYSTEM_USERNAME")
122 job = HarvesterBackgroundTask.prepare(user)
123 HarvesterBackgroundTask.submit(job)
125@long_running.task()
126@write_required(script=True)
127def harvest(job_id):
128 job = models.BackgroundJob.pull(job_id)
129 task = HarvesterBackgroundTask(job)
130 BackgroundApi.execute(task)