Coverage for portality/store.py: 67%
215 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1from portality.core import app
2from portality.lib import plugin
4import os, shutil, boto3
5from urllib.parse import quote_plus
6from boto3.s3.transfer import TransferConfig
7from botocore.config import Config
class StoreException(Exception):
    """Raised when the storage layer is misconfigured or an operation fails."""
    pass
class StoreFactory(object):

    @classmethod
    def get(cls, scope):
        """
        Returns an implementation of the base Store class.

        The implementation class path is taken from STORE_IMPL, unless the
        given scope has a specific override registered in STORE_SCOPE_IMPL.

        :param scope: storage scope name (may be None for the default store)
        :return: an instance of the configured Store subclass
        """
        si = app.config.get("STORE_IMPL")
        if scope is not None and scope in app.config.get("STORE_SCOPE_IMPL", {}):
            # FIX: the original subscripted the bound method itself
            # (app.config.get["STORE_SCOPE_IMPL"]), which raises TypeError;
            # it must call the accessor and then index the mapping.
            si = app.config.get("STORE_SCOPE_IMPL", {})[scope]
        sm = plugin.load_class(si)
        return sm(scope)

    @classmethod
    def tmp(cls):
        """
        Returns an implementation of the base Store class which should be able
        to provide local temp storage to the app. In addition to the methods supplied
        by Store, it must also provide a "path" function to give the path on-disk to
        the file
        """
        si = app.config.get("STORE_TMP_IMPL")
        sm = plugin.load_class(si)
        return sm()
class Store(object):
    """
    Base class defining the storage interface; concrete backends override
    these methods.

    ~~FileStore:Feature~~
    """
    def __init__(self, scope):
        pass

    def store(self, container_id, target_name, source_path=None, source_stream=None):
        """Write a file into the container from a path or an open stream."""
        pass

    def exists(self, container_id):
        """Report whether the container exists; the base store has nothing."""
        return False

    def list(self, container_id):
        """List the names of the files held in the container."""
        pass

    def get(self, container_id, target_name, encoding=None):
        """Return a readable handle for the named file, or None."""
        return None

    def url(self, container_id, target_name):
        """Return a public URL for the named file."""
        pass

    def temporary_url(self, container_id, target_name, timeout=3600):
        """Return a time-limited URL; by default just the permanent URL."""
        return self.url(container_id, target_name)

    def delete_container(self, container_id):
        """Remove the whole container and its contents."""
        pass

    def delete_file(self, container_id, target_name):
        """Remove a single file from the container."""
        pass
class StoreS3(Store):
    """
    S3-backed implementation of Store.  Each container_id maps to an S3
    bucket, and target names are object keys within that bucket.
    ~~->FileStoreS3:Feature~~
    ~~!FileStoreS3:Feature->S3:Technology~~
    """
    def __init__(self, scope):
        # no local directory for a remote store; kept for interface parity
        self.dir = None
        # FIX: remember the scope so _make_client can report it in errors —
        # the original referenced an undefined name 'scope' there (NameError)
        self._scope = scope
        self._cfg = app.config.get("STORE_S3_SCOPES", {}).get(scope)
        if self._cfg is None:
            # fail fast with a meaningful error instead of the AttributeError
            # the original raised from _make_client on an unconfigured scope
            raise StoreException("No STORE_S3_SCOPES configuration for scope '{x}'".format(x=scope))
        multipart_threshold = app.config.get("STORE_S3_MULTIPART_THRESHOLD", 5 * 1024**3)

        self.client = self._make_client()

        # NOTE: we disabled use_threads due to the background server failing to execute the public_data_dump task.
        self.config = TransferConfig(multipart_threshold=multipart_threshold, use_threads=False)

    def _make_client(self, region_name=None):
        """
        Build a boto3 S3 client from the scope's configured credentials.

        :param region_name: optional AWS region to pin the client to
        :raises StoreException: if the credentials are not configured
        """
        access_key = self._cfg.get("aws_access_key_id")
        secret = self._cfg.get("aws_secret_access_key")

        if access_key is None or secret is None:
            # FIX: use self._scope; the original formatted an undefined 'scope'
            raise StoreException("'aws_access_key_id' and 'aws_secret_access_key' must be set in STORE_S3_SCOPE for scope '{x}'".format(x=self._scope))

        kwargs = {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret,
            "config": Config(signature_version='s3v4')
        }

        if region_name is not None:
            kwargs["region_name"] = region_name

        return boto3.client(
            's3',
            **kwargs
        )

    def store(self, container_id, target_name, source_path=None, source_stream=None):
        """
        Upload a file to the bucket, from a local path or an open stream.

        Note that this assumes the container (bucket) exists.
        """
        if source_path is not None:
            with open(source_path, "rb") as f:
                self.client.upload_fileobj(f, Bucket=container_id, Key=target_name, Config=self.config)
        elif source_stream is not None:
            self.client.upload_fileobj(source_stream, Bucket=container_id, Key=target_name, Config=self.config)

    def exists(self, container_id):
        # TODO: not implemented for S3; returns None (falsy), unlike the
        # base class which returns False explicitly
        pass

    def list(self, container_id):
        """Return all object keys in the bucket, following continuation tokens."""
        all_keys = []
        r = self.client.list_objects_v2(Bucket=container_id)
        while r.get('Contents', None):
            all_keys += [key["Key"] for key in r['Contents']]
            if r.get('NextContinuationToken', None):
                r = self.client.list_objects_v2(Bucket=container_id, ContinuationToken=r['NextContinuationToken'])
            else:
                break
        return all_keys

    def get(self, container_id, target_name, encoding=None):
        """
        Return a streaming body for the object, or None if the key is missing.

        :param encoding: unused for S3; kept for interface compatibility
        """
        try:
            obj = self.client.get_object(Bucket=container_id, Key=target_name)
        except self.client.exceptions.NoSuchKey:
            return None
        if obj is None:
            return None
        body = obj["Body"]
        return body

    def url(self, container_id, target_name):
        """Return the public https URL for the object in its bucket's region."""
        bucket_location = self.client.get_bucket_location(Bucket=container_id)
        location = bucket_location['LocationConstraint']
        return "https://s3.{0}.amazonaws.com/{1}/{2}".format(location, container_id, quote_plus(target_name))

    def temporary_url(self, container_id, target_name, timeout=3600):
        """
        Return a presigned URL for the object, valid for `timeout` seconds.

        A region-pinned client is used so the signature matches the bucket's
        actual region.
        """
        bucket_location = self.client.get_bucket_location(Bucket=container_id)
        location = bucket_location['LocationConstraint']
        location_client = self._make_client(region_name=location)
        return location_client.generate_presigned_url('get_object',
                                                      Params={"Bucket": container_id, "Key": target_name},
                                                      ExpiresIn=timeout)

    def delete_container(self, container_id):
        """
        This method will delete the entire container (actually, it can't, it will
        just empty the bucket)

        :param container_id: the container (in this case an S3 bucket)
        :return:
        """
        # we are not allowed to delete the bucket, so we just delete the contents
        keys = self.list(container_id)

        # FIX: delete_objects accepts at most 1000 keys per request, so send
        # the deletions in batches (this also avoids an empty-delete request
        # for an already-empty bucket)
        for i in range(0, len(keys), 1000):
            delete_info = {
                "Objects": [{"Key": key} for key in keys[i:i + 1000]]
            }

            self.client.delete_objects(
                Bucket=container_id,
                Delete=delete_info
            )

    def delete_file(self, container_id, target_name):
        """
        This method will delete the target_name file within
        the container

        :param container_id: the container (in this case an S3 bucket)
        :param target_name: the file in the container
        :return:
        """
        if target_name is None:
            return

        on_remote = self.list(container_id)
        if target_name not in on_remote:
            return

        delete_info = {
            "Objects" : [{"Key" : target_name}]
        }

        self.client.delete_objects(
            Bucket=container_id,
            Delete=delete_info
        )
class StoreLocal(Store):
    """
    Filesystem-backed store: each container is a directory under
    STORE_LOCAL_DIR, and files live inside it.

    ~~->FileStoreLocal:Feature~~
    """
    def __init__(self, scope):
        self.dir = app.config.get("STORE_LOCAL_DIR")
        if self.dir is None:
            raise StoreException("STORE_LOCAL_DIR is not defined in config")
        self.buffer_size = app.config.get("STORE_LOCAL_WRITE_BUFFER_SIZE", 16777216)

    def store(self, container_id, target_name, source_path=None, source_stream=None):
        """Copy a local file, or drain a stream in buffer-sized chunks, into place."""
        target = os.path.join(self.dir, container_id, target_name)
        parent = os.path.dirname(target)
        if not os.path.exists(parent):
            os.makedirs(parent)

        if source_path:
            shutil.copyfile(source_path, target)
        elif source_stream:
            # peek at the first chunk to decide between text and binary mode
            chunk = source_stream.read(self.buffer_size)
            with open(target, "w" if isinstance(chunk, str) else "wb") as out:
                while chunk:
                    out.write(chunk)
                    chunk = source_stream.read(self.buffer_size)

    def exists(self, container_id):
        """A container exists when its directory does."""
        return os.path.isdir(os.path.join(self.dir, container_id))

    def list(self, container_id):
        """List the entries directly inside the container directory."""
        return os.listdir(os.path.join(self.dir, container_id))

    def get(self, container_id, target_name, encoding=None):
        """Open the named file for reading (text if an encoding is given), or None."""
        fpath = os.path.join(self.dir, container_id, target_name)
        if not (os.path.exists(fpath) and os.path.isfile(fpath)):
            return None
        if encoding is None:
            return open(fpath, "rb")
        return open(fpath, "r", encoding=encoding)

    def url(self, container_id, target_name):
        """Root-relative URL path for the file."""
        return "/" + container_id + "/" + target_name

    def delete_container(self, container_id):
        """Remove the container directory and everything in it."""
        if container_id is None:
            return
        cpath = os.path.join(self.dir, container_id)
        if os.path.exists(cpath):
            shutil.rmtree(cpath)

    def delete_file(self, container_id, target_name):
        """Remove a file (or a subdirectory tree) inside the container."""
        if target_name is None:
            return
        cpath = os.path.join(self.dir, container_id, target_name)
        if not os.path.exists(cpath):
            return
        if os.path.isfile(cpath):
            os.remove(cpath)
        else:
            shutil.rmtree(cpath)

    def size(self, container_id, target_name):
        """Size of the named file in bytes."""
        return os.stat(os.path.join(self.dir, container_id, target_name)).st_size
class TempStore(StoreLocal):
    """
    Local scratch storage rooted at STORE_TMP_DIR, with direct on-disk
    path access for consumers that need real files.

    ~~->FileStoreTemp:Feature~~
    """
    def __init__(self):
        self.dir = app.config.get("STORE_TMP_DIR")
        if self.dir is None:
            raise StoreException("STORE_TMP_DIR is not defined in config")
        self.buffer_size = app.config.get("STORE_TMP_WRITE_BUFFER_SIZE", 16777216)

    def path(self, container_id, filename, create_container=False, must_exist=True):
        """
        Return the on-disk path for a file in a container, optionally
        creating the container directory first.

        :raises StoreException: if must_exist is True and the file is absent
        """
        cdir = os.path.join(self.dir, container_id)
        if create_container and not os.path.exists(cdir):
            os.makedirs(cdir)
        fpath = os.path.join(cdir, filename)
        if must_exist and not os.path.exists(fpath):
            raise StoreException("Unable to create path for container {x}, file {y}".format(x=container_id, y=filename))
        return fpath

    def list_container_ids(self):
        """List the ids of all containers (directories) in the temp store."""
        entries = os.listdir(self.dir)
        return [e for e in entries if os.path.isdir(os.path.join(self.dir, e))]
def prune_container(storage, container_id, sort, filter=None, keep=1, logger=None, is_directory=False):
    """
    Delete old entries from a storage container, retaining only the first
    `keep` entries in the order produced by `sort`.

    :param storage: a Store implementation
    :param container_id: the container to prune
    :param sort: callable taking the candidate names and returning them ordered
                 newest-first (the first `keep` survive)
    :param filter: optional predicate; only names it accepts are considered
    :param keep: how many entries to retain
    :param logger: optional callable receiving a message about what was removed
    :param is_directory: prune by (virtual) directory rather than by file
    :return: list of action messages describing what was done
    """
    log = logger if logger is not None else lambda x: x
    actions = []

    contents = storage.list(container_id)

    # narrow down to the entries the caller cares about
    candidates = [fn for fn in contents if filter(fn)] if filter is not None else contents

    # S3 buckets have no physical directories under the bucket, only virtual
    # ones, so derive directory names from the keys; for local stores, a
    # bare name that is a real directory also counts
    dirs = []
    if is_directory:
        for fn in candidates:
            parent = os.path.dirname(fn)
            if parent:
                dirs.append(parent)
            elif storage.dir and os.path.isdir(os.path.join(storage.dir, container_id, fn)):
                dirs.append(fn)
    unique_dirs = set(dirs)

    # nothing to do if we already hold no more than `keep` entries
    if is_directory:
        if len(unique_dirs) <= keep:
            return actions
    elif len(candidates) <= keep:
        return actions

    ordered = sort(unique_dirs) if is_directory else sort(candidates)

    doomed = ordered[keep:]
    msg = "Removed old files: " + ", ".join(doomed)
    actions.append(msg)
    log(msg)

    if is_directory:
        # NOTE(review): prefix match — a directory name that is a prefix of
        # another (e.g. "a" vs "ab") would also catch the other's files;
        # confirm directory names are unambiguous in practice
        for d in doomed:
            for fn in candidates:
                if fn.startswith(d):
                    storage.delete_file(container_id, fn)
    else:
        for fn in doomed:
            storage.delete_file(container_id, fn)

    return actions