Coverage for portality / tasks / harvester.py: 88%
73 statements
Generated by coverage.py v7.13.5 at 2026-05-04 09:41 +0100
1from datetime import datetime
3from portality import models
4from portality.background import BackgroundTask, BackgroundApi, BackgroundException
5from portality.core import app
6from portality.lib import dates
7from portality.models.harvester import HarvesterProgressReport as Report
8from portality.store import StoreFactory
9from portality.tasks.harvester_helpers import workflow
10from portality.tasks.helpers import background_helper
11from portality.tasks.redis_huey import scheduled_long_queue as queue
class BGHarvesterLogger(object):
    """
    File-backed logger for a single harvester background job.

    Messages are written to a temporary file named ``harvest-<job id>.log``
    inside the ``STORE_HARVESTER_CONTAINER`` container of the tmp store.
    On :meth:`close` the file is copied to the "harvester" store, its URL is
    recorded as an audit message on the job, and the temporary copy is deleted.
    """

    def __init__(self, job):
        """
        Create the temporary log file for ``job``.

        :param job: the BackgroundJob this log belongs to. An audit message
            naming the temporary log file is added to it and the job is saved.
        """
        self._job = job

        self._logFile = "harvest-" + job.id + ".log"
        tempStore = StoreFactory.tmp()
        self._tempFile = tempStore.path(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile, create_container=True, must_exist=False)
        self._job.add_audit_message("Audit messages for this run will be stored in file {x}".format(x=self._tempFile))
        self._job.save()

        # Explicit UTF-8: harvested metadata and report text may contain
        # non-ASCII characters, and the platform default encoding is
        # locale-dependent.
        self._fh = open(self._tempFile, "w", encoding="utf-8")

    def log(self, msg):
        """
        Append a timestamped line to the log file.

        Messages go only to the file, not to the job's audit log, to keep the
        job record small.

        :param msg: the message to write
        """
        self._fh.write("[{d}] {m}\n".format(d=dates.now_str(), m=msg))

    def close(self):
        """
        Close the log file and publish it.

        Publishing means: move the file to the "harvester" store, record its
        URL as an audit message on the job, and delete the temporary copy.
        Calling this more than once is a no-op after the first call, because
        the temporary file no longer exists at that point.
        """
        if self._fh.closed:
            return
        self._fh.close()

        mainStore = StoreFactory.get("harvester")
        mainStore.store(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile, source_path=self._tempFile)
        url = mainStore.url(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile)
        self._job.add_audit_message("Audit messages file moved to {x}".format(x=url))

        tempStore = StoreFactory.tmp()
        tempStore.delete_file(app.config.get("STORE_HARVESTER_CONTAINER"), self._logFile)
class HarvesterBackgroundTask(BackgroundTask):
    """
    ~~Harvester:BackgroundTask~~

    Background task that runs the harvester workflow over every account id
    listed in the ``HARVEST_ACCOUNTS`` config setting, writing progress to a
    per-job log file via ``BGHarvesterLogger``.
    """
    __action__ = "harvest"

    def run(self):
        """
        Execute the task as specified by the background_job.

        Only one harvester may run at a time. If :meth:`only_me` reports
        another active harvest job, an audit message is recorded and a
        BackgroundException is raised.

        The log file is always closed and published (moved to the harvester
        store), even if processing an account fails. This preserves the
        partial log of a failed run and avoids leaking the open file handle.

        :return:
        :raises BackgroundException: if another harvester is currently running
        """
        if not self.only_me():
            msg = "Another harvester is currently running, skipping this run"
            self.background_job.add_audit_message(msg)
            raise BackgroundException(msg)

        logger = BGHarvesterLogger(self.background_job)
        try:
            accs = app.config.get("HARVEST_ACCOUNTS", [])
            harvester_workflow = workflow.HarvesterWorkflow(logger)
            for account_id in accs:
                harvester_workflow.process_account(account_id)

            # The progress report is written to the log file only, not to the
            # job's audit messages.
            report = Report.write_report()
            logger.log(report)
        finally:
            logger.close()

    def cleanup(self):
        """
        Cleanup after a successful OR failed run of the task
        :return:
        """
        pass

    @classmethod
    def prepare(cls, username, **kwargs):
        """
        Take an arbitrary set of keyword arguments and return an instance of a BackgroundJob,
        or fail with a suitable exception

        :param username: the user the job is created for
        :param kwargs: arbitrary keyword arguments pertaining to this task type (currently unused)
        :return: a BackgroundJob instance representing this task
        """

        # first prepare a job record
        return background_helper.create_job(username, cls.__action__, queue_id=huey_helper.queue_id)

    @classmethod
    def submit(cls, background_job):
        """
        Submit the specified BackgroundJob to the background queue

        The job is saved first, then the ``harvest`` huey task is scheduled
        with a delay of ``HUEY_ASYNC_DELAY`` (default 10).

        :param background_job: the BackgroundJob instance
        :return:
        """
        background_job.save()
        harvest.schedule(args=(background_job.id,), delay=app.config.get('HUEY_ASYNC_DELAY', 10))
        # fixme: schedule() could raise a huey.exceptions.HueyException and not reach redis- would that be logged?

    def only_me(self):
        """
        Check whether this job may run, i.e. no *other* harvest job is active.

        Only jobs active since ``HARVESTER_ZOMBIE_AGE`` ago are considered.
        Older ones are treated as abandoned. The units are whatever
        ``dates.before_now`` expects; presumably seconds, TODO confirm.

        :return: True if there are no active harvest jobs, or the only active
            one is this job; False otherwise
        """
        age = app.config.get("HARVESTER_ZOMBIE_AGE")
        since = dates.format(dates.before_now(age))
        actives = models.BackgroundJob.active(self.__action__, since=since)
        if self.background_job.id in [a.id for a in actives] and len(actives) == 1:
            return True
        if len(actives) == 0:
            return True
        return False
# Huey integration for the harvester task, bound to the scheduled long-running
# queue. Its queue_id is used by HarvesterBackgroundTask.prepare, and its
# decorators register the scheduled/execute functions below.
huey_helper = HarvesterBackgroundTask.create_huey_helper(
    queue
)
@huey_helper.register_schedule
def scheduled_harvest():
    """
    Periodic entry point: create a harvest job owned by the configured
    SYSTEM_USERNAME and submit it to the queue.
    """
    HarvesterBackgroundTask.submit(
        HarvesterBackgroundTask.prepare(app.config.get("SYSTEM_USERNAME"))
    )
@huey_helper.register_execute(is_load_config=False)
def harvest(job_id):
    """
    Queue worker entry point: load the BackgroundJob with ``job_id`` and
    execute it as a HarvesterBackgroundTask through BackgroundApi.

    :param job_id: id of a previously saved BackgroundJob
    """
    background_job = models.BackgroundJob.pull(job_id)
    BackgroundApi.execute(HarvesterBackgroundTask(background_job))