Coverage for portality/store.py: 67%

175 statements  

coverage.py v6.4.2, created at 2022-07-22 15:59 +0100

from portality.core import app
from portality.lib import plugin

import os, shutil, boto3
from urllib.parse import quote_plus
from boto3.s3.transfer import TransferConfig


class StoreException(Exception):
    pass


class StoreFactory(object):

    @classmethod
    def get(cls, scope):
        """
        Returns an implementation of the base Store class
        """
        si = app.config.get("STORE_IMPL")
        sm = plugin.load_class(si)
        return sm(scope)

    @classmethod
    def tmp(cls):
        """
        Returns an implementation of the base Store class which should be able
        to provide local temp storage to the app. In addition to the methods supplied
        by Store, it must also provide a "path" function to give the on-disk path to
        the file
        """
        si = app.config.get("STORE_TMP_IMPL")
        sm = plugin.load_class(si)
        return sm()
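
# --- Usage sketch (not part of the original module) ---
# A minimal illustration of how the factory is typically used. The scope name
# "anon_data" and the class path assigned to STORE_IMPL are assumptions for
# the example, not values confirmed by this file:
#
#     app.config["STORE_IMPL"] = "portality.store.StoreLocal"   # hypothetical setting
#     store = StoreFactory.get("anon_data")                     # scope-specific store
#     tmp = StoreFactory.tmp()                                  # local temp storage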

class Store(object):
    """
    ~~FileStore:Feature~~
    """
    def __init__(self, scope):
        pass

    def store(self, container_id, target_name, source_path=None, source_stream=None):
        pass

    def exists(self, container_id):
        return False

    def list(self, container_id):
        pass

    def get(self, container_id, target_name, encoding=None):
        return None

    def url(self, container_id, target_name):
        pass

    def delete_container(self, container_id):
        pass

    def delete_file(self, container_id, target_name):
        pass
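
# --- Implementation note (not part of the original module) ---
# Store is the interface that STORE_IMPL / STORE_TMP_IMPL plugins must satisfy.
# A minimal sketch of a custom backend, assuming only the interface above; the
# class name is hypothetical:
#
#     class StoreInMemory(Store):
#         def __init__(self, scope):
#             self.containers = {}
#
#         def store(self, container_id, target_name, source_path=None, source_stream=None):
#             data = open(source_path, "rb").read() if source_path else source_stream.read()
#             self.containers.setdefault(container_id, {})[target_name] = data
#
#         def exists(self, container_id):
#             return container_id in self.containers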

class StoreS3(Store):
    """
    Remote storage backed by Amazon S3
    ~~->FileStoreS3:Feature~~
    ~~!FileStoreS3:Feature->S3:Technology~~
    """
    def __init__(self, scope):
        cfg = app.config.get("STORE_S3_SCOPES", {}).get(scope, {})
        multipart_threshold = app.config.get("STORE_S3_MULTIPART_THRESHOLD", 5 * 1024**3)
        access_key = cfg.get("aws_access_key_id")
        secret = cfg.get("aws_secret_access_key")
        if access_key is None or secret is None:
            raise StoreException("'aws_access_key_id' and 'aws_secret_access_key' must be set in STORE_S3_SCOPES for scope '{x}'".format(x=scope))

        self.client = boto3.client(
            's3',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret
        )
        # NOTE: we disabled use_threads because the background server failed to execute the public_data_dump task with it enabled.
        self.config = TransferConfig(multipart_threshold=multipart_threshold, use_threads=False)

    def store(self, container_id, target_name, source_path=None, source_stream=None):
        # Note that this assumes the container (bucket) exists
        if source_path is not None:
            with open(source_path, "rb") as f:
                self.client.upload_fileobj(f, Bucket=container_id, Key=target_name, Config=self.config)
        elif source_stream is not None:
            self.client.upload_fileobj(source_stream, Bucket=container_id, Key=target_name, Config=self.config)

    def exists(self, container_id):
        # not implemented for the S3 backend
        pass

    def list(self, container_id):
        all_keys = []
        r = self.client.list_objects_v2(Bucket=container_id)
        while r.get('Contents', None):
            all_keys += [key["Key"] for key in r['Contents']]

            if r.get('NextContinuationToken', None):
                r = self.client.list_objects_v2(Bucket=container_id, ContinuationToken=r['NextContinuationToken'])
            else:
                break
        return all_keys

    def get(self, container_id, target_name, encoding=None):
        try:
            obj = self.client.get_object(Bucket=container_id, Key=target_name)
        except self.client.exceptions.NoSuchKey:
            return None
        if obj is None:
            return None
        body = obj["Body"]
        return body

    def url(self, container_id, target_name):
        bucket_location = self.client.get_bucket_location(Bucket=container_id)
        location = bucket_location['LocationConstraint']
        return "https://s3.{0}.amazonaws.com/{1}/{2}".format(location, container_id, quote_plus(target_name))

    def delete_container(self, container_id):
        """
        Empty the container. We are not permitted to delete the bucket itself,
        so this removes its contents instead

        :param container_id: the container (in this case an S3 bucket)
        :return:
        """
        keys = self.list(container_id)

        # delete_objects accepts at most 1000 keys per call, so delete in batches
        for i in range(0, len(keys), 1000):
            delete_info = {
                "Objects": [{"Key": key} for key in keys[i:i + 1000]]
            }
            self.client.delete_objects(
                Bucket=container_id,
                Delete=delete_info
            )

    def delete_file(self, container_id, target_name):
        """
        Delete the target_name file within the container

        :param container_id: the container (in this case an S3 bucket)
        :param target_name: the file in the container
        :return:
        """
        if target_name is None:
            return

        on_remote = self.list(container_id)
        if target_name not in on_remote:
            return

        delete_info = {
            "Objects": [{"Key": target_name}]
        }

        self.client.delete_objects(
            Bucket=container_id,
            Delete=delete_info
        )
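
# --- Usage sketch (not part of the original module) ---
# How the S3 backend is wired up, assuming a scope called "anon_data"; the
# credential values and bucket name are placeholders for illustration:
#
#     app.config["STORE_S3_SCOPES"] = {
#         "anon_data": {
#             "aws_access_key_id": "AKIA...",            # placeholder
#             "aws_secret_access_key": "...",            # placeholder
#         }
#     }
#     s3 = StoreS3("anon_data")
#     s3.store("my-bucket", "dump.tar.gz", source_path="/tmp/dump.tar.gz")
#     url = s3.url("my-bucket", "dump.tar.gz")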

class StoreLocal(Store):
    """
    Primitive local storage system. Use this for testing in place of a remote store
    ~~->FileStoreLocal:Feature~~
    """
    def __init__(self, scope):
        self.dir = app.config.get("STORE_LOCAL_DIR")
        if self.dir is None:
            raise StoreException("STORE_LOCAL_DIR is not defined in config")
        self.buffer_size = app.config.get("STORE_LOCAL_WRITE_BUFFER_SIZE", 16777216)    # 16MiB

    def store(self, container_id, target_name, source_path=None, source_stream=None):
        cpath = os.path.join(self.dir, container_id)
        if not os.path.exists(cpath):
            os.makedirs(cpath)
        tpath = os.path.join(cpath, target_name)

        if source_path:
            shutil.copyfile(source_path, tpath)
        elif source_stream:
            # read the first chunk to decide whether to write in text or binary mode
            data = source_stream.read(self.buffer_size)
            mode = "w" if isinstance(data, str) else "wb"
            with open(tpath, mode) as f:
                while data:
                    f.write(data)
                    data = source_stream.read(self.buffer_size)

    def exists(self, container_id):
        cpath = os.path.join(self.dir, container_id)
        return os.path.exists(cpath) and os.path.isdir(cpath)

    def list(self, container_id):
        cpath = os.path.join(self.dir, container_id)
        return os.listdir(cpath)

    def get(self, container_id, target_name, encoding=None):
        cpath = os.path.join(self.dir, container_id, target_name)
        if os.path.exists(cpath) and os.path.isfile(cpath):
            kwargs = {}
            mode = "rb"
            if encoding is not None:
                kwargs = {"encoding": encoding}
                mode = "r"
            f = open(cpath, mode, **kwargs)
            return f

    def url(self, container_id, target_name):
        return "/" + container_id + "/" + target_name

    def delete_container(self, container_id):
        if container_id is None:
            return
        cpath = os.path.join(self.dir, container_id)
        if os.path.exists(cpath):
            shutil.rmtree(cpath)

    def delete_file(self, container_id, target_name):
        if target_name is None:
            return
        cpath = os.path.join(self.dir, container_id, target_name)
        if os.path.exists(cpath):
            if os.path.isfile(cpath):
                os.remove(cpath)
            else:
                shutil.rmtree(cpath)

    def size(self, container_id, target_name):
        cpath = os.path.join(self.dir, container_id, target_name)
        return os.stat(cpath).st_size
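
# --- Usage sketch (not part of the original module) ---
# Assuming STORE_LOCAL_DIR points at a writable directory; the path, container
# and file names are hypothetical, and scope is unused by this backend:
#
#     app.config["STORE_LOCAL_DIR"] = "/tmp/doaj-store"
#     local = StoreLocal(None)
#     local.store("cache", "report.csv", source_path="/tmp/report.csv")
#     fh = local.get("cache", "report.csv", encoding="utf-8")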

class TempStore(StoreLocal):
    """
    ~~->FileStoreTemp:Feature~~
    """
    def __init__(self):
        self.dir = app.config.get("STORE_TMP_DIR")
        if self.dir is None:
            raise StoreException("STORE_TMP_DIR is not defined in config")
        self.buffer_size = app.config.get("STORE_TMP_WRITE_BUFFER_SIZE", 16777216)

    def path(self, container_id, filename, create_container=False, must_exist=True):
        container_path = os.path.join(self.dir, container_id)
        if create_container and not os.path.exists(container_path):
            os.makedirs(container_path)
        fpath = os.path.join(self.dir, container_id, filename)
        if not os.path.exists(fpath) and must_exist:
            raise StoreException("Unable to create path for container {x}, file {y}".format(x=container_id, y=filename))
        return fpath

    def list_container_ids(self):
        return [x for x in os.listdir(self.dir) if os.path.isdir(os.path.join(self.dir, x))]
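
# --- Usage sketch (not part of the original module) ---
# path() resolves (and optionally creates) an on-disk location inside the temp
# store; the container and file names here are illustrative:
#
#     tmp = StoreFactory.tmp()
#     p = tmp.path("export-123", "data.json", create_container=True, must_exist=False)
#     with open(p, "w") as f:
#         f.write("{}")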

264def prune_container(storage, container_id, sort, filter=None, keep=1): 

265 action_register = [] 

266 

267 filelist = storage.list(container_id) 

268 #action_register.append("Current cached files (before prune): " + ", ".join(filelist)) 

269 

270 # filter for the files we care about 

271 filtered = [] 

272 if filter is not None: 

273 for fn in filelist: 

274 if filter(fn): 

275 filtered.append(fn) 

276 else: 

277 filtered = filelist 

278 #action_register.append("Filtered cached files (before prune): " + ", ".join(filelist)) 

279 

280 if len(filtered) <= keep: 

281 # action_register.append("Fewer than {x} files in cache, no further action".format(x=keep)) 

282 return action_register 

283 

284 filtered_sorted = sort(filtered) 

285 #action_register.append("Considering files for retention in the following order: " + ", ".join(filtered_sorted)) 

286 

287 remove = filtered_sorted[keep:] 

288 action_register.append("Removed old files: " + ", ".join(remove)) 

289 

290 for fn in remove: 

291 storage.delete_file(container_id, fn) 

292 

293 return action_register