Coverage for portality/upgrade.py: 53% (138 statements)
1"""
2~~Migrations:Framework~~
3# FIXME: this script requires more work if it's to be used for specified source and target clusters
4"""
5import json, os, dictdiffer
6from datetime import datetime, timedelta
7from copy import deepcopy
8from collections import OrderedDict
9from portality import models
10from portality.dao import ScrollTimeoutException
11from portality.lib import plugin
12from portality.lib.dataobj import DataStructureException
13from portality.lib.seamless import SeamlessException
14from portality.dao import ScrollTimeoutException

MODELS = {
    "journal": models.Journal,        # ~~->Journal:Model~~
    "article": models.Article,        # ~~->Article:Model~~
    "suggestion": models.Suggestion,  # ~~->Application:Model~~
    "application": models.Application,
    "account": models.Account         # ~~->Account:Model~~
}
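
# A minimal, illustrative upgrade definition of the shape do_upgrade() below
# consumes. The keys ("batch", "types", "type", "action", "query", "keepalive",
# "scroll_size", "init_with_model", "functions", "tasks") are the ones this
# script reads; the function path and the task method/kwargs are hypothetical
# placeholders, not real portality migrations.
EXAMPLE_DEFINITION = {
    "batch": 500,
    "types": [
        {
            "type": "journal",
            "action": "update",
            "query": {"query": {"match_all": {}}},
            "keepalive": "5m",
            "scroll_size": 1000,
            "init_with_model": True,
            "functions": ["portality.migrate.example.fix_titles"],  # hypothetical plugin path
            "tasks": {"set_in_doaj": {"value": True}}                # hypothetical method/kwargs
        }
    ]
}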


class UpgradeTask(object):

    def upgrade_article(self, article):
        pass
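

# A sketch of a migration function that a definition's "functions" list could
# reference via plugin.load_function. Each function receives the record (a raw
# dict, or a model instance when "init_with_model" is true) and must return
# it. The field touched here is hypothetical, for illustration only.
def example_set_migrated_flag(record):
    if isinstance(record, dict):
        admin = record.setdefault("admin", {})  # hypothetical field
        admin["migrated"] = True
    return record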


def do_upgrade(definition, verbose, save_batches=None):
    # get the source and target es definitions
    # ~~->Elasticsearch:Technology~~

    # get the defined batch size
    batch_size = definition.get("batch", 500)

    for tdef in definition.get("types", []):
        print("Upgrading", tdef.get("type"))
        batch = []
        total = 0
        batch_num = 0
        type_start = datetime.now()

        default_query = {
            "query": {"match_all": {}}
        }
        # learn what kind of model we've got
        model_class = MODELS.get(tdef.get("type"))
        max_records = model_class.count()
        action = tdef.get("action", "update")

        # Iterate through all of the records in the model class
        try:
            for result in model_class.iterate(q=tdef.get("query", default_query), keepalive=tdef.get("keepalive", "1m"),
                                              page_size=tdef.get("scroll_size", 1000), wrap=False):
                original = deepcopy(result)
                if tdef.get("init_with_model", True):
                    # instantiate an object with the data
                    try:
                        result = model_class(**result)
                    except (DataStructureException, SeamlessException) as e:
                        print("Could not create model for {0}, Error: {1}".format(result['id'], str(e)))
                        continue
                for function_path in tdef.get("functions", []):
                    fn = plugin.load_function(function_path)
                    result = fn(result)

                data = result
                _id = result.get("id", "id not specified")
                if isinstance(result, model_class):
                    # run the tasks specified with this object type
                    tasks = tdef.get("tasks", None)
                    if tasks:
                        for func_call, kwargs in tasks.items():
                            getattr(result, func_call)(**kwargs)

                    # run the prep routine for the record
                    try:
                        result.prep()
                    except AttributeError:
                        if verbose:
                            print(tdef.get("type"), result.id, "has no prep method - no pre-save preparation being done")

                    data = result.data
                    _id = result.id

                # add the data to the batch
                if action == 'update':
                    data = _diff(original, data)
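                    # the diff keeps only added/changed fields, which includes
                    # dropping an unchanged "id", so the id is restored below
                    # to keep the partial document addressable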
                    if "id" not in data:
                        data["id"] = _id

                batch.append(data)
                if verbose:
                    print("added", tdef.get("type"), _id, "to batch update")

                # When we have enough, do some writing
                if len(batch) >= batch_size:
                    total += len(batch)
                    batch_num += 1

                    print(datetime.now(), "writing", len(batch), "to", tdef.get("type"), ";", total, "of", max_records)

                    if save_batches:
                        fn = os.path.join(save_batches, tdef.get("type") + "." + str(batch_num) + ".json")
                        with open(fn, "w") as f:
                            f.write(json.dumps(batch, indent=2))
                        print(datetime.now(), "wrote batch to file {x}".format(x=fn))

                    model_class.bulk(batch, action=action, req_timeout=120)
                    batch = []
                    # do some timing predictions
                    batch_tick = datetime.now()
                    time_so_far = batch_tick - type_start
                    seconds_so_far = time_so_far.total_seconds()
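                    # linear extrapolation: if `total` records took `seconds_so_far`,
                    # all `max_records` should take (seconds_so_far * max_records / total)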
                    estimated_seconds_remaining = ((seconds_so_far * max_records) / total) - seconds_so_far
                    estimated_finish = batch_tick + timedelta(seconds=estimated_seconds_remaining)
                    print('Estimated finish time for this type {0}.'.format(estimated_finish))
        except ScrollTimeoutException:
            # Try to write the part-batch to index
            if len(batch) > 0:
                total += len(batch)
                batch_num += 1

                if save_batches:
                    fn = os.path.join(save_batches, tdef.get("type") + "." + str(batch_num) + ".json")
                    with open(fn, "w") as f:
                        f.write(json.dumps(batch, indent=2))
                    print(datetime.now(), "wrote batch to file {x}".format(x=fn))

                print(datetime.now(), "scroll timed out / writing", len(batch), "to", tdef.get("type"), ";", total, "of", max_records)
                model_class.bulk(batch, action=action, req_timeout=120)
                batch = []
        # Write the last part-batch to index
        if len(batch) > 0:
            total += len(batch)
            batch_num += 1

            if save_batches:
                fn = os.path.join(save_batches, tdef.get("type") + "." + str(batch_num) + ".json")
                with open(fn, "w") as f:
                    f.write(json.dumps(batch, indent=2))
                print(datetime.now(), "wrote batch to file {x}".format(x=fn))

            print(datetime.now(), "final result set / writing", len(batch), "to", tdef.get("type"), ";", total, "of", max_records)
            model_class.bulk(batch, action=action, req_timeout=120)


def _diff(original, current):
    thediff = {}
    context = thediff

    def recurse(context, c, o):
        dd = dictdiffer.DictDiffer(c, o)
        changed = dd.changed()
        added = dd.added()

        for a in added:
            context[a] = c[a]

        for change in changed:
            sub = c[change]
            if isinstance(c[change], dict):
                context[change] = {}
                recurse(context[change], c[change], o[change])
            else:
                context[change] = sub

    recurse(context, current, original)
    return thediff
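
# An illustrative worked example of _diff: only added or changed leaf values
# survive and unchanged keys (including "id") are omitted, so the result is a
# partial document suitable for a bulk "update" action:
#
#   _diff({"id": "1", "bibjson": {"title": "Old", "country": "GB"}},
#         {"id": "1", "bibjson": {"title": "New", "country": "GB"}})
#   == {"bibjson": {"title": "New"}}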


if __name__ == "__main__":
    # ~~->Migrate:Script~~
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--upgrade", help="path to upgrade definition")
    parser.add_argument("-v", "--verbose", action="store_true", help="verbose output to stdout during processing")
    parser.add_argument("-s", "--save", help="save batches to disk in this directory")
    args = parser.parse_args()

    if not args.upgrade:
        print("Please specify an upgrade package with the -u option")
        exit()

    if not (os.path.exists(args.upgrade) and os.path.isfile(args.upgrade)):
        print(args.upgrade, "does not exist or is not a file")
        exit()

    print('Starting {0}.'.format(datetime.now()))

    with open(args.upgrade) as f:
        try:
            instructions = json.loads(f.read(), object_pairs_hook=OrderedDict)
        except ValueError:
            print(args.upgrade, "does not parse as JSON")
            exit()

    do_upgrade(instructions, args.verbose, args.save)

    print('Finished {0}.'.format(datetime.now()))
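
# Example invocation (the definition path and save directory are illustrative):
#   python -m portality.upgrade -u migrations/my_upgrade.json -v -s /tmp/upgrade_batches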