Coverage for portality/upgrade.py: 53%

138 statements  

coverage.py v6.4.2, created at 2022-11-09 15:10 +0000

"""
~~Migrations:Framework~~
# FIXME: this script requires more work if it's to be used for specified source and target clusters
"""
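# A minimal sketch of the kind of upgrade definition consumed by do_upgrade() below. The keys
# ("batch", "types", "type", "action", "query", "keepalive", "scroll_size", "init_with_model",
# "functions", "tasks") are the ones this module reads; the concrete values, the function path
# and the task name are illustrative assumptions, not a real migration shipped with the codebase:
#
# {
#     "batch": 500,
#     "types": [
#         {
#             "type": "journal",
#             "action": "update",
#             "query": {"query": {"match_all": {}}},
#             "keepalive": "2m",
#             "scroll_size": 1000,
#             "init_with_model": true,
#             "functions": ["portality.migrate.example.add_missing_field"],
#             "tasks": {"example_setter": {"value": "example"}}
#         }
#     ]
# }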

import json, os, dictdiffer
from datetime import datetime, timedelta
from copy import deepcopy
from collections import OrderedDict
from portality import models
from portality.dao import ScrollTimeoutException
from portality.lib import plugin
from portality.lib.dataobj import DataStructureException
from portality.lib.seamless import SeamlessException

MODELS = {
    "journal": models.Journal,  #~~->Journal:Model~~
    "article": models.Article,  #~~->Article:Model~~
    "suggestion": models.Suggestion,  #~~->Application:Model~~
    "application": models.Application,
    "account": models.Account  #~~->Account:Model~~
}


class UpgradeTask(object):

    def upgrade_article(self, article):
        pass


def do_upgrade(definition, verbose, save_batches=None):
    # get the source and target es definitions
    # ~~->Elasticsearch:Technology~~

    # get the defined batch size
    batch_size = definition.get("batch", 500)

    for tdef in definition.get("types", []):
        print("Upgrading", tdef.get("type"))
        batch = []
        total = 0
        batch_num = 0
        type_start = datetime.now()

        default_query = {
            "query": {"match_all": {}}
        }

        # learn what kind of model we've got
        model_class = MODELS.get(tdef.get("type"))
        max = model_class.count()
        action = tdef.get("action", "update")

        # Iterate through all of the records in the model class
        try:
            for result in model_class.iterate(q=tdef.get("query", default_query), keepalive=tdef.get("keepalive", "1m"), page_size=tdef.get("scroll_size", 1000), wrap=False):

                original = deepcopy(result)
                if tdef.get("init_with_model", True):
                    # instantiate an object with the data
                    try:
                        result = model_class(**result)
                    except (DataStructureException, SeamlessException) as e:
                        print("Could not create model for {0}, Error: {1}".format(result['id'], str(e)))
                        continue

                for function_path in tdef.get("functions", []):
                    fn = plugin.load_function(function_path)
                    result = fn(result)

                data = result
                _id = result.get("id", "id not specified")
                if isinstance(result, model_class):
                    # run the tasks specified with this object type
                    tasks = tdef.get("tasks", None)
                    if tasks:
                        for func_call, kwargs in tasks.items():
                            getattr(result, func_call)(**kwargs)

                    # run the prep routine for the record
                    try:
                        result.prep()
                    except AttributeError:
                        if verbose:
                            print(tdef.get("type"), result.id, "has no prep method - no pre-save preparation being done")
                        pass

                    data = result.data
                    _id = result.id

                # add the data to the batch
                if action == 'update':
                    data = _diff(original, data)

                if "id" not in data:
                    data["id"] = _id

                batch.append(data)
                if verbose:
                    print("added", tdef.get("type"), _id, "to batch update")

                # When we have enough, do some writing
                if len(batch) >= batch_size:
                    total += len(batch)
                    batch_num += 1

                    print(datetime.now(), "writing ", len(batch), "to", tdef.get("type"), ";", total, "of", max)

                    if save_batches:
                        fn = os.path.join(save_batches, tdef.get("type") + "." + str(batch_num) + ".json")
                        with open(fn, "w") as f:
                            f.write(json.dumps(batch, indent=2))
                        print(datetime.now(), "wrote batch to file {x}".format(x=fn))

                    model_class.bulk(batch, action=action, req_timeout=120)
                    batch = []
                    # do some timing predictions
                    batch_tick = datetime.now()
                    time_so_far = batch_tick - type_start
                    seconds_so_far = time_so_far.total_seconds()
                    estimated_seconds_remaining = ((seconds_so_far * max) / total) - seconds_so_far
                    estimated_finish = batch_tick + timedelta(seconds=estimated_seconds_remaining)
                    print('Estimated finish time for this type {0}.'.format(estimated_finish))
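                    # The estimate above is a simple linear extrapolation from the records processed
                    # so far; with illustrative numbers, if the first 1,000 of 10,000 records took
                    # 60 seconds, it predicts (60 * 10000 / 1000) - 60 = 540 seconds remaining.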

        except ScrollTimeoutException:
            # Try to write the part-batch to index
            if len(batch) > 0:
                total += len(batch)
                batch_num += 1

                if save_batches:
                    fn = os.path.join(save_batches, tdef.get("type") + "." + str(batch_num) + ".json")
                    with open(fn, "w") as f:
                        f.write(json.dumps(batch, indent=2))
                    print(datetime.now(), "wrote batch to file {x}".format(x=fn))

                print(datetime.now(), "scroll timed out / writing ", len(batch), "to", tdef.get("type"), ";", total, "of", max)
                model_class.bulk(batch, action=action, req_timeout=120)
                batch = []

        # Write the last part-batch to index
        if len(batch) > 0:
            total += len(batch)
            batch_num += 1

            if save_batches:
                fn = os.path.join(save_batches, tdef.get("type") + "." + str(batch_num) + ".json")
                with open(fn, "w") as f:
                    f.write(json.dumps(batch, indent=2))
                print(datetime.now(), "wrote batch to file {x}".format(x=fn))

            print(datetime.now(), "final result set / writing ", len(batch), "to", tdef.get("type"), ";", total, "of", max)
            model_class.bulk(batch, action=action, req_timeout=120)


def _diff(original, current):
    thediff = {}
    context = thediff

    def recurse(context, c, o):
        dd = dictdiffer.DictDiffer(c, o)
        changed = dd.changed()
        added = dd.added()

        for a in added:
            context[a] = c[a]

        for change in changed:
            sub = c[change]
            if isinstance(c[change], dict):
                context[change] = {}
                recurse(context[change], c[change], o[change])
            else:
                context[change] = sub

    recurse(context, current, original)
    return thediff
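# For illustration (hypothetical data, and assuming DictDiffer.added() / .changed() return the
# sets of keys added to / changed in the first dict relative to the second, as the usage above
# implies):
#
#   _diff({"a": 1, "b": {"x": 1}}, {"a": 1, "b": {"x": 2}, "c": 3})
#   # -> {"b": {"x": 2}, "c": 3}
#
# Unchanged keys are omitted, nested dicts are diffed recursively, and keys removed from the
# current document are not recorded in the diff.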

177 

178 

179if __name__ == "__main__": 

180 # ~~->Migrate:Script~~ 

181 import argparse 

182 parser = argparse.ArgumentParser() 

183 parser.add_argument("-u", "--upgrade", help="path to upgrade definition") 

184 parser.add_argument("-v", "--verbose", action="store_true", help="verbose output to stdout during processing") 

185 parser.add_argument("-s", "--save", help="save batches to disk in this directory") 

186 args = parser.parse_args() 

187 

188 if not args.upgrade: 

189 print("Please specify an upgrade package with the -u option") 

190 exit() 

191 

192 if not (os.path.exists(args.upgrade) and os.path.isfile(args.upgrade)): 

193 print(args.upgrade, "does not exist or is not a file") 

194 exit() 

195 

196 print('Starting {0}.'.format(datetime.now())) 

197 

198 with open(args.upgrade) as f: 

199 try: 

200 instructions = json.loads(f.read(), object_pairs_hook=OrderedDict) 

201 except: 

202 print(args.upgrade, "does not parse as JSON") 

203 exit() 

204 

205 do_upgrade(instructions, args.verbose, args.save) 

206 

207 print('Finished {0}.'.format(datetime.now()))
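
# A hypothetical invocation (the definition path is illustrative, not a file shipped with this
# module); -u points at the upgrade definition, -v enables verbose output, -s saves each batch
# to the given directory:
#
#   python portality/upgrade.py -u migrations/example_definition.json -v -s /tmp/upgrade_batches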