Coverage for portality/lib/seamless.py: 68%
670 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-09-06 17:17 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-09-06 17:17 +0100
1import locale
2from urllib.parse import urlparse
3from copy import deepcopy
4from datetime import datetime
6###############################################
7## Common coerce functions
8###############################################
10def to_utf8_unicode(val):
11 if isinstance(val, str):
12 return val
13 elif isinstance(val, str):
14 try:
15 return val.decode("utf8", "strict")
16 except UnicodeDecodeError:
17 raise ValueError("Could not decode string")
18 else:
19 return str(val)
22def to_unicode_upper(val):
23 val = to_utf8_unicode(val)
24 return val.upper()
27def to_unicode_lower(val):
28 val = to_utf8_unicode(val)
29 return val.lower()
32def intify(val):
33 # strip any characters that are outside the ascii range - they won't make up the int anyway
34 # and this will get rid of things like strange currency marks
35 if isinstance(val, str):
36 val = val.encode("ascii", errors="ignore")
38 # try the straight cast
39 try:
40 return int(val)
41 except ValueError:
42 pass
44 # could have commas in it, so try stripping them
45 try:
46 return int(val.replace(",", ""))
47 except ValueError:
48 pass
50 # try the locale-specific approach
51 try:
52 return locale.atoi(val)
53 except ValueError:
54 pass
56 raise ValueError("Could not convert string to int: {x}".format(x=val))
58def floatify(val):
59 # strip any characters that are outside the ascii range - they won't make up the float anyway
60 # and this will get rid of things like strange currency marks
61 if isinstance(val, str):
62 val = val.encode("ascii", errors="ignore")
64 # try the straight cast
65 try:
66 return float(val)
67 except ValueError:
68 pass
70 # could have commas in it, so try stripping them
71 try:
72 return float(val.replace(",", ""))
73 except ValueError:
74 pass
76 # try the locale-specific approach
77 try:
78 return locale.atof(val)
79 except ValueError:
80 pass
82 raise ValueError("Could not convert string to float: {x}".format(x=val))
85def to_url(val):
86 if not isinstance(val, str):
87 raise ValueError("Argument passed to to_url was not a string, but type '{t}': '{val}'".format(t=type(val),val=val))
89 val = val.strip()
91 if val == '':
92 return val
94 # parse with urlparse
95 url = urlparse(val)
97 # now check the url has the minimum properties that we require
98 if url.scheme and url.scheme.startswith("http"):
99 return to_utf8_unicode(val)
100 else:
101 raise ValueError("Could not convert string {val} to viable URL".format(val=val))
104def to_bool(val):
105 """Conservative boolean cast - don't cast lists and objects to True, just existing booleans and strings."""
106 if val is None:
107 return None
108 if val is True or val is False:
109 return val
111 if isinstance(val, str):
112 if val.lower() == 'true':
113 return True
114 elif val.lower() == 'false':
115 return False
116 raise ValueError("Could not convert string {val} to boolean. Expecting string to either say 'true' or 'false' (not case-sensitive).".format(val=val))
118 raise ValueError("Could not convert {val} to boolean. Expect either boolean or string.".format(val=val))
121def to_datetime(val):
122 try:
123 datetime.strptime(val, "%Y-%m-%dT%H:%M:%SZ")
124 return val
125 except:
126 raise ValueError("Could not convert string {val} to UTC Datetime".format(val=val))
129def string_canonicalise(canon, allow_fail=False):
130 normalised = {}
131 for a in canon:
132 normalised[a.strip().lower()] = a
134 def sn(val):
135 if val is None:
136 if allow_fail:
137 return None
138 raise ValueError("NoneType not permitted")
140 try:
141 norm = val.strip().lower()
142 except:
143 raise ValueError("Unable to treat value as a string")
145 uc = to_utf8_unicode
146 if norm in normalised:
147 return uc(normalised[norm])
148 if allow_fail:
149 return uc(val)
151 raise ValueError("Unable to canonicalise string")
153 return sn
156class SeamlessException(Exception):
157 def __init__(self, message, *args, **kwargs):
158 self.message = message
159 super(SeamlessException, self).__init__(message, *args, **kwargs)
162class SeamlessMixin(object):
164 __SEAMLESS_STRUCT__ = None
166 __SEAMLESS_COERCE__ = {
167 "unicode": to_utf8_unicode,
168 "unicode_upper" : to_unicode_upper,
169 "unicode_lower" : to_unicode_lower,
170 "integer": intify,
171 "float": floatify,
172 "url": to_url,
173 "bool": to_bool,
174 "datetime" : to_datetime
175 }
177 __SEAMLESS_DEFAULT_COERCE__ = "unicode"
179 __SEAMLESS_PROPERTIES__ = None
181 __SEAMLESS_APPLY_STRUCT_ON_INIT__ = True
182 __SEAMLESS_CHECK_REQUIRED_ON_INIT__ = True
183 __SEAMLESS_SILENT_PRUNE__ = False
184 __SEAMLESS_ALLOW_OTHER_FIELDS__ = False
186 def __init__(self,
187 raw=None, # The raw data
188 struct=None,
189 coerce=None,
190 properties=None,
191 default_coerce=None,
192 apply_struct_on_init=None,
193 check_required_on_init=None,
194 silent_prune=None,
195 allow_other_fields=None,
196 *args, **kwargs
197 ):
199 # set all the working properties
200 self.__seamless_coerce__ = coerce if coerce is not None else self.__SEAMLESS_COERCE__
201 self.__seamless_default_coerce__ = default_coerce if default_coerce is not None else self.__SEAMLESS_DEFAULT_COERCE__
202 self.__seamless_properties__ = properties if properties is not None else self.__SEAMLESS_PROPERTIES__
203 self.__seamless_apply_struct_on_init__ = apply_struct_on_init if apply_struct_on_init is not None else self.__SEAMLESS_APPLY_STRUCT_ON_INIT__
204 self.__seamless_check_required_on_init__ = check_required_on_init if check_required_on_init is not None else self.__SEAMLESS_CHECK_REQUIRED_ON_INIT__
205 self.__seamless_silent_prune__ = silent_prune if silent_prune is not None else self.__SEAMLESS_SILENT_PRUNE__
206 self.__seamless_allow_other_fields__ = allow_other_fields if allow_other_fields is not None else self.__SEAMLESS_ALLOW_OTHER_FIELDS__
208 struct = struct if struct is not None else self.__SEAMLESS_STRUCT__
209 if isinstance(struct, list):
210 struct = Construct.merge(*struct)
211 self.__seamless_struct__ = Construct(struct,
212 self.__seamless_coerce__,
213 self.__seamless_default_coerce__)
215 self.__seamless__ = SeamlessData(raw, struct=self.__seamless_struct__)
217 if (self.__seamless_struct__ is not None and
218 raw is not None and
219 self.__seamless_apply_struct_on_init__):
220 self.__seamless__ = self.__seamless_struct__.construct(self.__seamless__.data,
221 check_required=self.__seamless_check_required_on_init__,
222 silent_prune=self.__seamless_silent_prune__,
223 allow_other_fields=self.__seamless_allow_other_fields__)
225 self.custom_validate()
227 super(SeamlessMixin, self).__init__(*args, **kwargs)
229 def __getattr__(self, name):
231 # workaround to prevent debugger from disconnecting at the deepcopy method
232 # https://stackoverflow.com/questions/32831050/pycharms-debugger-gives-up-when-hitting-copy-deepcopy
233 # if name.startswith("__"):
234 # raise AttributeError
236 if hasattr(self.__class__, name):
237 return object.__getattribute__(self, name)
239 if self.__seamless_properties__ is not None:
240 prop = self.__seamless_properties__.get(name)
241 if prop is not None:
242 path = prop["path"]
243 wrap = prop.get("wrapper")
244 return self.__seamless__.get_property(path, wrap)
246 raise AttributeError('{name} is not set'.format(name=name))
248 def __setattr__(self, name, value, allow_coerce_failure=False):
249 if hasattr(self.__class__, name):
250 return object.__setattr__(self, name, value)
252 if name.startswith("__seamless"):
253 return object.__setattr__(self, name, value)
255 if self.__seamless_properties__ is not None:
256 prop = self.__seamless_properties__.get(name)
257 if prop is not None:
258 path = prop["path"]
259 unwrap = prop.get("unwrapper")
260 wasset = self.__seamless__.set_property(path, value, unwrap, allow_coerce_failure)
261 if wasset:
262 return
264 # fall back to the default approach of allowing any attribute to be set on the object
265 return object.__setattr__(self, name, value)
267 def __deepcopy__(self):
268 # FIXME: should also reflect all the constructor arguments
269 return self.__class__(deepcopy(self.__seamless__.data))
271 def custom_validate(self):
272 """
273 Should be implemented on the higher level
274 """
275 pass
277 def verify_against_struct(self, check_required=True, silent_prune=None, allow_other_fields=None):
279 silent_prune = silent_prune if silent_prune is not None else self.__seamless_silent_prune__
280 allow_other_fields = allow_other_fields if allow_other_fields is not None else self.__seamless_allow_other_fields__
282 if (self.__seamless_struct__ is not None and
283 self.__seamless__ is not None):
284 self.__seamless_struct__.construct(deepcopy(self.__seamless__.data), # use a copy of the data, to avoid messing with any references to the current data
285 check_required=check_required,
286 silent_prune=silent_prune,
287 allow_other_fields=allow_other_fields)
289 def apply_struct(self, check_required=True, silent_prune=None, allow_other_fields=None):
291 silent_prune = silent_prune if silent_prune is not None else self.__seamless_silent_prune__
292 allow_other_fields = allow_other_fields if allow_other_fields is not None else self.__seamless_allow_other_fields__
294 if (self.__seamless_struct__ is not None and
295 self.__seamless__ is not None):
296 self.__seamless__ = self.__seamless_struct__.construct(self.__seamless__.data,
297 check_required=check_required,
298 silent_prune=silent_prune,
299 allow_other_fields=allow_other_fields)
301 def extend_struct(self, struct):
302 self.__seamless_struct__ = Construct.merge(self.__seamless_struct__, struct)
305class SeamlessData(object):
306 def __init__(self, raw=None, struct=None):
307 self.data = raw if raw is not None else {}
308 self._struct = struct
310 def get_single(self, path, coerce=None, default=None, allow_coerce_failure=True):
311 # get the value at the point in the object
312 val = self._get_path(path, default)
314 if coerce is not None and val is not None:
315 # if you want to coerce and there is something to coerce do it
316 try:
317 return self._coerce(val, coerce, accept_failure=allow_coerce_failure)
318 except SeamlessException as e:
319 e.message += "; get_single, path {x}".format(x=path)
320 raise
321 else:
322 # otherwise return the value
323 return val
325 def set_single(self, path, val, coerce=None, allow_coerce_failure=False, allowed_values=None, allowed_range=None,
326 allow_none=True, ignore_none=False, context=""):
328 if val is None and ignore_none:
329 return
331 if val is None and not allow_none:
332 raise SeamlessException("NoneType is not allowed at '{x}'".format(x=context + "." + path))
334 # first see if we need to coerce the value (and don't coerce None)
335 if coerce is not None and val is not None:
336 try:
337 val = self._coerce(val, coerce, accept_failure=allow_coerce_failure)
338 except SeamlessException as e:
339 e.message += "; set_single, path {x}".format(x=context + "." + path)
340 raise
342 if allowed_values is not None and val not in allowed_values:
343 raise SeamlessException("Value '{x}' is not permitted at '{y}'".format(x=val, y=context + "." + path))
345 if allowed_range is not None:
346 lower, upper = allowed_range
347 if (lower is not None and val < lower) or (upper is not None and val > upper):
348 raise SeamlessException("Value '{x}' is outside the allowed range: {l} - {u} at '{y}'".format(x=val, l=lower, u=upper, y=context + "." + path))
350 # now set it at the path point in the object
351 self._set_path(path, val)
353 def delete(self, path, prune=True):
354 parts = path.split(".")
355 context = self.data
357 stack = []
358 for i in range(len(parts)):
359 p = parts[i]
360 if p in context:
361 if i < len(parts) - 1:
362 stack.append(context[p])
363 context = context[p]
364 else:
365 del context[p]
366 if prune and len(stack) > 0:
367 stack.pop() # the last element was just deleted
368 self._prune_stack(stack)
370 def get_list(self, path, coerce=None, by_reference=True, allow_coerce_failure=True, context=""):
371 # get the value at the point in the object
372 val = self._get_path(path, None)
374 # if there is no value and we want to do by reference, then create it, bind it and return it
375 if val is None and by_reference:
376 mylist = []
377 self.set_single(path, mylist)
378 return mylist
380 # otherwise, default is an empty list
381 elif val is None and not by_reference:
382 return []
384 # check that the val is actually a list
385 if not isinstance(val, list):
386 raise SeamlessException("Expecting a list at '{x}' but found '{y}'".format(x=context + "." + path, y=val))
388 # if there is a value, do we want to coerce each of them
389 if coerce is not None:
390 try:
391 coerced = [self._coerce(v, coerce, accept_failure=allow_coerce_failure) for v in val]
392 except SeamlessException as e:
393 e.message += "; get_list, path {x}".format(x=context + "." + path)
394 raise
395 if by_reference:
396 self.set_single(path, coerced)
397 return coerced
398 else:
399 if by_reference:
400 return val
401 else:
402 return deepcopy(val)
404 def set_list(self, path, val, coerce=None, allow_coerce_failure=False, allow_none=True,
405 ignore_none=False, allowed_values=None, context=""):
406 # first ensure that the value is a list
407 if not isinstance(val, list):
408 val = [val]
410 # now carry out the None check
411 # for each supplied value, if it is none, and none is not allowed, raise an error if we do not
412 # plan to ignore the nones.
413 for v in val:
414 if v is None and not allow_none:
415 if not ignore_none:
416 raise SeamlessException("NoneType is not allowed at '{x}'".format(x=context + "." + path))
417 if allowed_values is not None and v not in allowed_values:
418 raise SeamlessException("Value '{x}' is not permitted at '{y}'".format(x=val, y=context + "." + path))
420 # now coerce each of the values, stripping out Nones if necessary
421 try:
422 val = [self._coerce(v, coerce, accept_failure=allow_coerce_failure) for v in val if v is not None or not ignore_none]
423 except SeamlessException as e:
424 e.message += "; set_list, path {x}".format(x=context + "." + path)
425 raise
427 # check that the cleaned array isn't empty, and if it is behave appropriately
428 if len(val) == 0:
429 # this is equivalent to a None, so we need to decide what to do
430 if ignore_none:
431 # if we are ignoring nones, just do nothing
432 return
433 elif not allow_none:
434 # if we are not ignoring nones, and not allowing them, raise an error
435 raise SeamlessException("Empty array not permitted at '{x}'".format(x=context + "." + path))
437 # now set it on the path
438 self._set_path(path, val)
440 def add_to_list(self, path, val, coerce=None, allow_coerce_failure=False, allow_none=False,
441 ignore_none=True, unique=False, allowed_values=None, context=""):
442 if val is None and ignore_none:
443 return
445 if val is None and not allow_none:
446 raise SeamlessException("NoneType is not allowed in list at '{x}'".format(x=context + "." + path))
447 if allowed_values is not None and val not in allowed_values:
448 raise SeamlessException("Value '{x}' is not permitted at '{y}'".format(x=val, y=context + "." + path))
450 # first coerce the value
451 if coerce is not None:
452 try:
453 val = self._coerce(val, coerce, accept_failure=allow_coerce_failure)
454 except SeamlessException as e:
455 e.message += "; add_to_list, path {x}".format(x=context + "." + path)
456 raise
457 current = self.get_list(path, by_reference=True, context=context)
459 # if we require the list to be unique, check for the value first
460 if unique:
461 if val in current:
462 return
464 # otherwise, append
465 current.append(val)
467 def delete_from_list(self, path, val=None, matchsub=None, prune=True, apply_struct_on_matchsub=True):
468 """
469 Note that matchsub will be coerced with the struct if it exists, to ensure
470 that the match is done correctly
472 :param path:
473 :param val:
474 :param matchsub:
475 :param prune:
476 :return:
477 """
478 l = self.get_list(path)
480 removes = []
481 i = 0
482 for entry in l:
483 if val is not None:
484 if entry == val:
485 removes.append(i)
486 elif matchsub is not None:
487 # attempt to coerce the sub
488 if apply_struct_on_matchsub:
489 try:
490 type, struct, instructions = self._struct.lookup(path)
491 if struct is not None:
492 matchsub = struct.construct(matchsub, struct).data
493 except:
494 pass
496 matches = 0
497 for k, v in matchsub.items():
498 if entry.get(k) == v:
499 matches += 1
500 if matches == len(list(matchsub.keys())):
501 removes.append(i)
502 i += 1
504 removes.sort(reverse=True)
505 for r in removes:
506 del l[r]
508 if len(l) == 0 and prune:
509 self.delete(path, prune)
511 def set_with_struct(self, path, val, check_required=True, silent_prune=False):
512 typ, substruct, instructions = self._struct.lookup(path)
514 if typ == "field":
515 coerce_name, coerce_fn = self._struct.get_coerce(instructions)
516 if coerce_fn is None:
517 raise SeamlessException("No coersion function defined for type '{x}' at '{c}'".format(x=coerce_name, c=path))
518 kwargs = self._struct.kwargs(typ, "set", instructions)
519 self.set_single(path, val, coerce=coerce_fn, **kwargs)
520 elif typ == "list":
521 if not isinstance(val, list):
522 val = [val]
523 if substruct is not None:
524 val = [substruct.construct(x, check_required=check_required, silent_prune=silent_prune).data for x in val]
525 kwargs = self._struct.kwargs(typ, "set", instructions)
526 coerce_fn = None
527 if instructions.get("contains") != "object":
528 coerce_name, coerce_fn = self._struct.get_coerce(instructions)
529 self.set_list(path, val, coerce=coerce_fn, **kwargs)
530 elif typ == "object" or typ == "struct":
531 if substruct is not None:
532 val = substruct.construct(val, check_required=check_required, silent_prune=silent_prune).data
533 self.set_single(path, val)
534 else:
535 raise SeamlessException("Attempted to set_with_struct on path '{x}' but no such path exists in the struct".format(x=path))
537 def add_to_list_with_struct(self, path, val):
538 type, struct, instructions = self._struct.lookup(path)
539 if type != "list":
540 raise SeamlessException("Attempt to add to list '{x}' failed - it is not a list element".format(x=path))
541 if struct is not None:
542 val = struct.construct(val).data
543 kwargs = Construct.kwargs(type, "set", instructions)
544 self.add_to_list(path, val, **kwargs)
546 def get_property(self, path, wrapper=None):
547 if wrapper is None:
548 wrapper = lambda x : x
550 # pull the object from the structure, to find out what kind of retrieve it needs
551 # (if there is a struct)
552 type, substruct, instructions = None, None, None
553 if self._struct:
554 type, substruct, instructions = self._struct.lookup(path)
556 if type is None:
557 # if there is no struct, or no object mapping was found, try to pull the path
558 # as a single node (may be a field, list or dict, we'll find out in a mo)
559 val = self.get_single(path)
561 # if a wrapper is supplied, wrap it
562 if isinstance(val, list):
563 return [wrapper(v) for v in val]
564 else:
565 return wrapper(val)
567 if instructions is None:
568 instructions = {}
570 # if the struct contains a reference to the path, always return something, even if it is None - don't raise an AttributeError
571 kwargs = self._struct.kwargs(type, "get", instructions)
572 coerce_name, coerce_fn = self._struct.get_coerce(instructions)
573 if coerce_fn is not None:
574 kwargs["coerce"] = coerce_fn
576 if type == "field" or type == "object":
577 return wrapper(self.get_single(path, **kwargs))
578 elif type == "list":
579 l = self.get_list(path, **kwargs)
580 return [wrapper(o) for o in l]
582 return None
584 def set_property(self, path, value, unwrapper=None, allow_coerce_failure=False):
585 if unwrapper is None:
586 unwrapper = lambda x : x
588 # pull the object from the structure, to find out what kind of retrieve it needs
589 # (if there is a struct)
590 type, substruct, instructions = None, None, None
591 if self._struct:
592 type, substruct, instructions = self._struct.lookup(path)
594 # if no type is found, then this means that either the struct was undefined, or the
595 # path did not point to a valid point in the struct. In the case that the struct was
596 # defined, this means the property is trying to set something outside the struct, which
597 # isn't allowed. So, only set types which are None against objects which don't define
598 # the struct.
599 if type is None:
600 if self._struct is None:
601 if isinstance(value, list):
602 value = [unwrapper(v) for v in value]
603 self.set_list(path, value, allow_coerce_failure)
604 else:
605 value = unwrapper(value)
606 self.set_single(path, value, allow_coerce_failure)
608 return True
609 else:
610 return False
612 if type == "field" or type == "object":
613 value = unwrapper(value)
614 if type == "list":
615 value = [unwrapper(v) for v in value]
617 try:
618 self.set_with_struct(path, value)
619 return
620 except SeamlessException:
621 return False
623 def _get_path(self, path, default):
624 parts = path.split(".")
625 context = self.data
627 for i in range(len(parts)):
628 p = parts[i]
629 d = {} if i < len(parts) - 1 else default
630 context = context.get(p, d)
631 return context
633 def _set_path(self, path, val):
634 parts = path.split(".")
635 context = self.data
637 for i in range(len(parts)):
638 p = parts[i]
640 if p not in context and i < len(parts) - 1:
641 context[p] = {}
642 context = context[p]
643 elif p in context and i < len(parts) - 1:
644 context = context[p]
645 else:
646 context[p] = val
648 def _coerce(self, val, coerce, accept_failure=False):
649 if coerce is None:
650 return val
651 try:
652 return coerce(val)
653 except (ValueError, TypeError):
654 if accept_failure:
655 return val
656 raise SeamlessException("Coerce with '{x}' failed on '{y}' of type '{z}'".format(x=coerce, y=val, z=type(val)))
658 def _prune_stack(self, stack):
659 while len(stack) > 0:
660 context = stack.pMax.Pop()
661 todelete = []
662 for k, v in context.items():
663 if isinstance(v, dict) and len(list(v.keys())) == 0:
664 todelete.append(k)
665 for d in todelete:
666 del context[d]
669class Construct(object):
670 def __init__(self, definition, coerce, default_coerce):
671 if isinstance(definition, Construct):
672 definition = definition._definition
674 self._definition = definition
675 self._coerce = coerce
676 self._default_coerce = default_coerce
678 @classmethod
679 def merge(cls, target, *args):
680 if not isinstance(target, Construct):
681 merged = Construct(deepcopy(target), None, None)
682 else:
683 merged = target
685 for source in args:
686 if not isinstance(source, Construct):
687 source = Construct(source, None, None)
689 for field, instructions in source.fields:
690 merged.add_field(field, instructions, overwrite=False)
692 for obj in source.objects:
693 merged.add_object(obj)
695 for field, instructions in source.lists:
696 merged.add_list(field, instructions, overwrite=False)
698 for r in source._definition.get("required", []):
699 merged.add_required(r)
701 for field, struct in source._definition.get("structs", {}).items():
702 merged.add_substruct(field, struct, mode="merge")
704 return merged
706 @classmethod
707 def kwargs(cls, type, dir, instructions):
708 # if there are no instructions there are no kwargs
709 if instructions is None:
710 return {}
712 # take a copy of the instructions that we can modify
713 kwargs = deepcopy(instructions)
715 # remove the known arguments for the field type
716 if type == "field":
717 if "coerce" in kwargs:
718 del kwargs["coerce"]
720 elif type == "list":
721 if "coerce" in kwargs:
722 del kwargs["coerce"]
723 if "contains" in kwargs:
724 del kwargs["contains"]
726 nk = {}
727 if dir == "set":
728 for k, v in kwargs.items():
729 # basically everything is a "set" argument unless explicitly stated to be a "get" argument
730 if not k.startswith("get__"):
731 if k.startswith("set__"): # if it starts with the set__ prefix, remove it
732 k = k[5:]
733 nk[k] = v
734 elif dir == "get":
735 for k, v in kwargs.items():
736 # must start with "get" argument
737 if k.startswith("get__"):
738 nk[k[5:]] = v
740 return nk
742 @property
743 def raw(self):
744 return self._definition
746 @property
747 def required(self):
748 return self._definition.get("required", [])
750 def add_required(self, field):
751 if "required" not in self._definition:
752 self._definition["required"] = []
753 if field not in self._definition["required"]:
754 self._definition["required"].append(field)
756 @property
757 def allowed(self):
758 return list(self._definition.get("fields", {}).keys()) + \
759 self._definition.get("objects", []) + \
760 list(self._definition.get("lists", {}).keys())
762 @property
763 def objects(self):
764 return self._definition.get("objects", [])
766 def add_object(self, object_name):
767 if "objects" not in self._definition:
768 self._definition["objects"] = []
769 if object_name not in self._definition["objects"]:
770 self._definition["objects"].append(object_name)
772 @property
773 def substructs(self):
774 return self._definition.get("structs", {})
776 def substruct(self, field):
777 s = self.substructs.get(field)
778 if s is None:
779 return None
780 return Construct(s, self._coerce, self._default_coerce)
782 def add_substruct(self, field, struct, mode="merge"):
783 if "structs" not in self._definition:
784 self._definition["structs"] = {}
785 if mode == "overwrite" or field not in self._definition["structs"]:
786 self._definition["structs"][field] = deepcopy(struct)
787 else:
788 # recursively merge
789 self._definition["structs"][field] = Construct.merge(self._definition["structs"][field], struct).raw
791 @property
792 def fields(self):
793 return self._definition.get("fields", {}).items()
795 def field_instructions(self, field):
796 return self._definition.get("fields", {}).get(field)
798 def add_field(self, field_name, instructions, overwrite=False):
799 if "fields" not in self._definition:
800 self._definition["fields"] = {}
801 if overwrite or field_name not in self._definition["fields"]:
802 self._definition["fields"][field_name] = deepcopy(instructions)
804 @property
805 def lists(self):
806 return self._definition.get("lists", {}).items()
808 @property
809 def list_names(self):
810 return self._definition.get("lists", {}).keys()
812 def list_instructions(self, field):
813 return self._definition.get("lists", {}).get(field)
815 def add_list(self, list_name, instructions, overwrite=False):
816 if "lists" not in self._definition:
817 self._definition["lists"] = {}
818 if overwrite or list_name not in self._definition["lists"]:
819 self._definition["lists"][list_name] = deepcopy(instructions)
821 def get_coerce(self, instructions):
822 coerce_name = instructions.get("coerce", self._default_coerce)
823 return coerce_name, self._coerce.get(coerce_name)
825 def get(self, elem, default=None):
826 if elem in self._definition:
827 return self._definition.get(elem)
828 else:
829 return default
831 def lookup(self, path):
832 bits = path.split(".")
834 # if there's more than one path element, we will need to recurse
835 if len(bits) > 1:
836 # it has to be an object, in order for the path to still have multiple
837 # segments
838 if bits[0] not in self.objects:
839 return None, None, None
840 substruct = self.substruct(bits[0])
841 return substruct.lookup(".".join(bits[1:]))
842 elif len(bits) == 1:
843 # first check the fields
844 instructions = self.field_instructions(bits[0])
845 if instructions is not None:
846 return "field", None, instructions
848 # then check the lists
849 instructions = self.list_instructions(bits[0])
850 if instructions is not None:
851 substruct = self.substruct(bits[0])
852 return "list", substruct, instructions
854 # then check the objects
855 if bits[0] in self.objects:
856 substruct = self.substruct(bits[0])
857 return "struct", substruct, instructions
859 return None, None, None
861 def construct(self, obj, check_required=True, silent_prune=False, allow_other_fields=False):
863 def recurse(obj, struct, context):
864 if obj is None:
865 return None
866 if not isinstance(obj, dict):
867 raise SeamlessException("Expected a dict at '{c}' but found something else instead".format(c=context))
869 keyset = obj.keys()
871 # if we are checking required fields, then check them
872 # FIXME: might be sensible to move this out to a separate phase, independent of constructing
873 if check_required:
874 for r in struct.required:
875 if r not in keyset:
876 raise SeamlessException("Field '{r}' is required but not present at '{c}'".format(r=r, c=context))
878 # check that there are no fields that are not allowed
879 # Note that since the construct mechanism copies fields explicitly, silent_prune just turns off this
880 # check
881 if not allow_other_fields and not silent_prune:
882 allowed = struct.allowed
883 for k in keyset:
884 if k not in allowed:
885 c = context if context != "" else "root"
886 raise SeamlessException("Field '{k}' is not permitted at '{c}'".format(k=k, c=c))
888 # make a SeamlessData instance for gathering all the new data
889 constructed = SeamlessData(struct=struct)
891 # now check all the fields
892 for field_name, instructions in struct.fields:
893 val = obj.get(field_name)
894 if val is None:
895 continue
896 typ, substruct, instructions = struct.lookup(field_name)
897 if instructions is None:
898 raise SeamlessException("No instruction set defined for field at '{x}'".format(x=context + field_name))
899 coerce_name, coerce_fn = struct.get_coerce(instructions)
900 if coerce_fn is None:
901 raise SeamlessException("No coerce function defined for type '{x}' at '{c}'".format(x=coerce_name, c=context + field_name))
902 kwargs = struct.kwargs(typ, "set", instructions)
903 constructed.set_single(field_name, val, coerce=coerce_fn, context=context, **kwargs)
905 # next check all the objects (which will involve a recursive call to this function)
906 for field_name in struct.objects:
907 val = obj.get(field_name)
908 if val is None:
909 continue
910 if type(val) != dict:
911 raise SeamlessException("Expected dict at '{x}' but found '{y}'".format(x=context + field_name, y=type(val)))
913 typ, substruct, instructions = struct.lookup(field_name)
915 if substruct is None:
916 # this is the lowest point at which we have instructions, so just accept the data structure as-is
917 # (taking a deep copy to destroy any references)
918 constructed.set_single(field_name, deepcopy(val))
919 else:
920 # we need to recurse further down
921 beneath = recurse(val, substruct, context=context + field_name + ".")
923 # what we get back is the correct sub-data structure, which we can then store
924 constructed.set_single(field_name, beneath)
926 # now check all the lists
927 for field_name, instructions in struct.lists:
928 vals = obj.get(field_name)
929 if vals is None:
930 continue
931 if not isinstance(vals, list):
932 raise SeamlessException("Expecting list at '{x}' but found something else '{y}'".format(x=context + field_name, y=type(val)))
934 typ, substruct, instructions = struct.lookup(field_name)
935 kwargs = struct.kwargs(typ, "set", instructions)
937 contains = instructions.get("contains")
938 if contains == "field":
939 # coerce all the values in the list
940 coerce_name, coerce_fn = struct.get_coerce(instructions)
941 if coerce_fn is None:
942 raise SeamlessException("No coerce function defined for type '{x}' at '{c}'".format(x=coerce_name, c=context + field_name))
944 for i in range(len(vals)):
945 val = vals[i]
946 constructed.add_to_list(field_name, val, coerce=coerce_fn, **kwargs)
948 elif contains == "object":
949 # for each object in the list, send it for construction
950 for i in range(len(vals)):
951 val = vals[i]
953 if type(val) != dict:
954 print("Expected dict at '{x}[{p}]' but got '{y}'".format(x=context + field_name, y=type(val), p=i))
955 raise SeamlessException("Expected dict at '{x}[{p}]' but got '{y}'".format(x=context + field_name, y=type(val), p=i))
957 substruct = struct.substruct(field_name)
958 if substruct is None:
959 constructed.add_to_list(field_name, deepcopy(val))
960 else:
961 # we need to recurse further down
962 beneath = recurse(val, substruct, context=context + field_name + "[" + str(i) + "].")
964 # what we get back is the correct sub-data structure, which we can then store
965 constructed.add_to_list(field_name, beneath)
967 else:
968 raise SeamlessException("Cannot understand structure where list '{x}' elements contain '{y}'".format(x=context + field_name, y=contains))
970 # finally, if we allow other fields, make sure that they come across too
971 if allow_other_fields:
972 known = struct.allowed
973 for k, v in obj.items():
974 if k not in known:
975 constructed.set_single(k, v)
977 # ensure any external references to the object persist
978 obj.clear()
979 obj.update(constructed.data)
980 return obj
982 ready = recurse(obj, self, "[root]")
983 return SeamlessData(ready, struct=self)
985 def validate(self):
987 def recurse(struct, context):
988 # check that only the allowed keys are present
989 keys = struct.raw.keys()
990 for k in keys:
991 if k not in ["fields", "objects", "lists", "required", "structs"]:
992 raise SeamlessException("Key '{x}' present in struct at '{y}', but is not permitted".format(x=k, y=context))
994 # now go through and make sure the fields are the right shape:
995 for field_name, instructions in struct.fields:
996 for k,v in instructions.items():
997 if not isinstance(v, list) and not isinstance(v, str) and not isinstance(v, bool):
998 raise SeamlessException("Argument '{a}' in field '{b}' at '{c}' is not a string, list or boolean".format(a=k, b=field_name, c=context))
1000 # then make sure the objects are ok
1001 for o in struct.objects:
1002 if not isinstance(o, str):
1003 raise SeamlessException("There is a non-string value in the object list at '{y}'".format(y=context))
1005 # make sure the lists are correct
1006 for field_name, instructions in struct.lists:
1007 contains = instructions.get("contains")
1008 if contains is None:
1009 raise SeamlessException("No 'contains' argument in list definition for field '{x}' at '{y}'".format(x=field_name, y=context))
1010 if contains not in ["object", "field"]:
1011 raise SeamlessException("'contains' argument in list '{x}' at '{y}' contains illegal value '{z}'".format(x=field_name, y=context, z=contains))
1012 for k,v in instructions.items():
1013 if not isinstance(v, list) and not isinstance(v, str) and not isinstance(v, bool):
1014 raise SeamlessException("Argument '{a}' in list '{b}' at '{c}' is not a string, list or boolean".format(a=k, b=field_name, c=context))
1016 # make sure the requireds are correct
1017 for o in struct.required:
1018 if not isinstance(o, str):
1019 raise SeamlessException("There is a non-string value in the required list at '{y}'".format(y=context))
1021 # now do the structs, which will involve some recursion
1022 substructs = struct.substructs
1024 # first check that there are no previously unknown keys in there
1025 possibles = struct.objects + list(struct.list_names)
1026 for s in substructs:
1027 if s not in possibles:
1028 raise SeamlessException("struct contains key '{a}' which is not listed in object or list definitions at '{x}'".format(a=s, x=context))
1030 # now recurse into each struct
1031 for k, v in substructs.items():
1032 nc = context
1033 if nc == "":
1034 nc = k
1035 else:
1036 nc += "." + k
1037 recurse(Construct(v, None, None), context=nc)
1039 return True
1041 recurse(self, "[root]")