Coverage for venv1 / lib / python3.10 / site-packages / combinatrix / core.py: 83%

214 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1import json, os, csv 

2from combinatrix import models, constants 

3from combinatrix.exceptions import CombinatrixException, ValidationException 

4 

5 

6def convert_csv(csv_path, params_out_path=None): 

7 if csv_path is None: 

8 raise CombinatrixException("You must specify a csv_path to convert") 

9 parameters = _csv2parameters(csv_path) 

10 if params_out_path is not None: 

11 dir = os.path.dirname(params_out_path) 

12 if dir == "": # we were just given a filename for the current directory 

13 dir = "." 

14 if not os.path.exists(dir): 

15 raise CombinatrixException("The directory for the params_out_path does not exist") 

16 with open(params_out_path, "w", encoding="utf-8") as f: 

17 f.write(json.dumps(parameters.as_dict(), indent=2)) 

18 return parameters 

19 

20 

21def _csv2parameters(csv_path): 

22 if not os.path.exists(csv_path) or os.path.isdir(csv_path): 

23 raise CombinatrixException("csv_path is set to a missing file or to a directory") 

24 with open(csv_path, "r", encoding="utf-8") as f: 

25 reader = csv.reader(f) 

26 

27 parameters = models.Parameters() 

28 try: 

29 first = next(reader) 

30 second = next(reader) 

31 except StopIteration: 

32 raise CombinatrixException("Empty settings csv") 

33 except UnicodeDecodeError as e: 

34 raise CombinatrixException(e) 

35 

36 if first[0] != constants.FIELD: 

37 raise CombinatrixException("First CSV row must be a 'field' row") 

38 if second[0] != constants.TYPE: 

39 raise CombinatrixException("Second CSV row must be a 'type' row") 

40 

41 for i in range(1, len(first)): 

42 parameters.add_field(first[i], second[i]) 

43 

44 for row in reader: 

45 if row[0] == "": 

46 continue 

47 elif row[0] == constants.DEFAULT: 

48 _read_defaults(row[1:], parameters) 

49 elif row[0] == constants.VALUES: 

50 _read_values(row[1:], parameters) 

51 elif row[0].startswith(constants.CONSTRAINT): 

52 key = row[0].split(" ", 1)[1].strip() 

53 _read_constraint(row[1:], key, parameters) 

54 elif row[0].startswith(constants.CONDITION): 

55 key = row[0].split(" ", 1)[1].strip() 

56 _read_condition(row[1:], key, parameters) 

57 

58 return parameters 

59 

60 

61def _read_defaults(default_list, parameters): 

62 for i in range(len(default_list)): 

63 parameters.set_default(i, default_list[i]) 

64 

65 

66def _read_values(value_list, parameters): 

67 for i in range(len(value_list)): 

68 parameters.add_value(i, value_list[i]) 

69 

70 

71def _read_constraint(constraint_set, field, parameters): 

72 headers = parameters.field_names() 

73 key_idx = headers.index(field) 

74 key_value = None 

75 

76 # work out the value and the set of constraints for the supplied field which are specified by this row 

77 constraint_map = {} 

78 for i in range(len(constraint_set)): 

79 if parameters.get(i).get("type") not in [constants.GENERATED]: 

80 continue 

81 

82 constraint = constraint_set[i] 

83 if constraint in constants.ANY: 

84 continue 

85 

86 if i == key_idx: 

87 key_value = constraint 

88 else: 

89 constraint = constraint.strip() 

90 is_not = False 

91 if constraint.startswith(constants.NOT): 

92 is_not = True 

93 constraint = constraint[1:] 

94 bits = constraint.split(constants.OR) 

95 bits = [b.strip() for b in bits] 

96 

97 if is_not: 

98 if headers[i] not in constraint_map: 

99 constraint_map[headers[i]] = {} 

100 constraint_map[headers[i]]["nor"] = bits 

101 else: 

102 if headers[i] not in constraint_map: 

103 constraint_map[headers[i]] = {} 

104 constraint_map[headers[i]]["or"] = bits 

105 

106 # if we didn't find any constraints no need to go any further 

107 if len(list(constraint_map.keys())) == 0: 

108 return 

109 

110 for other_field, rules in constraint_map.items(): 

111 parameters.add_constraint(field, key_value, other_field, or_values=rules.get("or"), nor_values=rules.get("nor")) 

112 

113 

114def _read_condition(condition_set, field, parameters): 

115 headers = parameters.field_names() 

116 key_idx = headers.index(field) 

117 key_value = None 

118 

119 # work out the value and the set of constraints for the supplied field which are specified by this row 

120 condition_map = {} 

121 for i in range(len(condition_set)): 

122 if parameters.get(i).get("type") not in [constants.GENERATED, constants.CONDITIONAL]: 

123 continue 

124 

125 condition = condition_set[i] 

126 if condition in constants.ANY: 

127 continue 

128 

129 if i == key_idx: 

130 key_value = condition 

131 else: 

132 condition = condition.strip() 

133 is_not = False 

134 if condition.startswith(constants.NOT): 

135 is_not = True 

136 condition = condition[1:] 

137 bits = condition.split(constants.OR) 

138 bits = [b.strip() for b in bits] 

139 

140 if is_not: 

141 if headers[i] not in condition_map: 

142 condition_map[headers[i]] = {} 

143 condition_map[headers[i]]["nor"] = bits 

144 else: 

145 if headers[i] not in condition_map: 

146 condition_map[headers[i]] = {} 

147 condition_map[headers[i]]["or"] = bits 

148 

149 if len(list(condition_map.keys())) == 0: 

150 return 

151 

152 parameters.add_condition_set(field, key_value, condition_map) 

153 

154 

155def fromcsv(csv_path, combos_out_path, params_out_path=None): 

156 parameters = convert_csv(csv_path, params_out_path) 

157 return combine(parameters, combos_out_path) 

158 

159 

160def fromjsonfile(json_path, out_path): 

161 with open(json_path, "r", encoding="utf-8") as f: 

162 j = json.loads(f.read()) 

163 return combine(j, out_path) 

164 

165 

166def combine(parameters, out_path): 

167 # validate the input 

168 if parameters is None or out_path is None: 

169 raise CombinatrixException("parameters and out_path must be set") 

170 

171 if not isinstance(parameters, models.Parameters): 

172 parameters = models.Parameters(parameters) 

173 

174 out_dir = os.path.dirname(out_path) 

175 if out_dir == "": # we have been given a filename in the current directory 

176 out_dir = "." 

177 if not os.path.exists(out_dir): 

178 raise CombinatrixException("Directory {x} does not exist for output path".format(x=out_dir)) 

179 

180 # construct the counter around the parameters 

181 counter = models.ComboIterator(parameters) 

182 

183 # get all the generated fields 

184 generated_fields = parameters.field_names(types=[constants.GENERATED]) 

185 

186 # get all the fields which are populated conditionally 

187 conditional_fields = parameters.field_names(types=[constants.CONDITIONAL]) 

188 

189 # get all the fields which are index fields 

190 index_fields = parameters.field_names(types=[constants.INDEX]) 

191 current_index = 1 

192 

193 # iterate through all allowable combinations, and construct the final combo set 

194 combinations = [] 

195 while next(counter): 

196 combo = _generate_current(generated_fields, parameters, counter) 

197 _add_conditionals(combo, conditional_fields, parameters) 

198 current_index = _add_index(combo, current_index, index_fields, parameters) 

199 combinations.append(combo) 

200 

201 if out_path: 

202 header = parameters.field_names() 

203 with open(out_path, "w", encoding="utf-8") as f: 

204 writer = csv.writer(f) 

205 writer.writerow(header) 

206 for combo in combinations: 

207 row = [str(combo.get(name, "")) for name in header] 

208 writer.writerow(row) 

209 

210 return combinations 

211 

212 

213def _generate_current(fields, parameters, counter): 

214 """ 

215 For each field, select the value from the list of values (which are consistently ordered) 

216 that corresponds to the current counter position 

217 :param fields: 

218 :param counter: 

219 :return: 

220 """ 

221 record = {} 

222 for name in fields: 

223 values = parameters.get_values(name) 

224 current = counter.get_current(name) 

225 record[name] = values[current] 

226 return record 

227 

228 

229def _filter(combo, fields, parameters): 

230 for name in fields: 

231 cval = combo[name] 

232 constraints = parameters.get_constraints(name, cval) 

233 if (len(list(constraints.keys()))) == 0: 

234 continue 

235 

236 for cfield, ors_and_nors in constraints.items(): 

237 if "or" in ors_and_nors and "nor" in ors_and_nors: 

238 raise CombinatrixException("You cannot define both 'or' and 'nor' in your constraints") 

239 

240 if "or" in ors_and_nors: 

241 if combo[cfield] not in ors_and_nors["or"]: 

242 return False 

243 if "nor" in ors_and_nors: 

244 if combo[cfield] in ors_and_nors["nor"]: 

245 return False 

246 

247 return True 

248 

249 

250def _add_conditionals(combo, fields, parameters): 

251 for name in fields: 

252 possible_values = [] 

253 values = parameters.get_values(name) 

254 for val in values: 

255 conditions = parameters.get_conditions(name, val) 

256 if conditions is None: 

257 continue 

258 for match_group in conditions: 

259 if _conditions_match(combo, match_group): 

260 possible_values.append(val) 

261 break 

262 

263 """ 

264 for match_group in conditions: 

265 trips = 0 

266 for other_field, other_values in match_group.iteritems(): 

267 if combo[other_field] in other_values: 

268 trips += 1 

269 if trips == len(match_group.keys()): 

270 possible_values.append(val) 

271 break 

272 """ 

273 

274 possible_values = list(set(possible_values)) 

275 if len(possible_values) == 0: 

276 combo[name] = parameters.get_default(name) 

277 elif len(possible_values) == 1: 

278 combo[name] = possible_values[0] 

279 else: 

280 raise CombinatrixException("More than one possible value for '{x}'. For combination: {y} the possible values are: {z}".format( 

281 x=name, y=combo, z=possible_values)) 

282 return 

283 

284 

285def _conditions_match(combo, match_group): 

286 trips = 0 

287 for other_field, match_conditions in match_group.items(): 

288 if "or" in match_conditions: 

289 if combo[other_field] in match_conditions.get("or", []): 

290 trips += 1 

291 elif "nor" in match_conditions: 

292 if combo[other_field] not in match_conditions.get("nor", []): 

293 trips += 1 

294 else: 

295 raise ValidationException("Expected 'or' or 'nor' in match group") 

296 return trips == len(list(match_group.keys())) 

297 

298 

299def _add_index(combo, current_index, indices, parameters): 

300 for name in indices: 

301 combo[name] = str(current_index) 

302 return current_index + 1 

303 

304 

305def load_matrix(source_path): 

306 with open(source_path, "r", encoding="utf-8") as f: 

307 return [p for p in csv.DictReader(f)]