Coverage for portality/store.py: 67% (175 statements)
from portality.core import app
from portality.lib import plugin

import os, shutil, boto3
from urllib.parse import quote_plus
from boto3.s3.transfer import TransferConfig


class StoreException(Exception):
    pass


class StoreFactory(object):

    @classmethod
    def get(cls, scope):
        """
        Returns an implementation of the base Store class
        """
        si = app.config.get("STORE_IMPL")
        sm = plugin.load_class(si)
        return sm(scope)

    @classmethod
    def tmp(cls):
        """
        Returns an implementation of the base Store class which should be able
        to provide local temp storage to the app.  In addition to the methods
        supplied by Store, it must also provide a "path" function to give the
        on-disk path to a file
        """
        si = app.config.get("STORE_TMP_IMPL")
        sm = plugin.load_class(si)
        return sm()
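
# Illustrative usage (a sketch, not part of this module): obtaining stores via
# the factory.  Assumes app config along the lines of
# STORE_IMPL = "portality.store.StoreLocal"; the scope and container names
# below are hypothetical.
#
#   main_store = StoreFactory.get("anon_data")
#   main_store.store("my-container", "report.csv", source_path="/tmp/report.csv")
#
#   tmp_store = StoreFactory.tmp()
#   local_path = tmp_store.path("workspace", "report.csv",
#                               create_container=True, must_exist=False)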

class Store(object):
    """
    ~~FileStore:Feature~~
    """
    def __init__(self, scope):
        pass

    def store(self, container_id, target_name, source_path=None, source_stream=None):
        pass

    def exists(self, container_id):
        return False

    def list(self, container_id):
        pass

    def get(self, container_id, target_name, encoding=None):
        return None

    def url(self, container_id, target_name):
        pass

    def delete_container(self, container_id):
        pass

    def delete_file(self, container_id, target_name):
        pass

class StoreS3(Store):
    """
    Remote storage backed by Amazon S3
    ~~->FileStoreS3:Feature~~
    ~~!FileStoreS3:Feature->S3:Technology~~
    """
    def __init__(self, scope):
        cfg = app.config.get("STORE_S3_SCOPES", {}).get(scope, {})
        multipart_threshold = app.config.get("STORE_S3_MULTIPART_THRESHOLD", 5 * 1024**3)
        access_key = cfg.get("aws_access_key_id")
        secret = cfg.get("aws_secret_access_key")
        if access_key is None or secret is None:
            raise StoreException("'aws_access_key_id' and 'aws_secret_access_key' must be set in STORE_S3_SCOPES for scope '{x}'".format(x=scope))

        self.client = boto3.client(
            's3',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret
        )
        # NOTE: use_threads is disabled because the background server failed to
        # execute the public_data_dump task with threaded transfers enabled.
        self.config = TransferConfig(multipart_threshold=multipart_threshold, use_threads=False)
    def store(self, container_id, target_name, source_path=None, source_stream=None):
        # Note that this assumes the container (bucket) already exists
        if source_path is not None:
            with open(source_path, "rb") as f:
                self.client.upload_fileobj(f, Bucket=container_id, Key=target_name, Config=self.config)
        elif source_stream is not None:
            self.client.upload_fileobj(source_stream, Bucket=container_id, Key=target_name, Config=self.config)
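
    # Illustrative calls (a sketch; assumes the bucket "my-bucket" already
    # exists and the scope "anon_data" is configured in STORE_S3_SCOPES):
    #
    #   s3 = StoreS3("anon_data")
    #   s3.store("my-bucket", "data.json", source_path="/tmp/data.json")
    #   with open("/tmp/data.json", "rb") as fh:
    #       s3.store("my-bucket", "data2.json", source_stream=fh)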
    def exists(self, container_id):
        # not implemented for the S3 store
        pass

    def list(self, container_id):
        # page through the bucket listing: list_objects_v2 returns at most
        # 1000 keys per call, so follow the continuation token
        all_keys = []
        r = self.client.list_objects_v2(Bucket=container_id)
        while r.get('Contents', None):
            all_keys += [key["Key"] for key in r['Contents']]

            if r.get('NextContinuationToken', None):
                r = self.client.list_objects_v2(Bucket=container_id, ContinuationToken=r['NextContinuationToken'])
            else:
                break
        return all_keys
    def get(self, container_id, target_name, encoding=None):
        try:
            obj = self.client.get_object(Bucket=container_id, Key=target_name)
        except self.client.exceptions.NoSuchKey:
            return None
        return obj["Body"]
    def url(self, container_id, target_name):
        # NOTE: for buckets in us-east-1, get_bucket_location reports a
        # LocationConstraint of None, which this formatting does not handle
        bucket_location = self.client.get_bucket_location(Bucket=container_id)
        location = bucket_location['LocationConstraint']
        return "https://s3.{0}.amazonaws.com/{1}/{2}".format(location, container_id, quote_plus(target_name))
    def delete_container(self, container_id):
        """
        Empty the container.  (We are not permitted to delete the bucket
        itself, so this just deletes its contents.)

        :param container_id: the container (in this case an S3 bucket)
        :return:
        """
        keys = self.list(container_id)

        # FIXME: delete_objects accepts at most 1000 keys per call, which will
        # need to be dealt with at some point soon (see the sketch below)
        delete_info = {
            "Objects": [{"Key": key} for key in keys]
        }

        self.client.delete_objects(
            Bucket=container_id,
            Delete=delete_info
        )
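
    # A possible fix for the 1000-key limit noted above (a sketch, not the
    # current implementation): batch the key list into chunks of 1000, since
    # that is the maximum delete_objects accepts per request.
    #
    #   keys = self.list(container_id)
    #   for i in range(0, len(keys), 1000):
    #       chunk = keys[i:i + 1000]
    #       self.client.delete_objects(
    #           Bucket=container_id,
    #           Delete={"Objects": [{"Key": k} for k in chunk]}
    #       )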
    def delete_file(self, container_id, target_name):
        """
        Delete the target_name file within the container

        :param container_id: the container (in this case an S3 bucket)
        :param target_name: the file in the container
        :return:
        """
        if target_name is None:
            return

        on_remote = self.list(container_id)
        if target_name not in on_remote:
            return

        delete_info = {
            "Objects": [{"Key": target_name}]
        }

        self.client.delete_objects(
            Bucket=container_id,
            Delete=delete_info
        )

class StoreLocal(Store):
    """
    Primitive local storage system.  Use this for testing in place of a remote store
    ~~->FileStoreLocal:Feature~~
    """
    def __init__(self, scope):
        self.dir = app.config.get("STORE_LOCAL_DIR")
        if self.dir is None:
            raise StoreException("STORE_LOCAL_DIR is not defined in config")
        self.buffer_size = app.config.get("STORE_LOCAL_WRITE_BUFFER_SIZE", 16777216)

    def store(self, container_id, target_name, source_path=None, source_stream=None):
        cpath = os.path.join(self.dir, container_id)
        if not os.path.exists(cpath):
            os.makedirs(cpath)
        tpath = os.path.join(cpath, target_name)

        if source_path:
            shutil.copyfile(source_path, tpath)
        elif source_stream:
            # stream to disk in buffer_size chunks, in text or binary mode
            # depending on what the stream yields
            data = source_stream.read(self.buffer_size)
            mode = "w" if isinstance(data, str) else "wb"
            with open(tpath, mode) as f:
                while data:
                    f.write(data)
                    data = source_stream.read(self.buffer_size)
    def exists(self, container_id):
        cpath = os.path.join(self.dir, container_id)
        return os.path.exists(cpath) and os.path.isdir(cpath)

    def list(self, container_id):
        cpath = os.path.join(self.dir, container_id)
        return os.listdir(cpath)

    def get(self, container_id, target_name, encoding=None):
        cpath = os.path.join(self.dir, container_id, target_name)
        if os.path.exists(cpath) and os.path.isfile(cpath):
            kwargs = {}
            mode = "rb"
            if encoding is not None:
                kwargs = {"encoding": encoding}
                mode = "r"
            f = open(cpath, mode, **kwargs)
            return f

    def url(self, container_id, target_name):
        return "/" + container_id + "/" + target_name

    def delete_container(self, container_id):
        if container_id is None:
            return
        cpath = os.path.join(self.dir, container_id)
        if os.path.exists(cpath):
            shutil.rmtree(cpath)

    def delete_file(self, container_id, target_name):
        if target_name is None:
            return
        cpath = os.path.join(self.dir, container_id, target_name)
        if os.path.exists(cpath):
            if os.path.isfile(cpath):
                os.remove(cpath)
            else:
                shutil.rmtree(cpath)

    def size(self, container_id, target_name):
        cpath = os.path.join(self.dir, container_id, target_name)
        return os.stat(cpath).st_size

class TempStore(StoreLocal):
    """
    ~~->FileStoreTemp:Feature~~
    """
    def __init__(self):
        self.dir = app.config.get("STORE_TMP_DIR")
        if self.dir is None:
            raise StoreException("STORE_TMP_DIR is not defined in config")
        self.buffer_size = app.config.get("STORE_TMP_WRITE_BUFFER_SIZE", 16777216)

    def path(self, container_id, filename, create_container=False, must_exist=True):
        container_path = os.path.join(self.dir, container_id)
        if create_container and not os.path.exists(container_path):
            os.makedirs(container_path)
        fpath = os.path.join(self.dir, container_id, filename)
        if must_exist and not os.path.exists(fpath):
            raise StoreException("No path for container {x}, file {y}".format(x=container_id, y=filename))
        return fpath
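
    # Illustrative usage (a sketch; the container and file names are
    # hypothetical): reserve an on-disk path for a file that does not exist yet.
    #
    #   tmp = TempStore()
    #   out = tmp.path("export-job", "results.csv",
    #                  create_container=True, must_exist=False)
    #   with open(out, "w") as f:
    #       f.write("id,title\n")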
    def list_container_ids(self):
        return [x for x in os.listdir(self.dir) if os.path.isdir(os.path.join(self.dir, x))]

def prune_container(storage, container_id, sort, filter=None, keep=1):
    action_register = []

    filelist = storage.list(container_id)
    # action_register.append("Current cached files (before prune): " + ", ".join(filelist))

    # filter for the files we care about
    filtered = []
    if filter is not None:
        for fn in filelist:
            if filter(fn):
                filtered.append(fn)
    else:
        filtered = filelist
    # action_register.append("Filtered cached files (before prune): " + ", ".join(filtered))

    if len(filtered) <= keep:
        # action_register.append("Fewer than {x} files in cache, no further action".format(x=keep))
        return action_register

    filtered_sorted = sort(filtered)
    # action_register.append("Considering files for retention in the following order: " + ", ".join(filtered_sorted))

    remove = filtered_sorted[keep:]
    action_register.append("Removed old files: " + ", ".join(remove))

    for fn in remove:
        storage.delete_file(container_id, fn)

    return action_register
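
# Illustrative usage of prune_container (a sketch; the container name and the
# date-stamped filename convention are hypothetical): keep the two newest
# dumps, assuming names like "dump-2022-07-01.tar.gz" so that a reverse
# lexical sort puts the newest first and everything after the first `keep`
# entries is deleted.
#
#   tmp = StoreFactory.tmp()
#   actions = prune_container(
#       tmp, "public_data_dump",
#       sort=lambda fns: sorted(fns, reverse=True),
#       filter=lambda fn: fn.startswith("dump-"),
#       keep=2
#   )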