Coverage report for portality/upgrade.py — 59% of 158 statements covered (coverage.py v7.13.5, generated 2026-05-05 00:09 +0100).
1"""
2~~Migrations:Framework~~
3# FIXME: this script requires more work if it's to be used for specified source and target clusters
4"""
5import dictdiffer
6import json
7import os
8from collections import OrderedDict
9from copy import deepcopy
10from datetime import timedelta
11from typing import TypedDict, List, Dict
13from portality import models
14from portality.dao import ScrollTimeoutException
15from portality.lib import plugin, dates
16from portality.lib.dataobj import DataStructureException
17from portality.lib.seamless import SeamlessException
18from portality.models.datalog_journal_added import DatalogJournalAdded
# Registry mapping the "type" names used in upgrade definition files to the
# model classes that back them.  `do_upgrade` looks types up here.
MODELS = {
    "journal": models.Journal,  # ~~->Journal:Model~~
    "article": models.Article,  # ~~->Article:Model~~
    "draft_application": models.DraftApplication,  # ~~->DraftApplication:Model~~
    "suggestion": models.Suggestion,  # ~~->Application:Model~~
    "application": models.Application,
    "account": models.Account,  # ~~->Account:Model~~
    "background_job": models.BackgroundJob,  # ~~->BackgroundJob:Model~~
    "datalog_journal_added": DatalogJournalAdded,
}
class UpgradeTask(object):
    """Base class for custom per-record upgrade tasks.

    Subclasses override the hook methods below; the defaults do nothing.
    """

    def upgrade_article(self, article):
        """Hook invoked with each article record; no-op by default."""
        pass
class UpgradeType(TypedDict):
    """Describes one record type to be processed by `do_upgrade`."""

    type: str  # name / key of the MODELS class
    action: str  # default is update

    # ES query used to find the records to upgrade; match_all is used
    # when no query is supplied
    query: dict

    keepalive: str  # ES keepalive time for the scroll, default 1m
    scroll_size: int  # ES scroll size, default 1000

    # python paths of functions to run on the record; each function's
    # interface should be:
    #   my_function(instance: DomainObject | dict) -> DomainObject | dict
    functions: List[str]

    # instance will be a DomainObject if True, otherwise a dict
    # (default is True)
    init_with_model: bool

    # tasks to run on the record; only applied when init_with_model is True.
    # format of each task: { function name of model : kwargs }
    tasks: List[Dict[str, dict]]
class Definition(TypedDict):
    """Top-level shape of an upgrade definition file."""

    batch: int  # bulk-write batch size (do_upgrade defaults this to 500)
    types: List[UpgradeType]  # one entry per record type to upgrade
def do_upgrade(definition: Definition, verbose, save_batches=None):
    """Run the migrations described by ``definition`` against the index.

    For each entry in ``definition["types"]`` this scrolls the matching
    records, optionally wraps them in their model class, applies any
    configured functions/tasks, and bulk-writes the results back in batches.

    :param definition: the parsed upgrade definition (see `Definition`)
    :param verbose: if True, print per-record progress to stdout
    :param save_batches: optional directory; when set, each batch is also
        dumped to ``<type>.<batch_num>.json`` in that directory

    Fixes over the previous version:
    - ``tasks`` may now be either ``{func: kwargs}`` or the documented
      ``[{func: kwargs}, ...]`` list form (the old code crashed on a list)
    - the builtin ``max`` is no longer shadowed
    - the triplicated batch-write code is factored into ``_flush``
    """
    # ~~->Elasticsearch:Technology~~

    # get the defined batch size
    batch_size = definition.get("batch", 500)

    for tdef in definition.get("types", []):
        print("Upgrading", tdef.get("type"))
        batch = []
        total = 0
        batch_num = 0
        type_start = dates.now()

        # default to match_all if no query is specified
        query = tdef.get("query", {"query": {"match_all": {}}})

        # learn what kind of model we've got
        model_class = MODELS.get(tdef.get("type"))
        record_count = model_class.count(query=query)
        action = tdef.get("action", "update")

        def _flush(label):
            # Write the current batch: report progress, optionally dump the
            # batch to disk, bulk-write it, and reset the accumulator.
            nonlocal batch, total, batch_num
            total += len(batch)
            batch_num += 1
            print(dates.now(), label, len(batch), "to", tdef.get("type"), ";", total, "of", record_count)
            if save_batches:
                fn = os.path.join(save_batches, tdef.get("type") + "." + str(batch_num) + ".json")
                with open(fn, "w") as f:
                    f.write(json.dumps(batch, indent=2))
                print(dates.now(), "wrote batch to file {x}".format(x=fn))
            model_class.bulk(batch, action=action, req_timeout=120)
            batch = []

        # Iterate through all of the records in the model class
        try:
            for result in model_class.iterate(q=query, keepalive=tdef.get("keepalive", "1m"),
                                              page_size=tdef.get("scroll_size", 1000), wrap=False):
                # keep the original so an "update" action can send only a diff
                original = deepcopy(result)
                if tdef.get("init_with_model", True):
                    # instantiate an object with the data
                    try:
                        result = model_class(**result)
                    except (DataStructureException, SeamlessException) as e:
                        print("Could not create model for {0}, Error: {1}".format(result['id'], str(e)))
                        continue

                for function_path in tdef.get("functions", []):
                    fn = plugin.load_function(function_path)
                    result = fn(result)

                data = result
                _id = result.get("id", "id not specified")
                if isinstance(result, model_class):
                    # run the tasks specified with this object type.
                    # Accept both the documented list-of-dicts form and the
                    # legacy single-dict form.
                    tasks = tdef.get("tasks", None)
                    if tasks:
                        if isinstance(tasks, dict):
                            task_items = list(tasks.items())
                        else:
                            task_items = [(fc, kw) for t in tasks for fc, kw in t.items()]
                        for func_call, kwargs in task_items:
                            getattr(result, func_call)(**kwargs)

                    # run the prep routine for the record, if it has one
                    try:
                        result.prep()
                    except AttributeError:
                        if verbose:
                            print(tdef.get("type"), result.id,
                                  "has no prep method - no, pre-save preparation being done")

                    data = result.data
                    _id = result.id

                # for updates, send only the changed/added fields
                if action == 'update':
                    data = _diff(original, data)

                # ensure the bulk request can route the document
                if "id" not in data:
                    data["id"] = _id

                # add the data to the batch
                batch.append(data)
                if verbose:
                    print("added", tdef.get("type"), _id, "to batch update")

                # When we have enough, do some writing
                if len(batch) >= batch_size:
                    _flush("writing ")

                    # do some timing predictions (total > 0 here, so the
                    # division is safe)
                    batch_tick = dates.now()
                    seconds_so_far = (batch_tick - type_start).total_seconds()
                    estimated_seconds_remaining = ((seconds_so_far * record_count) / total) - seconds_so_far
                    estimated_finish = batch_tick + timedelta(seconds=estimated_seconds_remaining)
                    print('Estimated finish time for this type {0}.'.format(estimated_finish))
        except ScrollTimeoutException:
            # Scroll expired mid-iteration: write what we have rather than
            # losing the part-batch (remaining records are not retried)
            if len(batch) > 0:
                _flush("scroll timed out / writing ")

        # Write the last part-batch to index
        if len(batch) > 0:
            _flush("final result set / writing ")
def _diff(original, current):
    """Return a partial document containing only the fields of ``current``
    that were added or changed relative to ``original``.

    Nested dicts are diffed recursively so unchanged sub-fields are omitted.
    Removed fields are deliberately ignored (a bulk "update" cannot unset
    them anyway).
    """
    def _walk(out, new, old):
        differ = dictdiffer.DictDiffer(new, old)

        # brand-new keys are copied over wholesale
        for key in differ.added():
            out[key] = new[key]

        # changed keys: recurse into dicts, copy scalars/lists as-is
        for key in differ.changed():
            value = new[key]
            if isinstance(value, dict):
                out[key] = {}
                _walk(out[key], value, old[key])
            else:
                out[key] = value

    result = {}
    _walk(result, current, original)
    return result
if __name__ == "__main__":
    # ~~->Migrate:Script~~
    # CLI entry point: load a JSON upgrade definition and run it.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--upgrade", help="path to upgrade definition")
    parser.add_argument("-v", "--verbose", action="store_true", help="verbose output to stdout during processing")
    parser.add_argument("-s", "--save", help="save batches to disk in this directory")
    args = parser.parse_args()

    if not args.upgrade:
        print("Please specify an upgrade package with the -u option")
        # exit(1): the bare exit() used previously reported success on error
        exit(1)

    # isfile() already implies existence, so the extra exists() check is gone
    if not os.path.isfile(args.upgrade):
        print(args.upgrade, "does not exist or is not a file")
        exit(1)

    print('Starting {0}.'.format(dates.now()))

    with open(args.upgrade) as f:
        try:
            # preserve key order from the definition file; catch only JSON
            # parse failures (JSONDecodeError is a ValueError) rather than
            # swallowing every exception with a bare except
            instructions = json.load(f, object_pairs_hook=OrderedDict)
        except ValueError:
            print(args.upgrade, "does not parse as JSON")
            exit(1)

    do_upgrade(instructions, args.verbose, args.save)

    print('Finished {0}.'.format(dates.now()))