Coverage for portality / lib / seamless.py: 70%

696 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1import locale 

2from copy import deepcopy 

3from typing import Type 

4from urllib.parse import urlparse 

5 

6from portality.constants import ConstantList 

7from portality.lib import dates 

8 

9 

10############################################### 

11## Common coerce functions 

12############################################### 

13 

14def to_utf8_unicode(val): 

15 if isinstance(val, str): 

16 return val 

17 elif isinstance(val, str): 

18 try: 

19 return val.decode("utf8", "strict") 

20 except UnicodeDecodeError: 

21 raise ValueError("Could not decode string") 

22 else: 

23 return str(val) 

24 

25 

26def to_unicode_upper(val): 

27 val = to_utf8_unicode(val) 

28 return val.upper() 

29 

30 

31def to_unicode_lower(val): 

32 val = to_utf8_unicode(val) 

33 return val.lower() 

34 

35 

36def intify(val): 

37 # strip any characters that are outside the ascii range - they won't make up the int anyway 

38 # and this will get rid of things like strange currency marks 

39 if isinstance(val, str): 

40 val = val.encode("ascii", errors="ignore") 

41 

42 # try the straight cast 

43 try: 

44 return int(val) 

45 except ValueError: 

46 pass 

47 

48 # could have commas in it, so try stripping them 

49 try: 

50 return int(val.replace(",", "")) 

51 except ValueError: 

52 pass 

53 

54 # try the locale-specific approach 

55 try: 

56 return locale.atoi(val) 

57 except ValueError: 

58 pass 

59 

60 raise ValueError("Could not convert string to int: {x}".format(x=val)) 

61 

62def floatify(val): 

63 # strip any characters that are outside the ascii range - they won't make up the float anyway 

64 # and this will get rid of things like strange currency marks 

65 if isinstance(val, str): 

66 val = val.encode("ascii", errors="ignore") 

67 

68 # try the straight cast 

69 try: 

70 return float(val) 

71 except ValueError: 

72 pass 

73 

74 # could have commas in it, so try stripping them 

75 try: 

76 return float(val.replace(",", "")) 

77 except ValueError: 

78 pass 

79 

80 # try the locale-specific approach 

81 try: 

82 return locale.atof(val) 

83 except ValueError: 

84 pass 

85 

86 raise ValueError("Could not convert string to float: {x}".format(x=val)) 

87 

88 

89def to_url(val): 

90 if not isinstance(val, str): 

91 raise ValueError("Argument passed to to_url was not a string, but type '{t}': '{val}'".format(t=type(val),val=val)) 

92 

93 val = val.strip() 

94 

95 if val == '': 

96 return val 

97 

98 # parse with urlparse 

99 url = urlparse(val) 

100 

101 # now check the url has the minimum properties that we require 

102 if url.scheme and url.scheme.startswith("http"): 

103 return to_utf8_unicode(val) 

104 else: 

105 raise ValueError("Could not convert string {val} to viable URL".format(val=val)) 

106 

107 

108def to_bool(val): 

109 """Conservative boolean cast - don't cast lists and objects to True, just existing booleans and strings.""" 

110 if val is None: 

111 return None 

112 if val is True or val is False: 

113 return val 

114 

115 if isinstance(val, str): 

116 if val.lower() == 'true': 

117 return True 

118 elif val.lower() == 'false': 

119 return False 

120 raise ValueError("Could not convert string {val} to boolean. Expecting string to either say 'true' or 'false' (not case-sensitive).".format(val=val)) 

121 

122 raise ValueError("Could not convert {val} to boolean. Expect either boolean or string.".format(val=val)) 

123 

124 

125def to_datetime(val): 

126 try: 

127 return dates.parse(val) 

128 except: 

129 raise ValueError("Could not convert string {val} to UTC Datetime".format(val=val)) 

130 

131 

132def string_canonicalise(canon, allow_fail=False): 

133 normalised = {} 

134 for a in canon: 

135 normalised[a.strip().lower()] = a 

136 

137 def sn(val): 

138 if val is None: 

139 if allow_fail: 

140 return None 

141 raise ValueError("NoneType not permitted") 

142 

143 try: 

144 norm = val.strip().lower() 

145 except: 

146 raise ValueError("Unable to treat value as a string") 

147 

148 uc = to_utf8_unicode 

149 if norm in normalised: 

150 return uc(normalised[norm]) 

151 if allow_fail: 

152 return uc(val) 

153 

154 raise ValueError("Unable to canonicalise string") 

155 

156 return sn 

157 

158 

159class SeamlessException(Exception): 

160 def __init__(self, message, *args, **kwargs): 

161 self.message = message 

162 super(SeamlessException, self).__init__(message, *args, **kwargs) 

163 

164 

165class SeamlessMixin(object): 

166 """ 

167  

168 remember merge data property if you Mixin with DomainObject 

169 

170 ------ 

171 Example 

172 ```python 

173 class DatalogJournalAdded(SeamlessMixin, DomainObject): 

174 ... 

175 

176 @property 

177 def data(self): 

178 return self.__seamless__.data 

179 ``` 

180 

181 remember to setup __init__ like below, otherwise your object wrap in iterable will not work 

182 

183 ---- 

184 ```python 

185 def __init__(self, **kwargs): 

186 super(DatalogJournalAdded, self).__init__(raw=kwargs) 

187 

188 ``` 

189 

190 

191  

192 """ 

193 

194 __SEAMLESS_STRUCT__ = None 

195 

196 __SEAMLESS_COERCE__ = { 

197 "unicode": to_utf8_unicode, 

198 "unicode_upper" : to_unicode_upper, 

199 "unicode_lower" : to_unicode_lower, 

200 "integer": intify, 

201 "float": floatify, 

202 "url": to_url, 

203 "bool": to_bool, 

204 "datetime" : to_datetime 

205 } 

206 

207 __SEAMLESS_DEFAULT_COERCE__ = "unicode" 

208 

209 __SEAMLESS_PROPERTIES__ = None 

210 

211 __SEAMLESS_APPLY_STRUCT_ON_INIT__ = True 

212 __SEAMLESS_CHECK_REQUIRED_ON_INIT__ = True 

213 __SEAMLESS_SILENT_PRUNE__ = False 

214 __SEAMLESS_ALLOW_OTHER_FIELDS__ = False 

215 

216 def __init__(self, 

217 raw=None, # The raw data 

218 struct=None, 

219 coerce=None, 

220 properties=None, 

221 default_coerce=None, 

222 apply_struct_on_init=None, 

223 check_required_on_init=None, 

224 silent_prune=None, 

225 allow_other_fields=None, 

226 *args, **kwargs 

227 ): 

228 

229 # set all the working properties 

230 self.__seamless_coerce__ = coerce if coerce is not None else self.__SEAMLESS_COERCE__ 

231 self.__seamless_default_coerce__ = default_coerce if default_coerce is not None else self.__SEAMLESS_DEFAULT_COERCE__ 

232 self.__seamless_properties__ = properties if properties is not None else self.__SEAMLESS_PROPERTIES__ 

233 self.__seamless_apply_struct_on_init__ = apply_struct_on_init if apply_struct_on_init is not None else self.__SEAMLESS_APPLY_STRUCT_ON_INIT__ 

234 self.__seamless_check_required_on_init__ = check_required_on_init if check_required_on_init is not None else self.__SEAMLESS_CHECK_REQUIRED_ON_INIT__ 

235 self.__seamless_silent_prune__ = silent_prune if silent_prune is not None else self.__SEAMLESS_SILENT_PRUNE__ 

236 self.__seamless_allow_other_fields__ = allow_other_fields if allow_other_fields is not None else self.__SEAMLESS_ALLOW_OTHER_FIELDS__ 

237 

238 struct = struct if struct is not None else self.__SEAMLESS_STRUCT__ 

239 if isinstance(struct, list): 

240 struct = Construct.merge(*struct) 

241 self.__seamless_struct__ = Construct(struct, 

242 self.__seamless_coerce__, 

243 self.__seamless_default_coerce__) 

244 

245 self.__seamless__ = SeamlessData(raw, struct=self.__seamless_struct__) 

246 

247 if (self.__seamless_struct__ is not None and 

248 raw is not None and 

249 self.__seamless_apply_struct_on_init__): 

250 self.__seamless__ = self.__seamless_struct__.construct(self.__seamless__.data, 

251 check_required=self.__seamless_check_required_on_init__, 

252 silent_prune=self.__seamless_silent_prune__, 

253 allow_other_fields=self.__seamless_allow_other_fields__) 

254 

255 self.custom_validate() 

256 

257 super(SeamlessMixin, self).__init__(*args, **kwargs) 

258 

259 def __getattr__(self, name): 

260 

261 # workaround to prevent debugger from disconnecting at the deepcopy method 

262 # https://stackoverflow.com/questions/32831050/pycharms-debugger-gives-up-when-hitting-copy-deepcopy 

263 # if name.startswith("__"): 

264 # raise AttributeError 

265 

266 if hasattr(self.__class__, name): 

267 return object.__getattribute__(self, name) 

268 

269 if self.__seamless_properties__ is not None: 

270 prop = self.__seamless_properties__.get(name) 

271 if prop is not None: 

272 path = prop["path"] 

273 wrap = prop.get("wrapper") 

274 return self.__seamless__.get_property(path, wrap) 

275 

276 raise AttributeError('{name} is not set'.format(name=name)) 

277 

278 def __setattr__(self, name, value, allow_coerce_failure=False): 

279 if hasattr(self.__class__, name): 

280 return object.__setattr__(self, name, value) 

281 

282 if name.startswith("__seamless"): 

283 return object.__setattr__(self, name, value) 

284 

285 if self.__seamless_properties__ is not None: 

286 prop = self.__seamless_properties__.get(name) 

287 if prop is not None: 

288 path = prop["path"] 

289 unwrap = prop.get("unwrapper") 

290 wasset = self.__seamless__.set_property(path, value, unwrap, allow_coerce_failure) 

291 if wasset: 

292 return 

293 

294 # fall back to the default approach of allowing any attribute to be set on the object 

295 return object.__setattr__(self, name, value) 

296 

297 def __deepcopy__(self): 

298 # FIXME: should also reflect all the constructor arguments 

299 return self.__class__(deepcopy(self.__seamless__.data)) 

300 

301 def custom_validate(self): 

302 """ 

303 Should be implemented on the higher level 

304 """ 

305 pass 

306 

307 def verify_against_struct(self, check_required=True, silent_prune=None, allow_other_fields=None): 

308 

309 silent_prune = silent_prune if silent_prune is not None else self.__seamless_silent_prune__ 

310 allow_other_fields = allow_other_fields if allow_other_fields is not None else self.__seamless_allow_other_fields__ 

311 

312 if (self.__seamless_struct__ is not None and 

313 self.__seamless__ is not None): 

314 self.__seamless_struct__.construct(deepcopy(self.__seamless__.data), # use a copy of the data, to avoid messing with any references to the current data 

315 check_required=check_required, 

316 silent_prune=silent_prune, 

317 allow_other_fields=allow_other_fields) 

318 

319 def apply_struct(self, check_required=True, silent_prune=None, allow_other_fields=None): 

320 

321 silent_prune = silent_prune if silent_prune is not None else self.__seamless_silent_prune__ 

322 allow_other_fields = allow_other_fields if allow_other_fields is not None else self.__seamless_allow_other_fields__ 

323 

324 if (self.__seamless_struct__ is not None and 

325 self.__seamless__ is not None): 

326 self.__seamless__ = self.__seamless_struct__.construct(self.__seamless__.data, 

327 check_required=check_required, 

328 silent_prune=silent_prune, 

329 allow_other_fields=allow_other_fields) 

330 

331 def extend_struct(self, struct): 

332 self.__seamless_struct__ = Construct.merge(self.__seamless_struct__, struct) 

333 

334 

335class SeamlessData(object): 

336 def __init__(self, raw=None, struct=None): 

337 self.data = raw if raw is not None else {} 

338 self._struct = struct 

339 

340 def get_single(self, path, coerce=None, default=None, allow_coerce_failure=True): 

341 # get the value at the point in the object 

342 val = self._get_path(path, default) 

343 

344 if coerce is not None and val is not None: 

345 # if you want to coerce and there is something to coerce do it 

346 try: 

347 return self._coerce(val, coerce, accept_failure=allow_coerce_failure) 

348 except SeamlessException as e: 

349 e.message += "; get_single, path {x}".format(x=path) 

350 raise 

351 else: 

352 # otherwise return the value 

353 return val 

354 

355 def set_single(self, path, val, coerce=None, allow_coerce_failure=False, allowed_values=None, allowed_range=None, 

356 allow_none=True, ignore_none=False, context=""): 

357 

358 if val is None and ignore_none: 

359 return 

360 

361 if val is None and not allow_none: 

362 raise SeamlessException("NoneType is not allowed at '{x}'".format(x=context + "." + path)) 

363 

364 # first see if we need to coerce the value (and don't coerce None) 

365 if coerce is not None and val is not None: 

366 try: 

367 val = self._coerce(val, coerce, accept_failure=allow_coerce_failure) 

368 except SeamlessException as e: 

369 e.message += "; set_single, path {x}".format(x=context + "." + path) 

370 raise 

371 

372 if allowed_values is not None and val not in allowed_values: 

373 raise SeamlessException("Value '{x}' is not permitted at '{y}'".format(x=val, y=context + "." + path)) 

374 

375 if allowed_range is not None: 

376 lower, upper = allowed_range 

377 if (lower is not None and val < lower) or (upper is not None and val > upper): 

378 raise SeamlessException("Value '{x}' is outside the allowed range: {l} - {u} at '{y}'".format(x=val, l=lower, u=upper, y=context + "." + path)) 

379 

380 # now set it at the path point in the object 

381 self._set_path(path, val) 

382 

383 def delete(self, path, prune=True): 

384 parts = path.split(".") 

385 context = self.data 

386 

387 stack = [] 

388 for i in range(len(parts)): 

389 p = parts[i] 

390 if p in context: 

391 if i < len(parts) - 1: 

392 stack.append(context[p]) 

393 context = context[p] 

394 else: 

395 del context[p] 

396 if prune and len(stack) > 0: 

397 stack.pop() # the last element was just deleted 

398 self._prune_stack(stack) 

399 

400 def get_list(self, path, coerce=None, by_reference=True, allow_coerce_failure=True, context=""): 

401 # get the value at the point in the object 

402 val = self._get_path(path, None) 

403 

404 # if there is no value and we want to do by reference, then create it, bind it and return it 

405 if val is None and by_reference: 

406 mylist = [] 

407 self.set_single(path, mylist) 

408 return mylist 

409 

410 # otherwise, default is an empty list 

411 elif val is None and not by_reference: 

412 return [] 

413 

414 # check that the val is actually a list 

415 if not isinstance(val, list): 

416 raise SeamlessException("Expecting a list at '{x}' but found '{y}'".format(x=context + "." + path, y=val)) 

417 

418 # if there is a value, do we want to coerce each of them 

419 if coerce is not None: 

420 try: 

421 coerced = [self._coerce(v, coerce, accept_failure=allow_coerce_failure) for v in val] 

422 except SeamlessException as e: 

423 e.message += "; get_list, path {x}".format(x=context + "." + path) 

424 raise 

425 if by_reference: 

426 self.set_single(path, coerced) 

427 return coerced 

428 else: 

429 if by_reference: 

430 return val 

431 else: 

432 return deepcopy(val) 

433 

434 def set_list(self, path, val, coerce=None, allow_coerce_failure=False, allow_none=True, 

435 ignore_none=False, allowed_values=None, context=""): 

436 # first ensure that the value is a list 

437 if not isinstance(val, list): 

438 val = [val] 

439 

440 # now carry out the None check 

441 # for each supplied value, if it is none, and none is not allowed, raise an error if we do not 

442 # plan to ignore the nones. 

443 for v in val: 

444 if v is None and not allow_none: 

445 if not ignore_none: 

446 raise SeamlessException("NoneType is not allowed at '{x}'".format(x=context + "." + path)) 

447 if allowed_values is not None and v not in allowed_values: 

448 raise SeamlessException("Value '{x}' is not permitted at '{y}'".format(x=val, y=context + "." + path)) 

449 

450 # now coerce each of the values, stripping out Nones if necessary 

451 try: 

452 val = [self._coerce(v, coerce, accept_failure=allow_coerce_failure) for v in val if v is not None or not ignore_none] 

453 except SeamlessException as e: 

454 e.message += "; set_list, path {x}".format(x=context + "." + path) 

455 raise 

456 

457 # check that the cleaned array isn't empty, and if it is behave appropriately 

458 if len(val) == 0: 

459 # this is equivalent to a None, so we need to decide what to do 

460 if ignore_none: 

461 # if we are ignoring nones, just do nothing 

462 return 

463 elif not allow_none: 

464 # if we are not ignoring nones, and not allowing them, raise an error 

465 raise SeamlessException("Empty array not permitted at '{x}'".format(x=context + "." + path)) 

466 

467 # now set it on the path 

468 self._set_path(path, val) 

469 

470 def add_to_list(self, path, val, coerce=None, allow_coerce_failure=False, allow_none=False, 

471 ignore_none=True, unique=False, allowed_values=None, context=""): 

472 if val is None and ignore_none: 

473 return 

474 

475 if val is None and not allow_none: 

476 raise SeamlessException("NoneType is not allowed in list at '{x}'".format(x=context + "." + path)) 

477 if allowed_values is not None and val not in allowed_values: 

478 raise SeamlessException("Value '{x}' is not permitted at '{y}'".format(x=val, y=context + "." + path)) 

479 

480 # first coerce the value 

481 if coerce is not None: 

482 try: 

483 val = self._coerce(val, coerce, accept_failure=allow_coerce_failure) 

484 except SeamlessException as e: 

485 e.message += "; add_to_list, path {x}".format(x=context + "." + path) 

486 raise 

487 current = self.get_list(path, by_reference=True, context=context) 

488 

489 # if we require the list to be unique, check for the value first 

490 if unique: 

491 if val in current: 

492 return 

493 

494 # otherwise, append 

495 current.append(val) 

496 

497 def exists_in_list(self, path, val=None, matchsub=None, apply_struct_on_matchsub=True): 

498 l = self.get_list(path) 

499 

500 for entry in l: 

501 if val is not None: 

502 if entry == val: 

503 return True 

504 elif matchsub is not None: 

505 # attempt to coerce the sub 

506 if apply_struct_on_matchsub: 

507 try: 

508 type, struct, instructions = self._struct.lookup(path) 

509 if struct is not None: 

510 matchsub = struct.construct(matchsub, struct).data 

511 except: 

512 pass 

513 

514 matches = 0 

515 for k, v in matchsub.items(): 

516 if entry.get(k) == v: 

517 matches += 1 

518 if matches == len(list(matchsub.keys())): 

519 return True 

520 

521 return False 

522 

523 def delete_from_list(self, path, val=None, matchsub=None, prune=True, apply_struct_on_matchsub=True): 

524 """ 

525 Note that matchsub will be coerced with the struct if it exists, to ensure 

526 that the match is done correctly 

527 

528 :param path: 

529 :param val: 

530 :param matchsub: 

531 :param prune: 

532 :return: 

533 """ 

534 l = self.get_list(path) 

535 

536 removes = [] 

537 i = 0 

538 for entry in l: 

539 if val is not None: 

540 if entry == val: 

541 removes.append(i) 

542 elif matchsub is not None: 

543 # attempt to coerce the sub 

544 if apply_struct_on_matchsub: 

545 try: 

546 type, struct, instructions = self._struct.lookup(path) 

547 if struct is not None: 

548 matchsub = struct.construct(matchsub, struct).data 

549 except: 

550 pass 

551 

552 matches = 0 

553 for k, v in matchsub.items(): 

554 if entry.get(k) == v: 

555 matches += 1 

556 if matches == len(list(matchsub.keys())): 

557 removes.append(i) 

558 i += 1 

559 

560 removes.sort(reverse=True) 

561 for r in removes: 

562 del l[r] 

563 

564 if len(l) == 0 and prune: 

565 self.delete(path, prune) 

566 

567 def set_with_struct(self, path, val, check_required=True, silent_prune=False): 

568 typ, substruct, instructions = self._struct.lookup(path) 

569 

570 if typ == "field": 

571 coerce_name, coerce_fn = self._struct.get_coerce(instructions) 

572 if coerce_fn is None: 

573 raise SeamlessException("No coersion function defined for type '{x}' at '{c}'".format(x=coerce_name, c=path)) 

574 kwargs = self._struct.kwargs(typ, "set", instructions) 

575 self.set_single(path, val, coerce=coerce_fn, **kwargs) 

576 elif typ == "list": 

577 if not isinstance(val, list): 

578 val = [val] 

579 if substruct is not None: 

580 val = [substruct.construct(x, check_required=check_required, silent_prune=silent_prune).data for x in val] 

581 kwargs = self._struct.kwargs(typ, "set", instructions) 

582 coerce_fn = None 

583 if instructions.get("contains") != "object": 

584 coerce_name, coerce_fn = self._struct.get_coerce(instructions) 

585 self.set_list(path, val, coerce=coerce_fn, **kwargs) 

586 elif typ == "object" or typ == "struct": 

587 if substruct is not None: 

588 val = substruct.construct(val, check_required=check_required, silent_prune=silent_prune).data 

589 self.set_single(path, val) 

590 else: 

591 raise SeamlessException("Attempted to set_with_struct on path '{x}' but no such path exists in the struct".format(x=path)) 

592 

593 def add_to_list_with_struct(self, path, val): 

594 type, struct, instructions = self._struct.lookup(path) 

595 if type != "list": 

596 raise SeamlessException("Attempt to add to list '{x}' failed - it is not a list element".format(x=path)) 

597 if struct is not None: 

598 val = struct.construct(val).data 

599 kwargs = Construct.kwargs(type, "set", instructions) 

600 self.add_to_list(path, val, **kwargs) 

601 

602 def get_property(self, path, wrapper=None): 

603 if wrapper is None: 

604 wrapper = lambda x : x 

605 

606 # pull the object from the structure, to find out what kind of retrieve it needs 

607 # (if there is a struct) 

608 type, substruct, instructions = None, None, None 

609 if self._struct: 

610 type, substruct, instructions = self._struct.lookup(path) 

611 

612 if type is None: 

613 # if there is no struct, or no object mapping was found, try to pull the path 

614 # as a single node (may be a field, list or dict, we'll find out in a mo) 

615 val = self.get_single(path) 

616 

617 # if a wrapper is supplied, wrap it 

618 if isinstance(val, list): 

619 return [wrapper(v) for v in val] 

620 else: 

621 return wrapper(val) 

622 

623 if instructions is None: 

624 instructions = {} 

625 

626 # if the struct contains a reference to the path, always return something, even if it is None - don't raise an AttributeError 

627 kwargs = self._struct.kwargs(type, "get", instructions) 

628 coerce_name, coerce_fn = self._struct.get_coerce(instructions) 

629 if coerce_fn is not None: 

630 kwargs["coerce"] = coerce_fn 

631 

632 if type == "field" or type == "object": 

633 return wrapper(self.get_single(path, **kwargs)) 

634 elif type == "list": 

635 l = self.get_list(path, **kwargs) 

636 return [wrapper(o) for o in l] 

637 

638 return None 

639 

640 def set_property(self, path, value, unwrapper=None, allow_coerce_failure=False): 

641 if unwrapper is None: 

642 unwrapper = lambda x : x 

643 

644 # pull the object from the structure, to find out what kind of retrieve it needs 

645 # (if there is a struct) 

646 type, substruct, instructions = None, None, None 

647 if self._struct: 

648 type, substruct, instructions = self._struct.lookup(path) 

649 

650 # if no type is found, then this means that either the struct was undefined, or the 

651 # path did not point to a valid point in the struct. In the case that the struct was 

652 # defined, this means the property is trying to set something outside the struct, which 

653 # isn't allowed. So, only set types which are None against objects which don't define 

654 # the struct. 

655 if type is None: 

656 if self._struct is None: 

657 if isinstance(value, list): 

658 value = [unwrapper(v) for v in value] 

659 self.set_list(path, value, allow_coerce_failure) 

660 else: 

661 value = unwrapper(value) 

662 self.set_single(path, value, allow_coerce_failure) 

663 

664 return True 

665 else: 

666 return False 

667 

668 if type == "field" or type == "object": 

669 value = unwrapper(value) 

670 if type == "list": 

671 value = [unwrapper(v) for v in value] 

672 

673 try: 

674 self.set_with_struct(path, value) 

675 return 

676 except SeamlessException: 

677 return False 

678 

679 def _get_path(self, path, default): 

680 parts = path.split(".") 

681 context = self.data 

682 

683 for i in range(len(parts)): 

684 p = parts[i] 

685 d = {} if i < len(parts) - 1 else default 

686 context = context.get(p, d) 

687 return context 

688 

689 def _set_path(self, path, val): 

690 parts = path.split(".") 

691 context = self.data 

692 

693 for i in range(len(parts)): 

694 p = parts[i] 

695 

696 if p not in context and i < len(parts) - 1: 

697 context[p] = {} 

698 context = context[p] 

699 elif p in context and i < len(parts) - 1: 

700 context = context[p] 

701 else: 

702 context[p] = val 

703 

704 @staticmethod 

705 def _coerce(val, coerce, accept_failure=False): 

706 if coerce is None: 

707 return val 

708 try: 

709 return coerce(val) 

710 except (ValueError, TypeError): 

711 if accept_failure: 

712 return val 

713 raise SeamlessException("Coerce with '{x}' failed on '{y}' of type '{z}'".format(x=coerce, y=val, z=type(val))) 

714 

715 @staticmethod 

716 def _prune_stack(stack): 

717 while len(stack) > 0: 

718 context = stack.pMax.Pop() 

719 todelete = [] 

720 for k, v in context.items(): 

721 if isinstance(v, dict) and len(list(v.keys())) == 0: 

722 todelete.append(k) 

723 for d in todelete: 

724 del context[d] 

725 

726 

727class Construct(object): 

728 def __init__(self, definition, coerce, default_coerce): 

729 if isinstance(definition, Construct): 

730 definition = definition._definition 

731 

732 self._definition = definition 

733 self._coerce = coerce 

734 self._default_coerce = default_coerce 

735 

736 @classmethod 

737 def merge(cls, target, *args): 

738 # TODO: add an override or mode argument so we can perform a merge with replacements 

739 if not isinstance(target, Construct): 

740 merged = Construct(deepcopy(target), None, None) 

741 else: 

742 merged = target 

743 

744 for source in args: 

745 if not isinstance(source, Construct): 

746 source = Construct(source, None, None) 

747 

748 for field, instructions in source.fields: 

749 merged.add_field(field, instructions, overwrite=False) 

750 

751 for obj in source.objects: 

752 merged.add_object(obj) 

753 

754 for field, instructions in source.lists: 

755 merged.add_list(field, instructions, overwrite=False) 

756 

757 for r in source._definition.get("required", []): 

758 merged.add_required(r) 

759 

760 for field, struct in source._definition.get("structs", {}).items(): 

761 merged.add_substruct(field, struct, mode="merge") 

762 

763 return merged 

764 

765 @classmethod 

766 def kwargs(cls, type, dir, instructions): 

767 # if there are no instructions there are no kwargs 

768 if instructions is None: 

769 return {} 

770 

771 # take a copy of the instructions that we can modify 

772 kwargs = deepcopy(instructions) 

773 

774 # remove the known arguments for the field type 

775 if type == "field": 

776 if "coerce" in kwargs: 

777 del kwargs["coerce"] 

778 

779 elif type == "list": 

780 if "coerce" in kwargs: 

781 del kwargs["coerce"] 

782 if "contains" in kwargs: 

783 del kwargs["contains"] 

784 

785 nk = {} 

786 if dir == "set": 

787 for k, v in kwargs.items(): 

788 # basically everything is a "set" argument unless explicitly stated to be a "get" argument 

789 if not k.startswith("get__"): 

790 if k.startswith("set__"): # if it starts with the set__ prefix, remove it 

791 k = k[5:] 

792 nk[k] = v 

793 elif dir == "get": 

794 for k, v in kwargs.items(): 

795 # must start with "get" argument 

796 if k.startswith("get__"): 

797 nk[k[5:]] = v 

798 

799 return nk 

800 

801 @property 

802 def raw(self): 

803 return self._definition 

804 

805 @property 

806 def required(self): 

807 return self._definition.get("required", []) 

808 

809 def add_required(self, field): 

810 if "required" not in self._definition: 

811 self._definition["required"] = [] 

812 if field not in self._definition["required"]: 

813 self._definition["required"].append(field) 

814 

815 @property 

816 def allowed(self): 

817 return list(self._definition.get("fields", {}).keys()) + \ 

818 self._definition.get("objects", []) + \ 

819 list(self._definition.get("lists", {}).keys()) 

820 

821 @property 

822 def objects(self): 

823 return self._definition.get("objects", []) 

824 

825 def add_object(self, object_name): 

826 if "objects" not in self._definition: 

827 self._definition["objects"] = [] 

828 if object_name not in self._definition["objects"]: 

829 self._definition["objects"].append(object_name) 

830 

831 @property 

832 def substructs(self): 

833 return self._definition.get("structs", {}) 

834 

835 def substruct(self, field): 

836 s = self.substructs.get(field) 

837 if s is None: 

838 return None 

839 return Construct(s, self._coerce, self._default_coerce) 

840 

841 def add_substruct(self, field, struct, mode="merge"): 

842 if "structs" not in self._definition: 

843 self._definition["structs"] = {} 

844 if mode == "overwrite" or field not in self._definition["structs"]: 

845 self._definition["structs"][field] = deepcopy(struct) 

846 else: 

847 # recursively merge 

848 self._definition["structs"][field] = Construct.merge(self._definition["structs"][field], struct).raw 

849 

850 @property 

851 def fields(self): 

852 return self._definition.get("fields", {}).items() 

853 

854 def field_instructions(self, field): 

855 return self._definition.get("fields", {}).get(field) 

856 

857 def add_field(self, field_name, instructions, overwrite=False): 

858 if "fields" not in self._definition: 

859 self._definition["fields"] = {} 

860 if overwrite or field_name not in self._definition["fields"]: 

861 self._definition["fields"][field_name] = deepcopy(instructions) 

862 

863 @property 

864 def lists(self): 

865 return self._definition.get("lists", {}).items() 

866 

867 @property 

868 def list_names(self): 

869 return self._definition.get("lists", {}).keys() 

870 

871 def list_instructions(self, field): 

872 return self._definition.get("lists", {}).get(field) 

873 

874 def add_list(self, list_name, instructions, overwrite=False): 

875 if "lists" not in self._definition: 

876 self._definition["lists"] = {} 

877 if overwrite or list_name not in self._definition["lists"]: 

878 self._definition["lists"][list_name] = deepcopy(instructions) 

879 

880 def get_coerce(self, instructions): 

881 coerce_name = instructions.get("coerce", self._default_coerce) 

882 return coerce_name, self._coerce.get(coerce_name) 

883 

884 def get(self, elem, default=None): 

885 if elem in self._definition: 

886 return self._definition.get(elem) 

887 else: 

888 return default 

889 

890 def lookup(self, path): 

891 bits = path.split(".") 

892 

893 # if there's more than one path element, we will need to recurse 

894 if len(bits) > 1: 

895 # it has to be an object, in order for the path to still have multiple 

896 # segments 

897 if bits[0] not in self.objects: 

898 return None, None, None 

899 substruct = self.substruct(bits[0]) 

900 return substruct.lookup(".".join(bits[1:])) 

901 elif len(bits) == 1: 

902 # first check the fields 

903 instructions = self.field_instructions(bits[0]) 

904 if instructions is not None: 

905 return "field", None, instructions 

906 

907 # then check the lists 

908 instructions = self.list_instructions(bits[0]) 

909 if instructions is not None: 

910 substruct = self.substruct(bits[0]) 

911 return "list", substruct, instructions 

912 

913 # then check the objects 

914 if bits[0] in self.objects: 

915 substruct = self.substruct(bits[0]) 

916 return "struct", substruct, instructions 

917 

918 return None, None, None 

919 

920 def construct(self, obj, check_required=True, silent_prune=False, allow_other_fields=False): 

921 

922 def recurse(obj, struct, context): 

923 if obj is None: 

924 return None 

925 if not isinstance(obj, dict): 

926 raise SeamlessException("Expected a dict at '{c}' but found something else instead".format(c=context)) 

927 

928 keyset = obj.keys() 

929 

930 # if we are checking required fields, then check them 

931 # FIXME: might be sensible to move this out to a separate phase, independent of constructing 

932 if check_required: 

933 for r in struct.required: 

934 if r not in keyset: 

935 raise SeamlessException("Field '{r}' is required but not present at '{c}'".format(r=r, c=context)) 

936 

937 # check that there are no fields that are not allowed 

938 # Note that since the construct mechanism copies fields explicitly, silent_prune just turns off this 

939 # check 

940 if not allow_other_fields and not silent_prune: 

941 allowed = struct.allowed 

942 for k in keyset: 

943 if k not in allowed: 

944 c = context if context != "" else "root" 

945 raise SeamlessException("Field '{k}' is not permitted at '{c}'".format(k=k, c=c)) 

946 

947 # make a SeamlessData instance for gathering all the new data 

948 constructed = SeamlessData(struct=struct) 

949 

950 # now check all the fields 

951 for field_name, instructions in struct.fields: 

952 val = obj.get(field_name) 

953 if val is None: 

954 continue 

955 typ, substruct, instructions = struct.lookup(field_name) 

956 if instructions is None: 

957 raise SeamlessException("No instruction set defined for field at '{x}'".format(x=context + field_name)) 

958 coerce_name, coerce_fn = struct.get_coerce(instructions) 

959 if coerce_fn is None: 

960 raise SeamlessException("No coerce function defined for type '{x}' at '{c}'".format(x=coerce_name, c=context + field_name)) 

961 kwargs = struct.kwargs(typ, "set", instructions) 

962 constructed.set_single(field_name, val, coerce=coerce_fn, context=context, **kwargs) 

963 

964 # next check all the objects (which will involve a recursive call to this function) 

965 for field_name in struct.objects: 

966 val = obj.get(field_name) 

967 if val is None: 

968 continue 

969 if type(val) != dict: 

970 raise SeamlessException("Expected dict at '{x}' but found '{y}'".format(x=context + field_name, y=type(val))) 

971 

972 typ, substruct, instructions = struct.lookup(field_name) 

973 

974 if substruct is None: 

975 # this is the lowest point at which we have instructions, so just accept the data structure as-is 

976 # (taking a deep copy to destroy any references) 

977 constructed.set_single(field_name, deepcopy(val)) 

978 else: 

979 # we need to recurse further down 

980 beneath = recurse(val, substruct, context=context + field_name + ".") 

981 

982 # what we get back is the correct sub-data structure, which we can then store 

983 constructed.set_single(field_name, beneath) 

984 

985 # now check all the lists 

986 for field_name, instructions in struct.lists: 

987 vals = obj.get(field_name) 

988 if vals is None: 

989 continue 

990 if not isinstance(vals, list): 

991 raise SeamlessException("Expecting list at '{x}' but found something else '{y}'".format(x=context + field_name, y=type(val))) 

992 

993 typ, substruct, instructions = struct.lookup(field_name) 

994 kwargs = struct.kwargs(typ, "set", instructions) 

995 

996 contains = instructions.get("contains") 

997 if contains == "field": 

998 # coerce all the values in the list 

999 coerce_name, coerce_fn = struct.get_coerce(instructions) 

1000 if coerce_fn is None: 

1001 raise SeamlessException("No coerce function defined for type '{x}' at '{c}'".format(x=coerce_name, c=context + field_name)) 

1002 

1003 for i in range(len(vals)): 

1004 val = vals[i] 

1005 constructed.add_to_list(field_name, val, coerce=coerce_fn, **kwargs) 

1006 

1007 elif contains == "object": 

1008 # for each object in the list, send it for construction 

1009 for i in range(len(vals)): 

1010 val = vals[i] 

1011 

1012 if type(val) != dict: 

1013 print("Expected dict at '{x}[{p}]' but got '{y}'".format(x=context + field_name, y=type(val), p=i)) 

1014 raise SeamlessException("Expected dict at '{x}[{p}]' but got '{y}'".format(x=context + field_name, y=type(val), p=i)) 

1015 

1016 substruct = struct.substruct(field_name) 

1017 if substruct is None: 

1018 constructed.add_to_list(field_name, deepcopy(val)) 

1019 else: 

1020 # we need to recurse further down 

1021 beneath = recurse(val, substruct, context=context + field_name + "[" + str(i) + "].") 

1022 

1023 # what we get back is the correct sub-data structure, which we can then store 

1024 constructed.add_to_list(field_name, beneath) 

1025 

1026 else: 

1027 raise SeamlessException("Cannot understand structure where list '{x}' elements contain '{y}'".format(x=context + field_name, y=contains)) 

1028 

1029 # finally, if we allow other fields, make sure that they come across too 

1030 if allow_other_fields: 

1031 known = struct.allowed 

1032 for k, v in obj.items(): 

1033 if k not in known: 

1034 constructed.set_single(k, v) 

1035 

1036 # ensure any external references to the object persist 

1037 obj.clear() 

1038 obj.update(constructed.data) 

1039 return obj 

1040 

1041 ready = recurse(obj, self, "[root]") 

1042 return SeamlessData(ready, struct=self) 

1043 

1044 def validate(self): 

1045 

1046 def recurse(struct, context): 

1047 # check that only the allowed keys are present 

1048 keys = struct.raw.keys() 

1049 for k in keys: 

1050 if k not in ["fields", "objects", "lists", "required", "structs"]: 

1051 raise SeamlessException("Key '{x}' present in struct at '{y}', but is not permitted".format(x=k, y=context)) 

1052 

1053 # now go through and make sure the fields are the right shape: 

1054 for field_name, instructions in struct.fields: 

1055 for k,v in instructions.items(): 

1056 if not isinstance(v, list) and not isinstance(v, str) and not isinstance(v, bool): 

1057 raise SeamlessException("Argument '{a}' in field '{b}' at '{c}' is not a string, list or boolean".format(a=k, b=field_name, c=context)) 

1058 

1059 # then make sure the objects are ok 

1060 for o in struct.objects: 

1061 if not isinstance(o, str): 

1062 raise SeamlessException("There is a non-string value in the object list at '{y}'".format(y=context)) 

1063 

1064 # make sure the lists are correct 

1065 for field_name, instructions in struct.lists: 

1066 contains = instructions.get("contains") 

1067 if contains is None: 

1068 raise SeamlessException("No 'contains' argument in list definition for field '{x}' at '{y}'".format(x=field_name, y=context)) 

1069 if contains not in ["object", "field"]: 

1070 raise SeamlessException("'contains' argument in list '{x}' at '{y}' contains illegal value '{z}'".format(x=field_name, y=context, z=contains)) 

1071 for k,v in instructions.items(): 

1072 if not isinstance(v, list) and not isinstance(v, str) and not isinstance(v, bool): 

1073 raise SeamlessException("Argument '{a}' in list '{b}' at '{c}' is not a string, list or boolean".format(a=k, b=field_name, c=context)) 

1074 

1075 # make sure the requireds are correct 

1076 for o in struct.required: 

1077 if not isinstance(o, str): 

1078 raise SeamlessException("There is a non-string value in the required list at '{y}'".format(y=context)) 

1079 

1080 # now do the structs, which will involve some recursion 

1081 substructs = struct.substructs 

1082 

1083 # first check that there are no previously unknown keys in there 

1084 possibles = struct.objects + list(struct.list_names) 

1085 for s in substructs: 

1086 if s not in possibles: 

1087 raise SeamlessException("struct contains key '{a}' which is not listed in object or list definitions at '{x}'".format(a=s, x=context)) 

1088 

1089 # now recurse into each struct 

1090 for k, v in substructs.items(): 

1091 nc = context 

1092 if nc == "": 

1093 nc = k 

1094 else: 

1095 nc += "." + k 

1096 recurse(Construct(v, None, None), context=nc) 

1097 

1098 return True 

1099 

1100 recurse(self, "[root]") 

1101 

1102 

1103def create_allowed_values_by_constant(constant_class: Type[ConstantList]): 

1104 return { 

1105 'allowed_values': list(constant_class.all_constants()) 

1106 }