Coverage for portality / upgrade.py: 59%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1""" 

2~~Migrations:Framework~~ 

3# FIXME: this script requires more work if it's to be used for specified source and target clusters 

4""" 

5import dictdiffer 

6import json 

7import os 

8from collections import OrderedDict 

9from copy import deepcopy 

10from datetime import timedelta 

11from typing import TypedDict, List, Dict 

12 

13from portality import models 

14from portality.dao import ScrollTimeoutException 

15from portality.lib import plugin, dates 

16from portality.lib.dataobj import DataStructureException 

17from portality.lib.seamless import SeamlessException 

18from portality.models.datalog_journal_added import DatalogJournalAdded 

19 

# Registry of upgradeable record types: maps the "type" key used in an
# upgrade definition to the domain model class that backs it.
MODELS = {
    "journal": models.Journal,                      # ~~->Journal:Model~~
    "article": models.Article,                      # ~~->Article:Model~~
    "draft_application": models.DraftApplication,   # ~~->DraftApplication:Model~~
    "suggestion": models.Suggestion,                # ~~->Application:Model~~
    "application": models.Application,
    "account": models.Account,                      # ~~->Account:Model~~
    "background_job": models.BackgroundJob,         # ~~->BackgroundJob:Model~~
    "datalog_journal_added": DatalogJournalAdded,
}

30 

31 

class UpgradeTask(object):
    """Base class for upgrade tasks; subclasses override the hooks they need."""

    def upgrade_article(self, article):
        """Per-article hook; the default implementation does nothing."""
        return None

36 

37 

class UpgradeType(TypedDict):
    """Configuration for upgrading a single record type within a Definition."""

    type: str    # name / key of the MODELS class
    action: str  # ES bulk action; default is update

    # ES query to use to find the records to upgrade;
    # default is match_all if query is None
    query: dict

    keepalive: str    # ES keepalive time for the scroll, default 1m
    scroll_size: int  # ES scroll size, default 1000

    # python paths of functions to run on the record.
    # interface of the function should be:
    #     my_function(instance: DomainObject | dict) -> DomainObject | dict
    functions: List[str]

    # instance would be a DomainObject if True, otherwise a dict;
    # default is True
    init_with_model: bool

    # tasks to run on the record; only applied when init_with_model is True.
    # Mapping of { function name of model : kwargs }.
    # FIX: previously declared List[Dict[str, dict]], but do_upgrade iterates
    # tasks.items(), so this must be a dict, not a list of dicts.
    tasks: Dict[str, dict]

71 

72 

class Definition(TypedDict):
    """Top-level upgrade definition: a batch size plus the per-type configs."""
    batch: int  # number of records to accumulate before each bulk write (default 500 in do_upgrade)
    types: List[UpgradeType]  # one entry per record type to upgrade, processed in order

76 

77 

def _write_batch_file(save_dir, type_name, batch_num, batch):
    """Persist a copy of *batch* to ``<save_dir>/<type_name>.<batch_num>.json``
    so the exact payload of each bulk write is kept for audit/recovery."""
    out_path = os.path.join(save_dir, type_name + "." + str(batch_num) + ".json")
    with open(out_path, "w") as f:
        f.write(json.dumps(batch, indent=2))
    print(dates.now(), "wrote batch to file {x}".format(x=out_path))


def do_upgrade(definition: Definition, verbose, save_batches=None):
    """Run the upgrade described by *definition* against the index.

    :param definition: the upgrade instructions (see Definition / UpgradeType)
    :param verbose: if True, print per-record progress to stdout
    :param save_batches: optional directory; when given, every batch sent to
        ES is also written there as a JSON file before the bulk call
    """
    # get the source and target es definitions
    # ~~->Elasticsearch:Technology~~

    # get the defined batch size
    batch_size = definition.get("batch", 500)

    for tdef in definition.get("types", []):
        print("Upgrading", tdef.get("type"))
        batch = []
        total = 0
        batch_num = 0
        type_start = dates.now()

        default_query = {
            "query": {"match_all": {}}
        }
        query = tdef.get("query", default_query)

        # learn what kind of model we've got
        model_class = MODELS.get(tdef.get("type"))
        # renamed from `max`, which shadowed the builtin
        record_count = model_class.count(query=query)
        action = tdef.get("action", "update")

        # Iterate through all of the records in the model class
        try:
            for result in model_class.iterate(q=query, keepalive=tdef.get("keepalive", "1m"),
                                              page_size=tdef.get("scroll_size", 1000), wrap=False):

                # keep the pre-upgrade record so _diff can build a partial update
                original = deepcopy(result)
                if tdef.get("init_with_model", True):
                    # instantiate an object with the data
                    try:
                        result = model_class(**result)
                    except (DataStructureException, SeamlessException) as e:
                        print("Could not create model for {0}, Error: {1}".format(result['id'], str(e)))
                        continue

                # run any standalone transform functions over the record
                for function_path in tdef.get("functions", []):
                    fn = plugin.load_function(function_path)
                    result = fn(result)

                data = result
                _id = result.get("id", "id not specified")
                if isinstance(result, model_class):
                    # run the tasks specified with this object type
                    tasks = tdef.get("tasks", None)
                    if tasks:
                        for func_call, kwargs in tasks.items():
                            getattr(result, func_call)(**kwargs)

                    # run the prep routine for the record
                    try:
                        result.prep()
                    except AttributeError:
                        if verbose:
                            # message typo fixed: was "no, pre-save preparation"
                            print(tdef.get("type"), result.id,
                                  "has no prep method - no pre-save preparation being done")

                    data = result.data
                    _id = result.id

                # add the data to the batch; for updates, send only the changed fields
                if action == 'update':
                    data = _diff(original, data)

                if "id" not in data:
                    data["id"] = _id

                batch.append(data)
                if verbose:
                    print("added", tdef.get("type"), _id, "to batch update")

                # When we have enough, do some writing
                if len(batch) >= batch_size:
                    total += len(batch)
                    batch_num += 1

                    print(dates.now(), "writing ", len(batch), "to", tdef.get("type"), ";", total, "of", record_count)

                    if save_batches:
                        _write_batch_file(save_batches, tdef.get("type"), batch_num, batch)

                    model_class.bulk(batch, action=action, req_timeout=120)
                    batch = []
                    # do some timing predictions
                    batch_tick = dates.now()
                    seconds_so_far = (batch_tick - type_start).total_seconds()
                    estimated_seconds_remaining = ((seconds_so_far * record_count) / total) - seconds_so_far
                    estimated_finish = batch_tick + timedelta(seconds=estimated_seconds_remaining)
                    print('Estimated finish time for this type {0}.'.format(estimated_finish))
        except ScrollTimeoutException:
            # Scroll expired mid-iteration: flush the part-batch rather than lose it
            if len(batch) > 0:
                total += len(batch)
                batch_num += 1

                if save_batches:
                    _write_batch_file(save_batches, tdef.get("type"), batch_num, batch)

                print(dates.now(), "scroll timed out / writing ", len(batch), "to",
                      tdef.get("type"), ";", total, "of", record_count)
                model_class.bulk(batch, action=action, req_timeout=120)
                batch = []

        # Write the last part-batch to index
        if len(batch) > 0:
            total += len(batch)
            batch_num += 1

            if save_batches:
                _write_batch_file(save_batches, tdef.get("type"), batch_num, batch)

            print(dates.now(), "final result set / writing ", len(batch), "to", tdef.get("type"), ";", total, "of", record_count)
            model_class.bulk(batch, action=action, req_timeout=120)

204 

205 

def _diff(original, current):
    """Return a nested dict containing only the keys of *current* that were
    added or changed relative to *original*.

    Deleted keys are not reported (only ``added()`` and ``changed()`` are
    consulted), so the result is suitable as a partial 'update' payload.
    """
    delta = {}

    def _walk(out, cur, old):
        # dictdiffer compares the two mappings at this level only
        differ = dictdiffer.DictDiffer(cur, old)

        # brand new keys are copied over wholesale
        for key in differ.added():
            out[key] = cur[key]

        # changed keys: descend into nested dicts, copy leaves directly
        for key in differ.changed():
            value = cur[key]
            if isinstance(value, dict):
                out[key] = {}
                _walk(out[key], value, old[key])
            else:
                out[key] = value

    _walk(delta, current, original)
    return delta

228 

229 

if __name__ == "__main__":
    # ~~->Migrate:Script~~
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--upgrade", help="path to upgrade definition")
    parser.add_argument("-v", "--verbose", action="store_true", help="verbose output to stdout during processing")
    parser.add_argument("-s", "--save", help="save batches to disk in this directory")
    args = parser.parse_args()

    if not args.upgrade:
        print("Please specify an upgrade package with the -u option")
        exit()

    if not (os.path.exists(args.upgrade) and os.path.isfile(args.upgrade)):
        print(args.upgrade, "does not exist or is not a file")
        exit()

    print('Starting {0}.'.format(dates.now()))

    with open(args.upgrade) as f:
        try:
            # object_pairs_hook preserves key order so types run in the order defined
            instructions = json.loads(f.read(), object_pairs_hook=OrderedDict)
        except ValueError:
            # FIX: was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt; json.JSONDecodeError subclasses ValueError
            print(args.upgrade, "does not parse as JSON")
            exit()

    do_upgrade(instructions, args.verbose, args.save)

    print('Finished {0}.'.format(dates.now()))