Coverage for portality/store.py: 67%

215 statements  

coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

from portality.core import app
from portality.lib import plugin

import os, shutil, boto3
from urllib.parse import quote_plus
from boto3.s3.transfer import TransferConfig
from botocore.config import Config


class StoreException(Exception):
    pass

class StoreFactory(object):

    @classmethod
    def get(cls, scope):
        """
        Returns an implementation of the base Store class
        """
        si = app.config.get("STORE_IMPL")
        if scope is not None and scope in app.config.get("STORE_SCOPE_IMPL", {}):
            si = app.config.get("STORE_SCOPE_IMPL", {})[scope]

        sm = plugin.load_class(si)
        return sm(scope)

    @classmethod
    def tmp(cls):
        """
        Returns an implementation of the base Store class which should be able
        to provide local temp storage to the app.  In addition to the methods supplied
        by Store, it must also provide a "path" function to give the path on-disk to
        the file
        """
        si = app.config.get("STORE_TMP_IMPL")
        sm = plugin.load_class(si)
        return sm()
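To show how the factory is driven by configuration, here is a minimal sketch. The config keys (STORE_IMPL, STORE_SCOPE_IMPL, STORE_TMP_IMPL) are the ones read above; the scope name "anon_data" and the dotted class paths are illustrative assumptions, not necessarily the application's real settings.

# Hypothetical configuration sketch -- keys are those read by StoreFactory,
# scope name and class paths are placeholders.
STORE_IMPL = "portality.store.StoreLocal"         # default backend for all scopes
STORE_TMP_IMPL = "portality.store.TempStore"      # backend returned by StoreFactory.tmp()
STORE_SCOPE_IMPL = {
    "anon_data": "portality.store.StoreS3"        # per-scope override (hypothetical scope)
}

# The factory loads the configured class via plugin.load_class and instantiates it with the scope.
store = StoreFactory.get("anon_data")
tmp = StoreFactory.tmp()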

class Store(object):
    """
    ~~FileStore:Feature~~
    """
    def __init__(self, scope):
        pass

    def store(self, container_id, target_name, source_path=None, source_stream=None):
        pass

    def exists(self, container_id):
        return False

    def list(self, container_id):
        pass

    def get(self, container_id, target_name, encoding=None):
        return None

    def url(self, container_id, target_name):
        pass

    def temporary_url(self, container_id, target_name, timeout=3600):
        return self.url(container_id, target_name)

    def delete_container(self, container_id):
        pass

    def delete_file(self, container_id, target_name):
        pass

class StoreS3(Store):
    """
    Remote storage backed by an S3-compatible object store
    ~~->FileStoreS3:Feature~~
    ~~!FileStoreS3:Feature->S3:Technology~~
    """
    def __init__(self, scope):
        self.dir = None
        self.scope = scope
        self._cfg = app.config.get("STORE_S3_SCOPES", {}).get(scope)
        multipart_threshold = app.config.get("STORE_S3_MULTIPART_THRESHOLD", 5 * 1024**3)

        self.client = self._make_client()

        # NOTE: we disabled use_threads due to the background server failing to execute the public_data_dump task.
        self.config = TransferConfig(multipart_threshold=multipart_threshold, use_threads=False)

    def _make_client(self, region_name=None):
        access_key = self._cfg.get("aws_access_key_id")
        secret = self._cfg.get("aws_secret_access_key")

        if access_key is None or secret is None:
            raise StoreException("'aws_access_key_id' and 'aws_secret_access_key' must be set in STORE_S3_SCOPES for scope '{x}'".format(x=self.scope))

        kwargs = {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret,
            "config": Config(signature_version='s3v4')
        }

        if region_name is not None:
            kwargs["region_name"] = region_name

        return boto3.client(
            's3',
            **kwargs
        )
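For context, a sketch of what the per-scope S3 configuration read by __init__ and _make_client might look like; the key names match what the code above reads, while the scope name and credential values are placeholders.

# Hypothetical settings sketch: STORE_S3_SCOPES maps a scope name to the credentials
# consumed by StoreS3._make_client(); values shown are placeholders only.
STORE_S3_SCOPES = {
    "anon_data": {
        "aws_access_key_id": "<access key>",
        "aws_secret_access_key": "<secret key>",
    }
}
STORE_S3_MULTIPART_THRESHOLD = 5 * 1024 ** 3   # bytes before boto3 switches to multipart upload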

    def store(self, container_id, target_name, source_path=None, source_stream=None):
        # Note that this assumes the container (bucket) exists
        if source_path is not None:
            with open(source_path, "rb") as f:
                self.client.upload_fileobj(f, Bucket=container_id, Key=target_name, Config=self.config)
        elif source_stream is not None:
            self.client.upload_fileobj(source_stream, Bucket=container_id, Key=target_name, Config=self.config)

    def exists(self, container_id):
        pass

    def list(self, container_id):
        all_keys = []
        r = self.client.list_objects_v2(Bucket=container_id)
        while r.get('Contents', None):
            all_keys += [key["Key"] for key in r['Contents']]

            if r.get('NextContinuationToken', None):
                r = self.client.list_objects_v2(Bucket=container_id, ContinuationToken=r['NextContinuationToken'])
            else:
                break
        return all_keys

    def get(self, container_id, target_name, encoding=None):
        try:
            obj = self.client.get_object(Bucket=container_id, Key=target_name)
        except self.client.exceptions.NoSuchKey:
            return None
        if obj is None:
            return None
        body = obj["Body"]
        return body

    def url(self, container_id, target_name):
        bucket_location = self.client.get_bucket_location(Bucket=container_id)
        location = bucket_location['LocationConstraint']
        return "https://s3.{0}.amazonaws.com/{1}/{2}".format(location, container_id, quote_plus(target_name))

    def temporary_url(self, container_id, target_name, timeout=3600):
        bucket_location = self.client.get_bucket_location(Bucket=container_id)
        location = bucket_location['LocationConstraint']
        location_client = self._make_client(region_name=location)
        return location_client.generate_presigned_url('get_object',
                                                       Params={"Bucket": container_id, "Key": target_name},
                                                       ExpiresIn=timeout)

    def delete_container(self, container_id):
        """
        This method will delete the entire container (actually, it can't, it will
        just empty the bucket)

        :param container_id: the container (in this case an S3 bucket)
        :return:
        """
        # we are not allowed to delete the bucket, so we just delete the contents
        keys = self.list(container_id)

        # FIXME: this has a limit of 1000 keys, which will need to be dealt with at some point soon
        delete_info = {
            "Objects" : [{"Key" : key} for key in keys]
        }

        self.client.delete_objects(
            Bucket=container_id,
            Delete=delete_info
        )

    def delete_file(self, container_id, target_name):
        """
        This method will delete the target_name file within the container

        :param container_id: the container (in this case an S3 bucket)
        :param target_name: the file in the container
        :return:
        """
        if target_name is None:
            return

        on_remote = self.list(container_id)
        if target_name not in on_remote:
            return

        delete_info = {
            "Objects" : [{"Key" : target_name}]
        }

        self.client.delete_objects(
            Bucket=container_id,
            Delete=delete_info
        )
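A brief usage sketch for the S3 backend, assuming a scope configured as in the earlier settings sketch; the bucket name doaj-example-bucket is purely illustrative and must already exist.

# Minimal usage sketch (scope and bucket name are hypothetical):
s3_store = StoreS3("anon_data")
s3_store.store("doaj-example-bucket", "exports/data.json", source_path="/tmp/data.json")

keys = s3_store.list("doaj-example-bucket")                                       # all object keys in the bucket
stream = s3_store.get("doaj-example-bucket", "exports/data.json")                 # StreamingBody, or None if missing
link = s3_store.temporary_url("doaj-example-bucket", "exports/data.json", timeout=600)  # presigned URL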

class StoreLocal(Store):
    """
    ~~->FileStoreLocal:Feature~~
    """
    def __init__(self, scope):
        self.dir = app.config.get("STORE_LOCAL_DIR")
        if self.dir is None:
            raise StoreException("STORE_LOCAL_DIR is not defined in config")
        self.buffer_size = app.config.get("STORE_LOCAL_WRITE_BUFFER_SIZE", 16777216)

    def store(self, container_id, target_name, source_path=None, source_stream=None):
        cpath = os.path.join(self.dir, container_id)
        tpath = os.path.join(cpath, target_name)
        directory = os.path.dirname(tpath)
        if not os.path.exists(directory):
            os.makedirs(directory)

        if source_path:
            shutil.copyfile(source_path, tpath)
        elif source_stream:
            data = source_stream.read(self.buffer_size)
            mode = "w" if isinstance(data, str) else "wb"
            with open(tpath, mode) as f:
                while data:
                    f.write(data)
                    data = source_stream.read(self.buffer_size)

    def exists(self, container_id):
        cpath = os.path.join(self.dir, container_id)
        return os.path.exists(cpath) and os.path.isdir(cpath)

    def list(self, container_id):
        cpath = os.path.join(self.dir, container_id)
        return os.listdir(cpath)

    def get(self, container_id, target_name, encoding=None):
        cpath = os.path.join(self.dir, container_id, target_name)
        if os.path.exists(cpath) and os.path.isfile(cpath):
            kwargs = {}
            mode = "rb"
            if encoding is not None:
                kwargs = {"encoding" : encoding}
                mode = "r"
            f = open(cpath, mode, **kwargs)
            return f

    def url(self, container_id, target_name):
        return "/" + container_id + "/" + target_name

    def delete_container(self, container_id):
        if container_id is None:
            return
        cpath = os.path.join(self.dir, container_id)
        if os.path.exists(cpath):
            shutil.rmtree(cpath)

    def delete_file(self, container_id, target_name):
        if target_name is None:
            return
        cpath = os.path.join(self.dir, container_id, target_name)
        if os.path.exists(cpath):
            if os.path.isfile(cpath):
                os.remove(cpath)
            else:
                shutil.rmtree(cpath)

    def size(self, container_id, target_name):
        cpath = os.path.join(self.dir, container_id, target_name)
        return os.stat(cpath).st_size
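A quick sketch of the on-disk layout implied by the path handling above, assuming STORE_LOCAL_DIR is set; the container and file names are made up for illustration.

# Sketch of how StoreLocal lays files out: <STORE_LOCAL_DIR>/<container_id>/<target_name>
local = StoreLocal(None)                                         # scope is unused by this backend
local.store("cache", "reports/2026.csv", source_path="/tmp/2026.csv")
local.exists("cache")                                            # True: the container directory exists
fh = local.get("cache", "reports/2026.csv", encoding="utf-8")    # open file handle in text mode
print(local.size("cache", "reports/2026.csv"))                   # size in bytes via os.stat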

class TempStore(StoreLocal):
    """
    ~~->FileStoreTemp:Feature~~
    """
    def __init__(self):
        self.dir = app.config.get("STORE_TMP_DIR")
        if self.dir is None:
            raise StoreException("STORE_TMP_DIR is not defined in config")
        self.buffer_size = app.config.get("STORE_TMP_WRITE_BUFFER_SIZE", 16777216)

    def path(self, container_id, filename, create_container=False, must_exist=True):
        container_path = os.path.join(self.dir, container_id)
        if create_container and not os.path.exists(container_path):
            os.makedirs(container_path)
        fpath = os.path.join(self.dir, container_id, filename)
        if not os.path.exists(fpath) and must_exist:
            raise StoreException("Unable to create path for container {x}, file {y}".format(x=container_id, y=filename))
        return fpath

    def list_container_ids(self):
        return [x for x in os.listdir(self.dir) if os.path.isdir(os.path.join(self.dir, x))]
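A sketch of the extra on-disk "path" capability that StoreFactory.tmp() relies on; the container and file names here are illustrative only.

# Sketch: TempStore exposes the on-disk path so callers can hand the file to other tools.
tmp = StoreFactory.tmp()
fpath = tmp.path("export_job", "working.csv", create_container=True, must_exist=False)
with open(fpath, "w") as f:                 # write directly to the temp location
    f.write("id,title\n")
tmp.list_container_ids()                    # containers currently held in STORE_TMP_DIR
tmp.delete_container("export_job")          # inherited from StoreLocal: removes the whole directory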

def prune_container(storage, container_id, sort, filter=None, keep=1, logger=None, is_directory=False):
    logger = logger if logger is not None else lambda x: x
    action_register = []

    dir_list = []
    filelist = storage.list(container_id)
    #action_register.append("Current cached files (before prune): " + ", ".join(filelist))

    # filter for the files we care about
    filtered = []
    if filter is not None:
        for fn in filelist:
            if filter(fn):
                filtered.append(fn)
    else:
        filtered = filelist
    #action_register.append("Filtered cached files (before prune): " + ", ".join(filelist))

    # treat directories differently
    # S3 buckets do not have physical directories under the bucket; they are virtual directories.
    # Retrieve the directories and delete all files related to the directories
    if is_directory:
        for fn in filtered:
            dir = os.path.dirname(fn)
            if dir:
                dir_list.append(dir)
            else:
                if storage.dir:
                    if os.path.isdir(os.path.join(storage.dir, container_id, fn)):
                        dir_list.append(fn)

    dir_set = set(dir_list)

    if is_directory:
        if len(dir_set) <= keep:
            return action_register
    else:
        if len(filtered) <= keep:
            # action_register.append("Fewer than {x} files in cache, no further action".format(x=keep))
            return action_register

    if is_directory:
        filtered_sorted = sort(dir_set)
    else:
        filtered_sorted = sort(filtered)
    #action_register.append("Considering files for retention in the following order: " + ", ".join(filtered_sorted))

    remove = filtered_sorted[keep:]
    msg = "Removed old files: " + ", ".join(remove)
    action_register.append(msg)
    logger(msg)

    if is_directory:
        for fn in remove:
            for file in filtered:
                if file.startswith(fn):
                    storage.delete_file(container_id, file)
    else:
        for fn in remove:
            storage.delete_file(container_id, fn)

    return action_register
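To illustrate the sort/filter contract of prune_container, a short hedged example; the scope name, container name, and file-naming convention (date-prefixed JSON exports) are assumptions made up for illustration.

# Hypothetical pruning run: keep the two newest date-prefixed exports in a container.
def _only_exports(filename):
    return filename.endswith(".json")           # filter: ignore anything that is not an export

def _newest_first(filenames):
    return sorted(filenames, reverse=True)      # sort: date-prefixed names sort newest first

store = StoreFactory.get("cache")               # "cache" is an illustrative scope name
actions = prune_container(store, "exports", _newest_first, filter=_only_exports, keep=2,
                          logger=print)
# Everything after the first two sorted entries is deleted; actions lists what was removed.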