Coverage for portality / lib / seamless.py: 70%
696 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1import locale
2from copy import deepcopy
3from typing import Type
4from urllib.parse import urlparse
6from portality.constants import ConstantList
7from portality.lib import dates
10###############################################
11## Common coerce functions
12###############################################
def to_utf8_unicode(val):
    """
    Coerce a value to a (unicode) ``str``.

    ``str`` values are returned unchanged, ``bytes`` values are decoded strictly as UTF-8,
    and anything else is converted with ``str()``.

    :param val: the value to coerce
    :return: the value as a ``str``
    :raises ValueError: if ``val`` is a ``bytes`` object that is not valid UTF-8
    """
    if isinstance(val, str):
        return val

    # The second branch previously re-tested ``str`` and so could never run; byte strings
    # are the values that actually need decoding.
    if isinstance(val, bytes):
        try:
            return val.decode("utf8", "strict")
        except UnicodeDecodeError as e:
            raise ValueError("Could not decode string") from e

    return str(val)
def to_unicode_upper(val):
    """Coerce ``val`` with ``to_utf8_unicode`` and return the result upper-cased."""
    return to_utf8_unicode(val).upper()
def to_unicode_lower(val):
    """Coerce ``val`` with ``to_utf8_unicode`` and return the result lower-cased."""
    return to_utf8_unicode(val).lower()
def intify(val):
    """
    Coerce a value to an ``int``.

    Three strategies are tried in order: a straight ``int()`` cast, a cast after removing
    commas, and finally ``locale.atoi``.

    :param val: the value to convert (normally a string or a number)
    :return: the integer value
    :raises ValueError: if none of the strategies can convert the value
    """
    # strip any characters that are outside the ascii range - they won't make up the int anyway
    # and this will get rid of things like strange currency marks.
    # The result is decoded back to ``str``: leaving it as ``bytes`` made the ``replace`` call
    # below raise TypeError, so comma-separated numbers could never be converted.
    if isinstance(val, str):
        val = val.encode("ascii", errors="ignore").decode("ascii")
    elif isinstance(val, bytes):
        val = val.decode("ascii", errors="ignore")

    # try the straight cast
    try:
        return int(val)
    except ValueError:
        pass

    # could have commas in it, so try stripping them
    try:
        return int(val.replace(",", ""))
    except ValueError:
        pass

    # try the locale-specific approach
    try:
        return locale.atoi(val)
    except ValueError:
        pass

    raise ValueError("Could not convert string to int: {x}".format(x=val))
def floatify(val):
    """
    Coerce a value to a ``float``.

    Three strategies are tried in order: a straight ``float()`` cast, a cast after removing
    commas, and finally ``locale.atof``.

    :param val: the value to convert (normally a string or a number)
    :return: the float value
    :raises ValueError: if none of the strategies can convert the value
    """
    # strip any characters that are outside the ascii range - they won't make up the float anyway
    # and this will get rid of things like strange currency marks.
    # The result is decoded back to ``str``: leaving it as ``bytes`` made the ``replace`` call
    # below raise TypeError, so comma-separated numbers could never be converted.
    if isinstance(val, str):
        val = val.encode("ascii", errors="ignore").decode("ascii")
    elif isinstance(val, bytes):
        val = val.decode("ascii", errors="ignore")

    # try the straight cast
    try:
        return float(val)
    except ValueError:
        pass

    # could have commas in it, so try stripping them
    try:
        return float(val.replace(",", ""))
    except ValueError:
        pass

    # try the locale-specific approach
    try:
        return locale.atof(val)
    except ValueError:
        pass

    raise ValueError("Could not convert string to float: {x}".format(x=val))
def to_url(val):
    """
    Validate that a string looks like an http(s) URL and return it stripped of whitespace.

    An empty (or whitespace-only) string is returned as an empty string without validation.

    :param val: the candidate URL; must be a ``str``
    :return: the stripped URL
    :raises ValueError: if ``val`` is not a string, or its scheme does not start with "http"
    """
    if not isinstance(val, str):
        raise ValueError("Argument passed to to_url was not a string, but type '{t}': '{val}'".format(t=type(val),val=val))

    candidate = val.strip()
    if candidate == '':
        return candidate

    # the only property we require of the parsed url is an http-like scheme
    scheme = urlparse(candidate).scheme
    if not (scheme and scheme.startswith("http")):
        raise ValueError("Could not convert string {val} to viable URL".format(val=candidate))

    return to_utf8_unicode(candidate)
def to_bool(val):
    """
    Conservative boolean cast.

    ``None``, ``True`` and ``False`` pass straight through; the strings 'true' and 'false'
    (in any case) are converted.  Lists, objects, numbers and any other string are rejected
    rather than being cast by truthiness.

    :param val: the value to convert
    :return: ``True``, ``False`` or ``None``
    :raises ValueError: if the value is not a boolean, ``None`` or a recognised string
    """
    if val is None or val is True or val is False:
        return val

    if not isinstance(val, str):
        raise ValueError("Could not convert {val} to boolean. Expect either boolean or string.".format(val=val))

    lowered = val.lower()
    if lowered == 'true':
        return True
    if lowered == 'false':
        return False
    raise ValueError("Could not convert string {val} to boolean. Expecting string to either say 'true' or 'false' (not case-sensitive).".format(val=val))
def to_datetime(val):
    """
    Parse a value with ``dates.parse`` and return the result.

    :param val: the value to parse
    :return: whatever ``dates.parse`` returns for the value
    :raises ValueError: if ``dates.parse`` raises any exception
    """
    try:
        return dates.parse(val)
    except Exception as e:
        # ``except Exception`` rather than a bare ``except`` so that KeyboardInterrupt and
        # SystemExit are not converted into a ValueError; the cause is kept for debugging
        raise ValueError("Could not convert string {val} to UTC Datetime".format(val=val)) from e
def string_canonicalise(canon, allow_fail=False):
    """
    Build a coerce function which maps strings onto a canonical spelling.

    Matching is done on the stripped, lower-cased form of the value.

    :param canon: iterable of canonical strings
    :param allow_fail: if True, ``None`` and values with no canonical form are passed through
        (the latter via ``to_utf8_unicode``) instead of raising
    :return: a function taking a single value and returning its canonical form
    """
    normalised = {a.strip().lower(): a for a in canon}

    def sn(val):
        if val is None:
            if allow_fail:
                return None
            raise ValueError("NoneType not permitted")

        try:
            norm = val.strip().lower()
        except Exception as e:
            # anything without string methods ends up here; narrowed from a bare ``except``
            # so that KeyboardInterrupt/SystemExit are not swallowed
            raise ValueError("Unable to treat value as a string") from e

        if norm in normalised:
            return to_utf8_unicode(normalised[norm])
        if allow_fail:
            return to_utf8_unicode(val)

        raise ValueError("Unable to canonicalise string")

    return sn
class SeamlessException(Exception):
    """Exception which also keeps its text on a ``message`` attribute (callers append context to it)."""

    def __init__(self, message, *args, **kwargs):
        super().__init__(message, *args, **kwargs)
        self.message = message
class SeamlessMixin(object):
    """
    Mixin which keeps an object's data in a ``SeamlessData`` store governed by a ``Construct``
    struct, and exposes the paths listed in the seamless properties as attributes.

    Remember to merge the ``data`` property if you mix this in with DomainObject.

    Example
    ------
    ```python
    class DatalogJournalAdded(SeamlessMixin, DomainObject):
        ...

        @property
        def data(self):
            return self.__seamless__.data
    ```

    Remember to set up ``__init__`` like below, otherwise wrapping your object in an iterable
    will not work.

    ----
    ```python
        def __init__(self, **kwargs):
            super(DatalogJournalAdded, self).__init__(raw=kwargs)
    ```
    """

    # class-level defaults; each can be overridden per instance via the constructor arguments
    __SEAMLESS_STRUCT__ = None

    __SEAMLESS_COERCE__ = {
        "unicode": to_utf8_unicode,
        "unicode_upper": to_unicode_upper,
        "unicode_lower": to_unicode_lower,
        "integer": intify,
        "float": floatify,
        "url": to_url,
        "bool": to_bool,
        "datetime": to_datetime
    }

    __SEAMLESS_DEFAULT_COERCE__ = "unicode"

    __SEAMLESS_PROPERTIES__ = None

    __SEAMLESS_APPLY_STRUCT_ON_INIT__ = True
    __SEAMLESS_CHECK_REQUIRED_ON_INIT__ = True
    __SEAMLESS_SILENT_PRUNE__ = False
    __SEAMLESS_ALLOW_OTHER_FIELDS__ = False

    def __init__(self,
                 raw=None,  # The raw data
                 struct=None,
                 coerce=None,
                 properties=None,
                 default_coerce=None,
                 apply_struct_on_init=None,
                 check_required_on_init=None,
                 silent_prune=None,
                 allow_other_fields=None,
                 *args, **kwargs
                 ):
        """
        Set up the working configuration, wrap ``raw`` in a ``SeamlessData`` and, if requested,
        apply the struct to it.

        Every configuration argument left as ``None`` falls back to the matching
        ``__SEAMLESS_*__`` class attribute.  ``struct`` may be a single definition or a list of
        definitions, which are merged.  Remaining ``*args``/``**kwargs`` are passed up to the
        next ``__init__`` in the MRO.
        """
        # set all the working properties
        self.__seamless_coerce__ = coerce if coerce is not None else self.__SEAMLESS_COERCE__
        self.__seamless_default_coerce__ = default_coerce if default_coerce is not None else self.__SEAMLESS_DEFAULT_COERCE__
        self.__seamless_properties__ = properties if properties is not None else self.__SEAMLESS_PROPERTIES__
        self.__seamless_apply_struct_on_init__ = apply_struct_on_init if apply_struct_on_init is not None else self.__SEAMLESS_APPLY_STRUCT_ON_INIT__
        self.__seamless_check_required_on_init__ = check_required_on_init if check_required_on_init is not None else self.__SEAMLESS_CHECK_REQUIRED_ON_INIT__
        self.__seamless_silent_prune__ = silent_prune if silent_prune is not None else self.__SEAMLESS_SILENT_PRUNE__
        self.__seamless_allow_other_fields__ = allow_other_fields if allow_other_fields is not None else self.__SEAMLESS_ALLOW_OTHER_FIELDS__

        struct = struct if struct is not None else self.__SEAMLESS_STRUCT__
        if isinstance(struct, list):
            struct = Construct.merge(*struct)
        self.__seamless_struct__ = Construct(struct,
                                             self.__seamless_coerce__,
                                             self.__seamless_default_coerce__)

        self.__seamless__ = SeamlessData(raw, struct=self.__seamless_struct__)

        if (self.__seamless_struct__ is not None and
                raw is not None and
                self.__seamless_apply_struct_on_init__):
            self.__seamless__ = self.__seamless_struct__.construct(self.__seamless__.data,
                                                                   check_required=self.__seamless_check_required_on_init__,
                                                                   silent_prune=self.__seamless_silent_prune__,
                                                                   allow_other_fields=self.__seamless_allow_other_fields__)

        self.custom_validate()

        super(SeamlessMixin, self).__init__(*args, **kwargs)

    def __getattr__(self, name):
        """
        Resolve attributes that normal lookup did not find, via the seamless properties.

        :raises AttributeError: if ``name`` is not a configured seamless property
        """
        if hasattr(self.__class__, name):
            return object.__getattribute__(self, name)

        # The internal ``__seamless*`` attributes are always stored directly on the instance, so
        # getting here for one of them means it has not been set yet (e.g. ``__init__`` has not
        # run).  Fail fast: reading ``self.__seamless_properties__`` below would otherwise
        # re-enter this method without end.
        if name.startswith("__seamless"):
            raise AttributeError('{name} is not set'.format(name=name))

        if self.__seamless_properties__ is not None:
            prop = self.__seamless_properties__.get(name)
            if prop is not None:
                path = prop["path"]
                wrap = prop.get("wrapper")
                return self.__seamless__.get_property(path, wrap)

        raise AttributeError('{name} is not set'.format(name=name))

    def __setattr__(self, name, value, allow_coerce_failure=False):
        """
        Set an attribute, routing configured seamless properties into the data store.

        Class-level attributes and the internal ``__seamless*`` attributes are set directly on
        the object.  For a seamless property the value is written to its path; if the store does
        not report the value as set, the attribute is set on the object as usual.
        """
        if hasattr(self.__class__, name):
            return object.__setattr__(self, name, value)

        if name.startswith("__seamless"):
            return object.__setattr__(self, name, value)

        if self.__seamless_properties__ is not None:
            prop = self.__seamless_properties__.get(name)
            if prop is not None:
                path = prop["path"]
                unwrap = prop.get("unwrapper")
                wasset = self.__seamless__.set_property(path, value, unwrap, allow_coerce_failure)
                if wasset:
                    return

        # fall back to the default approach of allowing any attribute to be set on the object
        return object.__setattr__(self, name, value)

    def __deepcopy__(self, memo=None):
        """
        Return a new instance of the same class built from a deep copy of the data.

        ``copy.deepcopy`` calls this hook with its memo dictionary, so the parameter must be
        accepted; it defaults to ``None`` so direct calls without arguments keep working.
        """
        # FIXME: should also reflect all the constructor arguments
        return self.__class__(deepcopy(self.__seamless__.data, memo))

    def custom_validate(self):
        """
        Should be implemented on the higher level
        """
        pass

    def verify_against_struct(self, check_required=True, silent_prune=None, allow_other_fields=None):
        """
        Run the struct over a copy of the current data, leaving the object itself unchanged.

        ``silent_prune`` and ``allow_other_fields`` default to the instance's configured values.
        """
        silent_prune = silent_prune if silent_prune is not None else self.__seamless_silent_prune__
        allow_other_fields = allow_other_fields if allow_other_fields is not None else self.__seamless_allow_other_fields__

        if (self.__seamless_struct__ is not None and
                self.__seamless__ is not None):
            # use a copy of the data, to avoid messing with any references to the current data
            self.__seamless_struct__.construct(deepcopy(self.__seamless__.data),
                                               check_required=check_required,
                                               silent_prune=silent_prune,
                                               allow_other_fields=allow_other_fields)

    def apply_struct(self, check_required=True, silent_prune=None, allow_other_fields=None):
        """
        Run the struct over the current data and replace the store with the result.

        ``silent_prune`` and ``allow_other_fields`` default to the instance's configured values.
        """
        silent_prune = silent_prune if silent_prune is not None else self.__seamless_silent_prune__
        allow_other_fields = allow_other_fields if allow_other_fields is not None else self.__seamless_allow_other_fields__

        if (self.__seamless_struct__ is not None and
                self.__seamless__ is not None):
            self.__seamless__ = self.__seamless_struct__.construct(self.__seamless__.data,
                                                                   check_required=check_required,
                                                                   silent_prune=silent_prune,
                                                                   allow_other_fields=allow_other_fields)

    def extend_struct(self, struct):
        """Merge ``struct`` into this object's struct."""
        self.__seamless_struct__ = Construct.merge(self.__seamless_struct__, struct)
class SeamlessData(object):
    """
    Wrapper around a nested dict (``self.data``) addressed with dot-separated paths, optionally
    paired with a struct object that describes the type of each path.
    """

    def __init__(self, raw=None, struct=None):
        """
        :param raw: the dict to wrap (used by reference); an empty dict is created if None
        :param struct: optional struct object used by the ``*_with_struct`` and property methods
        """
        self.data = raw if raw is not None else {}
        self._struct = struct

    def get_single(self, path, coerce=None, default=None, allow_coerce_failure=True):
        """
        Return the value at ``path``, or ``default`` if the final element is absent.

        :param coerce: optional function applied to a non-None value before returning
        :param allow_coerce_failure: if True a value that fails to coerce is returned as-is
        :raises SeamlessException: if coercion fails and failure is not allowed
        """
        # get the value at the point in the object
        val = self._get_path(path, default)

        if coerce is not None and val is not None:
            # if you want to coerce and there is something to coerce do it
            try:
                return self._coerce(val, coerce, accept_failure=allow_coerce_failure)
            except SeamlessException as e:
                e.message += "; get_single, path {x}".format(x=path)
                raise

        # otherwise return the value
        return val

    def set_single(self, path, val, coerce=None, allow_coerce_failure=False, allowed_values=None, allowed_range=None,
                   allow_none=True, ignore_none=False, context=""):
        """
        Validate and store a single value at ``path``.

        :param coerce: optional function applied to a non-None value before storing
        :param allowed_values: if given, the (coerced) value must be one of these
        :param allowed_range: if given, a ``(lower, upper)`` pair; either bound may be None
        :param allow_none: if False, a None value raises
        :param ignore_none: if True, a None value is silently not stored
        :param context: path prefix used only in error messages
        :raises SeamlessException: if any of the checks or the coercion fails
        """
        if val is None and ignore_none:
            return

        if val is None and not allow_none:
            raise SeamlessException("NoneType is not allowed at '{x}'".format(x=context + "." + path))

        # first see if we need to coerce the value (and don't coerce None)
        if coerce is not None and val is not None:
            try:
                val = self._coerce(val, coerce, accept_failure=allow_coerce_failure)
            except SeamlessException as e:
                e.message += "; set_single, path {x}".format(x=context + "." + path)
                raise

        if allowed_values is not None and val not in allowed_values:
            raise SeamlessException("Value '{x}' is not permitted at '{y}'".format(x=val, y=context + "." + path))

        if allowed_range is not None:
            lower, upper = allowed_range
            if (lower is not None and val < lower) or (upper is not None and val > upper):
                raise SeamlessException("Value '{x}' is outside the allowed range: {l} - {u} at '{y}'".format(x=val, l=lower, u=upper, y=context + "." + path))

        # now set it at the path point in the object
        self._set_path(path, val)

    def delete(self, path, prune=True):
        """
        Remove the value at ``path`` if the whole path exists; otherwise do nothing.

        :param prune: if True, empty dicts left behind in the visited ancestors are removed by
            ``_prune_stack``
        """
        parts = path.split(".")
        context = self.data
        last = len(parts) - 1

        stack = []
        for i, p in enumerate(parts):
            if p not in context:
                # The path does not exist, so there is nothing to delete.  Previously the loop
                # carried on with the same context, so deleting "x.b" from {"b": 1} removed the
                # unrelated top-level "b".
                return
            if i < last:
                stack.append(context[p])
                context = context[p]
            else:
                del context[p]
                if prune and len(stack) > 0:
                    # the most recently visited container is dropped before pruning, so pruning
                    # only looks inside the containers above it
                    stack.pop()
                    self._prune_stack(stack)

    def get_list(self, path, coerce=None, by_reference=True, allow_coerce_failure=True, context=""):
        """
        Return the list at ``path``.

        :param coerce: optional function applied to every element
        :param by_reference: if True the stored list is returned (an empty list is created and
            stored if there is none, and a coerced list replaces the stored one); if False a
            detached list is returned
        :param context: path prefix used only in error messages
        :raises SeamlessException: if the value at the path is not a list, or coercion fails
        """
        # get the value at the point in the object
        val = self._get_path(path, None)

        if val is None:
            if not by_reference:
                # default is an empty list
                return []
            # by reference: create the list, bind it and return it
            mylist = []
            self.set_single(path, mylist)
            return mylist

        # check that the val is actually a list
        if not isinstance(val, list):
            raise SeamlessException("Expecting a list at '{x}' but found '{y}'".format(x=context + "." + path, y=val))

        # if there is a value, do we want to coerce each of them
        if coerce is not None:
            try:
                coerced = [self._coerce(v, coerce, accept_failure=allow_coerce_failure) for v in val]
            except SeamlessException as e:
                e.message += "; get_list, path {x}".format(x=context + "." + path)
                raise
            if by_reference:
                self.set_single(path, coerced)
            return coerced

        return val if by_reference else deepcopy(val)

    def set_list(self, path, val, coerce=None, allow_coerce_failure=False, allow_none=True,
                 ignore_none=False, allowed_values=None, context=""):
        """
        Validate and store a list at ``path``; a non-list value is wrapped in a list.

        :param coerce: optional function applied to every element
        :param allow_none: if False, None elements (and an empty result) raise unless ignored
        :param ignore_none: if True, None elements are dropped, and an empty result is not stored
        :param allowed_values: if given, every element must be one of these
        :param context: path prefix used only in error messages
        :raises SeamlessException: if any of the checks or the coercion fails
        """
        # first ensure that the value is a list
        if not isinstance(val, list):
            val = [val]

        # now carry out the None check
        # for each supplied value, if it is none, and none is not allowed, raise an error if we do not
        # plan to ignore the nones.
        for v in val:
            if v is None and not allow_none:
                if not ignore_none:
                    raise SeamlessException("NoneType is not allowed at '{x}'".format(x=context + "." + path))
            if allowed_values is not None and v not in allowed_values:
                # report the offending element (this used to print the whole list)
                raise SeamlessException("Value '{x}' is not permitted at '{y}'".format(x=v, y=context + "." + path))

        # now coerce each of the values, stripping out Nones if necessary
        try:
            val = [self._coerce(v, coerce, accept_failure=allow_coerce_failure) for v in val if v is not None or not ignore_none]
        except SeamlessException as e:
            e.message += "; set_list, path {x}".format(x=context + "." + path)
            raise

        # check that the cleaned array isn't empty, and if it is behave appropriately
        if len(val) == 0:
            # this is equivalent to a None, so we need to decide what to do
            if ignore_none:
                # if we are ignoring nones, just do nothing
                return
            elif not allow_none:
                # if we are not ignoring nones, and not allowing them, raise an error
                raise SeamlessException("Empty array not permitted at '{x}'".format(x=context + "." + path))

        # now set it on the path
        self._set_path(path, val)

    def add_to_list(self, path, val, coerce=None, allow_coerce_failure=False, allow_none=False,
                    ignore_none=True, unique=False, allowed_values=None, context=""):
        """
        Append a value to the list at ``path`` (the list is created if absent).

        :param coerce: optional function applied to the value before appending
        :param allow_none: if False, a None value raises (when not ignored)
        :param ignore_none: if True, a None value is silently not added
        :param unique: if True, the value is not added when already present
        :param allowed_values: if given, the value must be one of these
        :param context: path prefix used only in error messages
        :raises SeamlessException: if any of the checks or the coercion fails
        """
        if val is None and ignore_none:
            return

        if val is None and not allow_none:
            raise SeamlessException("NoneType is not allowed in list at '{x}'".format(x=context + "." + path))
        if allowed_values is not None and val not in allowed_values:
            raise SeamlessException("Value '{x}' is not permitted at '{y}'".format(x=val, y=context + "." + path))

        # first coerce the value
        if coerce is not None:
            try:
                val = self._coerce(val, coerce, accept_failure=allow_coerce_failure)
            except SeamlessException as e:
                e.message += "; add_to_list, path {x}".format(x=context + "." + path)
                raise
        current = self.get_list(path, by_reference=True, context=context)

        # if we require the list to be unique, check for the value first
        if unique and val in current:
            return

        # otherwise, append
        current.append(val)

    def exists_in_list(self, path, val=None, matchsub=None, apply_struct_on_matchsub=True):
        """
        Report whether the list at ``path`` contains a matching entry.

        :param val: if not None, entries are compared to this for equality
        :param matchsub: used when ``val`` is None; a dict whose every key/value must equal the
            corresponding ``entry.get(key)``
        :param apply_struct_on_matchsub: if True, ``matchsub`` is first passed through the struct
            for the path on a best-effort basis
        :return: True if a matching entry is found, otherwise False
        """
        entries = self.get_list(path)

        if val is None and matchsub is not None and apply_struct_on_matchsub and entries:
            matchsub = self._coerce_matchsub(path, matchsub)

        for entry in entries:
            if val is not None:
                if entry == val:
                    return True
            elif matchsub is not None:
                if self._matches_sub(entry, matchsub):
                    return True

        return False

    def delete_from_list(self, path, val=None, matchsub=None, prune=True, apply_struct_on_matchsub=True):
        """
        Remove matching entries from the list at ``path``.

        Note that matchsub will be coerced with the struct if it exists, to ensure
        that the match is done correctly

        :param path: dot-separated path of the list
        :param val: if not None, entries equal to this are removed
        :param matchsub: used when ``val`` is None; entries whose ``get(key)`` equals the value
            for every key in this dict are removed
        :param prune: if True and the list ends up empty, the path itself is deleted
        :param apply_struct_on_matchsub: if True, ``matchsub`` is first passed through the struct
            for the path on a best-effort basis
        :return: None
        """
        entries = self.get_list(path)

        if val is None and matchsub is not None and apply_struct_on_matchsub and entries:
            matchsub = self._coerce_matchsub(path, matchsub)

        removes = []
        for i, entry in enumerate(entries):
            if val is not None:
                if entry == val:
                    removes.append(i)
            elif matchsub is not None:
                if self._matches_sub(entry, matchsub):
                    removes.append(i)

        # delete from the end so the earlier indices stay valid
        for r in reversed(removes):
            del entries[r]

        if len(entries) == 0 and prune:
            self.delete(path, prune)

    def _coerce_matchsub(self, path, matchsub):
        """Best-effort: pass ``matchsub`` through the struct for ``path``; on any failure return it unchanged."""
        try:
            typ, struct, instructions = self._struct.lookup(path)
            if struct is not None:
                # NOTE(review): the second positional argument is kept exactly as the original
                # call passed it - confirm it is intended against the ``construct`` signature
                return struct.construct(matchsub, struct).data
        except Exception:
            # deliberately tolerant: fall back to matching on the uncoerced dict
            pass
        return matchsub

    @staticmethod
    def _matches_sub(entry, matchsub):
        """Return True if ``entry.get(k)`` equals ``v`` for every item of ``matchsub``."""
        return all(entry.get(k) == v for k, v in matchsub.items())

    def set_with_struct(self, path, val, check_required=True, silent_prune=False):
        """
        Store ``val`` at ``path`` according to the type the struct reports for that path.

        :raises SeamlessException: if the path is not in the struct, no coerce function is
            defined for a field, or the value is rejected when being set
        """
        typ, substruct, instructions = self._struct.lookup(path)

        if typ == "field":
            coerce_name, coerce_fn = self._struct.get_coerce(instructions)
            if coerce_fn is None:
                raise SeamlessException("No coersion function defined for type '{x}' at '{c}'".format(x=coerce_name, c=path))
            kwargs = self._struct.kwargs(typ, "set", instructions)
            self.set_single(path, val, coerce=coerce_fn, **kwargs)
        elif typ == "list":
            if not isinstance(val, list):
                val = [val]
            if substruct is not None:
                val = [substruct.construct(x, check_required=check_required, silent_prune=silent_prune).data for x in val]
            kwargs = self._struct.kwargs(typ, "set", instructions)
            coerce_fn = None
            # lists of objects are not coerced element by element
            if instructions.get("contains") != "object":
                coerce_name, coerce_fn = self._struct.get_coerce(instructions)
            self.set_list(path, val, coerce=coerce_fn, **kwargs)
        elif typ == "object" or typ == "struct":
            if substruct is not None:
                val = substruct.construct(val, check_required=check_required, silent_prune=silent_prune).data
            self.set_single(path, val)
        else:
            raise SeamlessException("Attempted to set_with_struct on path '{x}' but no such path exists in the struct".format(x=path))

    def add_to_list_with_struct(self, path, val):
        """
        Append ``val`` to the list at ``path``, first passing it through the list's substruct if
        there is one.

        :raises SeamlessException: if the struct does not report ``path`` as a list
        """
        typ, struct, instructions = self._struct.lookup(path)
        if typ != "list":
            raise SeamlessException("Attempt to add to list '{x}' failed - it is not a list element".format(x=path))
        if struct is not None:
            val = struct.construct(val).data
        kwargs = Construct.kwargs(typ, "set", instructions)
        self.add_to_list(path, val, **kwargs)

    def get_property(self, path, wrapper=None):
        """
        Return the value at ``path`` for attribute-style access.

        :param wrapper: optional function applied to the value (or to each element of a list)
        :return: the wrapped value; None if the struct knows the path under a type this method
            does not handle
        """
        if wrapper is None:
            wrapper = lambda x: x

        # pull the object from the structure, to find out what kind of retrieve it needs
        # (if there is a struct)
        typ, substruct, instructions = None, None, None
        if self._struct:
            typ, substruct, instructions = self._struct.lookup(path)

        if typ is None:
            # if there is no struct, or no object mapping was found, try to pull the path
            # as a single node (may be a field, list or dict, we'll find out in a mo)
            val = self.get_single(path)

            # if a wrapper is supplied, wrap it
            if isinstance(val, list):
                return [wrapper(v) for v in val]
            return wrapper(val)

        if instructions is None:
            instructions = {}

        # if the struct contains a reference to the path, always return something, even if it is None - don't raise an AttributeError
        kwargs = self._struct.kwargs(typ, "get", instructions)
        coerce_name, coerce_fn = self._struct.get_coerce(instructions)
        if coerce_fn is not None:
            kwargs["coerce"] = coerce_fn

        if typ == "field" or typ == "object":
            return wrapper(self.get_single(path, **kwargs))
        elif typ == "list":
            entries = self.get_list(path, **kwargs)
            return [wrapper(o) for o in entries]

        # NOTE(review): any other type (e.g. the "struct" type that set_with_struct accepts)
        # falls through to None here - confirm that is intended
        return None

    def set_property(self, path, value, unwrapper=None, allow_coerce_failure=False):
        """
        Store ``value`` at ``path`` for attribute-style access.

        :param unwrapper: optional function applied to the value (or to each element of a list)
            before storing
        :param allow_coerce_failure: passed through to the setters when there is no struct
        :return: True if the value was stored without a struct; False if the struct rejected the
            path or value; None after a successful struct-backed set
        """
        if unwrapper is None:
            unwrapper = lambda x: x

        # pull the object from the structure, to find out what kind of retrieve it needs
        # (if there is a struct)
        typ, substruct, instructions = None, None, None
        if self._struct:
            typ, substruct, instructions = self._struct.lookup(path)

        # if no type is found, then this means that either the struct was undefined, or the
        # path did not point to a valid point in the struct.  In the case that the struct was
        # defined, this means the property is trying to set something outside the struct, which
        # isn't allowed.  So, only set types which are None against objects which don't define
        # the struct.
        if typ is None:
            if self._struct is not None:
                return False

            # allow_coerce_failure must go by keyword: passed positionally it landed in the
            # ``coerce`` parameter of set_list/set_single, which then tried to call a bool
            if isinstance(value, list):
                value = [unwrapper(v) for v in value]
                self.set_list(path, value, allow_coerce_failure=allow_coerce_failure)
            else:
                value = unwrapper(value)
                self.set_single(path, value, allow_coerce_failure=allow_coerce_failure)
            return True

        if typ == "field" or typ == "object":
            value = unwrapper(value)
        if typ == "list":
            value = [unwrapper(v) for v in value]

        try:
            self.set_with_struct(path, value)
            # NOTE(review): this success path returns None (falsy), unlike the struct-less branch
            # above; kept as-is because callers may rely on it - confirm before changing
            return
        except SeamlessException:
            return False

    def _get_path(self, path, default):
        """Walk ``self.data`` along ``path``; absent intermediate elements are treated as empty dicts and an absent final element yields ``default``."""
        parts = path.split(".")
        context = self.data
        last = len(parts) - 1

        for i, p in enumerate(parts):
            d = {} if i < last else default
            context = context.get(p, d)
        return context

    def _set_path(self, path, val):
        """Store ``val`` at ``path`` in ``self.data``, creating intermediate dicts as needed."""
        parts = path.split(".")
        context = self.data
        last = len(parts) - 1

        for i, p in enumerate(parts):
            if i < last:
                if p not in context:
                    context[p] = {}
                context = context[p]
            else:
                context[p] = val

    @staticmethod
    def _coerce(val, coerce, accept_failure=False):
        """
        Apply ``coerce`` to ``val``.

        :return: the coerced value; ``val`` unchanged if ``coerce`` is None, or if it raises
            ValueError/TypeError and ``accept_failure`` is True
        :raises SeamlessException: if coercion raises ValueError/TypeError and failure is not accepted
        """
        if coerce is None:
            return val
        try:
            return coerce(val)
        except (ValueError, TypeError):
            if accept_failure:
                return val
            raise SeamlessException("Coerce with '{x}' failed on '{y}' of type '{z}'".format(x=coerce, y=val, z=type(val)))

    @staticmethod
    def _prune_stack(stack):
        """Consume ``stack`` from the end, removing from each dict on it any value that is an empty dict."""
        while len(stack) > 0:
            # this line previously called a non-existent ``stack.pMax.Pop()``
            context = stack.pop()
            todelete = [k for k, v in context.items() if isinstance(v, dict) and len(v) == 0]
            for d in todelete:
                del context[d]
727class Construct(object):
728 def __init__(self, definition, coerce, default_coerce):
729 if isinstance(definition, Construct):
730 definition = definition._definition
732 self._definition = definition
733 self._coerce = coerce
734 self._default_coerce = default_coerce
736 @classmethod
737 def merge(cls, target, *args):
738 # TODO: add an override or mode argument so we can perform a merge with replacements
739 if not isinstance(target, Construct):
740 merged = Construct(deepcopy(target), None, None)
741 else:
742 merged = target
744 for source in args:
745 if not isinstance(source, Construct):
746 source = Construct(source, None, None)
748 for field, instructions in source.fields:
749 merged.add_field(field, instructions, overwrite=False)
751 for obj in source.objects:
752 merged.add_object(obj)
754 for field, instructions in source.lists:
755 merged.add_list(field, instructions, overwrite=False)
757 for r in source._definition.get("required", []):
758 merged.add_required(r)
760 for field, struct in source._definition.get("structs", {}).items():
761 merged.add_substruct(field, struct, mode="merge")
763 return merged
765 @classmethod
766 def kwargs(cls, type, dir, instructions):
767 # if there are no instructions there are no kwargs
768 if instructions is None:
769 return {}
771 # take a copy of the instructions that we can modify
772 kwargs = deepcopy(instructions)
774 # remove the known arguments for the field type
775 if type == "field":
776 if "coerce" in kwargs:
777 del kwargs["coerce"]
779 elif type == "list":
780 if "coerce" in kwargs:
781 del kwargs["coerce"]
782 if "contains" in kwargs:
783 del kwargs["contains"]
785 nk = {}
786 if dir == "set":
787 for k, v in kwargs.items():
788 # basically everything is a "set" argument unless explicitly stated to be a "get" argument
789 if not k.startswith("get__"):
790 if k.startswith("set__"): # if it starts with the set__ prefix, remove it
791 k = k[5:]
792 nk[k] = v
793 elif dir == "get":
794 for k, v in kwargs.items():
795 # must start with "get" argument
796 if k.startswith("get__"):
797 nk[k[5:]] = v
799 return nk
801 @property
802 def raw(self):
803 return self._definition
805 @property
806 def required(self):
807 return self._definition.get("required", [])
809 def add_required(self, field):
810 if "required" not in self._definition:
811 self._definition["required"] = []
812 if field not in self._definition["required"]:
813 self._definition["required"].append(field)
815 @property
816 def allowed(self):
817 return list(self._definition.get("fields", {}).keys()) + \
818 self._definition.get("objects", []) + \
819 list(self._definition.get("lists", {}).keys())
821 @property
822 def objects(self):
823 return self._definition.get("objects", [])
825 def add_object(self, object_name):
826 if "objects" not in self._definition:
827 self._definition["objects"] = []
828 if object_name not in self._definition["objects"]:
829 self._definition["objects"].append(object_name)
831 @property
832 def substructs(self):
833 return self._definition.get("structs", {})
835 def substruct(self, field):
836 s = self.substructs.get(field)
837 if s is None:
838 return None
839 return Construct(s, self._coerce, self._default_coerce)
841 def add_substruct(self, field, struct, mode="merge"):
842 if "structs" not in self._definition:
843 self._definition["structs"] = {}
844 if mode == "overwrite" or field not in self._definition["structs"]:
845 self._definition["structs"][field] = deepcopy(struct)
846 else:
847 # recursively merge
848 self._definition["structs"][field] = Construct.merge(self._definition["structs"][field], struct).raw
850 @property
851 def fields(self):
852 return self._definition.get("fields", {}).items()
854 def field_instructions(self, field):
855 return self._definition.get("fields", {}).get(field)
857 def add_field(self, field_name, instructions, overwrite=False):
858 if "fields" not in self._definition:
859 self._definition["fields"] = {}
860 if overwrite or field_name not in self._definition["fields"]:
861 self._definition["fields"][field_name] = deepcopy(instructions)
863 @property
864 def lists(self):
865 return self._definition.get("lists", {}).items()
867 @property
868 def list_names(self):
869 return self._definition.get("lists", {}).keys()
871 def list_instructions(self, field):
872 return self._definition.get("lists", {}).get(field)
874 def add_list(self, list_name, instructions, overwrite=False):
875 if "lists" not in self._definition:
876 self._definition["lists"] = {}
877 if overwrite or list_name not in self._definition["lists"]:
878 self._definition["lists"][list_name] = deepcopy(instructions)
880 def get_coerce(self, instructions):
881 coerce_name = instructions.get("coerce", self._default_coerce)
882 return coerce_name, self._coerce.get(coerce_name)
884 def get(self, elem, default=None):
885 if elem in self._definition:
886 return self._definition.get(elem)
887 else:
888 return default
890 def lookup(self, path):
891 bits = path.split(".")
893 # if there's more than one path element, we will need to recurse
894 if len(bits) > 1:
895 # it has to be an object, in order for the path to still have multiple
896 # segments
897 if bits[0] not in self.objects:
898 return None, None, None
899 substruct = self.substruct(bits[0])
900 return substruct.lookup(".".join(bits[1:]))
901 elif len(bits) == 1:
902 # first check the fields
903 instructions = self.field_instructions(bits[0])
904 if instructions is not None:
905 return "field", None, instructions
907 # then check the lists
908 instructions = self.list_instructions(bits[0])
909 if instructions is not None:
910 substruct = self.substruct(bits[0])
911 return "list", substruct, instructions
913 # then check the objects
914 if bits[0] in self.objects:
915 substruct = self.substruct(bits[0])
916 return "struct", substruct, instructions
918 return None, None, None
920 def construct(self, obj, check_required=True, silent_prune=False, allow_other_fields=False):
922 def recurse(obj, struct, context):
923 if obj is None:
924 return None
925 if not isinstance(obj, dict):
926 raise SeamlessException("Expected a dict at '{c}' but found something else instead".format(c=context))
928 keyset = obj.keys()
930 # if we are checking required fields, then check them
931 # FIXME: might be sensible to move this out to a separate phase, independent of constructing
932 if check_required:
933 for r in struct.required:
934 if r not in keyset:
935 raise SeamlessException("Field '{r}' is required but not present at '{c}'".format(r=r, c=context))
937 # check that there are no fields that are not allowed
938 # Note that since the construct mechanism copies fields explicitly, silent_prune just turns off this
939 # check
940 if not allow_other_fields and not silent_prune:
941 allowed = struct.allowed
942 for k in keyset:
943 if k not in allowed:
944 c = context if context != "" else "root"
945 raise SeamlessException("Field '{k}' is not permitted at '{c}'".format(k=k, c=c))
947 # make a SeamlessData instance for gathering all the new data
948 constructed = SeamlessData(struct=struct)
950 # now check all the fields
951 for field_name, instructions in struct.fields:
952 val = obj.get(field_name)
953 if val is None:
954 continue
955 typ, substruct, instructions = struct.lookup(field_name)
956 if instructions is None:
957 raise SeamlessException("No instruction set defined for field at '{x}'".format(x=context + field_name))
958 coerce_name, coerce_fn = struct.get_coerce(instructions)
959 if coerce_fn is None:
960 raise SeamlessException("No coerce function defined for type '{x}' at '{c}'".format(x=coerce_name, c=context + field_name))
961 kwargs = struct.kwargs(typ, "set", instructions)
962 constructed.set_single(field_name, val, coerce=coerce_fn, context=context, **kwargs)
964 # next check all the objects (which will involve a recursive call to this function)
965 for field_name in struct.objects:
966 val = obj.get(field_name)
967 if val is None:
968 continue
969 if type(val) != dict:
970 raise SeamlessException("Expected dict at '{x}' but found '{y}'".format(x=context + field_name, y=type(val)))
972 typ, substruct, instructions = struct.lookup(field_name)
974 if substruct is None:
975 # this is the lowest point at which we have instructions, so just accept the data structure as-is
976 # (taking a deep copy to destroy any references)
977 constructed.set_single(field_name, deepcopy(val))
978 else:
979 # we need to recurse further down
980 beneath = recurse(val, substruct, context=context + field_name + ".")
982 # what we get back is the correct sub-data structure, which we can then store
983 constructed.set_single(field_name, beneath)
985 # now check all the lists
986 for field_name, instructions in struct.lists:
987 vals = obj.get(field_name)
988 if vals is None:
989 continue
990 if not isinstance(vals, list):
991 raise SeamlessException("Expecting list at '{x}' but found something else '{y}'".format(x=context + field_name, y=type(val)))
993 typ, substruct, instructions = struct.lookup(field_name)
994 kwargs = struct.kwargs(typ, "set", instructions)
996 contains = instructions.get("contains")
997 if contains == "field":
998 # coerce all the values in the list
999 coerce_name, coerce_fn = struct.get_coerce(instructions)
1000 if coerce_fn is None:
1001 raise SeamlessException("No coerce function defined for type '{x}' at '{c}'".format(x=coerce_name, c=context + field_name))
1003 for i in range(len(vals)):
1004 val = vals[i]
1005 constructed.add_to_list(field_name, val, coerce=coerce_fn, **kwargs)
1007 elif contains == "object":
1008 # for each object in the list, send it for construction
1009 for i in range(len(vals)):
1010 val = vals[i]
1012 if type(val) != dict:
1013 print("Expected dict at '{x}[{p}]' but got '{y}'".format(x=context + field_name, y=type(val), p=i))
1014 raise SeamlessException("Expected dict at '{x}[{p}]' but got '{y}'".format(x=context + field_name, y=type(val), p=i))
1016 substruct = struct.substruct(field_name)
1017 if substruct is None:
1018 constructed.add_to_list(field_name, deepcopy(val))
1019 else:
1020 # we need to recurse further down
1021 beneath = recurse(val, substruct, context=context + field_name + "[" + str(i) + "].")
1023 # what we get back is the correct sub-data structure, which we can then store
1024 constructed.add_to_list(field_name, beneath)
1026 else:
1027 raise SeamlessException("Cannot understand structure where list '{x}' elements contain '{y}'".format(x=context + field_name, y=contains))
1029 # finally, if we allow other fields, make sure that they come across too
1030 if allow_other_fields:
1031 known = struct.allowed
1032 for k, v in obj.items():
1033 if k not in known:
1034 constructed.set_single(k, v)
1036 # ensure any external references to the object persist
1037 obj.clear()
1038 obj.update(constructed.data)
1039 return obj
1041 ready = recurse(obj, self, "[root]")
1042 return SeamlessData(ready, struct=self)
def validate(self):
    """
    Check that this struct definition, and every struct nested beneath it, is well-formed.

    The walk starts at this object using the context label "[root]".  The label is only used
    in error messages, to say where in the definition the problem was found.

    :raises SeamlessException: at the first malformed part of the definition that is found
    :return: None - the definition is acceptable if no exception is raised
    """

    def recurse(struct, context):
        # a raw struct definition may only carry these top-level keys
        permitted_keys = ("fields", "objects", "lists", "required", "structs")
        for key in struct.raw.keys():
            if key not in permitted_keys:
                raise SeamlessException("Key '{x}' present in struct at '{y}', but is not permitted".format(x=key, y=context))

        # every argument attached to a field must be a string, a list or a boolean
        for field_name, instructions in struct.fields:
            for arg, value in instructions.items():
                if not isinstance(value, (list, str, bool)):
                    raise SeamlessException("Argument '{a}' in field '{b}' at '{c}' is not a string, list or boolean".format(a=arg, b=field_name, c=context))

        # the object list must consist only of strings
        if not all(isinstance(entry, str) for entry in struct.objects):
            raise SeamlessException("There is a non-string value in the object list at '{y}'".format(y=context))

        # each list definition must say what it contains, and its arguments must be the right shape
        for field_name, instructions in struct.lists:
            contains = instructions.get("contains")
            if contains is None:
                raise SeamlessException("No 'contains' argument in list definition for field '{x}' at '{y}'".format(x=field_name, y=context))
            elif contains not in ("object", "field"):
                raise SeamlessException("'contains' argument in list '{x}' at '{y}' contains illegal value '{z}'".format(x=field_name, y=context, z=contains))
            for arg, value in instructions.items():
                if not isinstance(value, (list, str, bool)):
                    raise SeamlessException("Argument '{a}' in list '{b}' at '{c}' is not a string, list or boolean".format(a=arg, b=field_name, c=context))

        # the required list must consist only of strings
        if not all(isinstance(entry, str) for entry in struct.required):
            raise SeamlessException("There is a non-string value in the required list at '{y}'".format(y=context))

        # finally the nested structs: each must belong to a declared object or list ...
        nested = struct.substructs
        declared = struct.objects + list(struct.list_names)
        for name in nested:
            if name not in declared:
                raise SeamlessException("struct contains key '{a}' which is not listed in object or list definitions at '{x}'".format(a=name, x=context))

        # ... and each must itself be valid, so wrap it and descend, extending the context
        # label with the name of the nested struct
        for name, raw_struct in nested.items():
            child_context = name if context == "" else context + "." + name
            recurse(Construct(raw_struct, None, None), context=child_context)

        return True

    recurse(self, "[root]")
def create_allowed_values_by_constant(constant_class: Type[ConstantList]):
    """
    Build an ``allowed_values`` argument from a class of constants.

    :param constant_class: a class exposing ``all_constants()``, whose result is iterated once
    :return: a dict with the single key ``allowed_values`` holding those constants as a list
    """
    permitted = [*constant_class.all_constants()]
    return dict(allowed_values=permitted)