Coverage for venv1 / lib / python3.10 / site-packages / combinatrix / core.py: 83%
214 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1import json, os, csv
2from combinatrix import models, constants
3from combinatrix.exceptions import CombinatrixException, ValidationException
def convert_csv(csv_path, params_out_path=None):
    """
    Convert a combinatrix settings CSV into a Parameters object.

    :param csv_path: path to the settings CSV (required)
    :param params_out_path: optional path to also write the parameters out as JSON
    :return: the parsed models.Parameters instance
    :raises CombinatrixException: if csv_path is None, or the directory of
        params_out_path does not exist
    """
    if csv_path is None:
        raise CombinatrixException("You must specify a csv_path to convert")
    parameters = _csv2parameters(csv_path)
    if params_out_path is not None:
        # renamed from "dir" to avoid shadowing the builtin
        out_dir = os.path.dirname(params_out_path)
        if out_dir == "":  # we were just given a filename for the current directory
            out_dir = "."
        if not os.path.exists(out_dir):
            raise CombinatrixException("The directory for the params_out_path does not exist")
        with open(params_out_path, "w", encoding="utf-8") as f:
            # stream straight to the file rather than building the string first
            json.dump(parameters.as_dict(), f, indent=2)
    return parameters
def _csv2parameters(csv_path):
    """
    Parse the settings CSV at csv_path into a models.Parameters instance.

    Expected layout: row 1 is the 'field' row, row 2 the 'type' row; subsequent
    rows are defaults, values, constraints or conditions keyed by column 0.

    :param csv_path: path to an existing settings CSV file
    :return: a populated models.Parameters instance
    :raises CombinatrixException: on a missing/invalid path, an empty file,
        a decode failure, or malformed header rows
    """
    if not os.path.exists(csv_path) or os.path.isdir(csv_path):
        raise CombinatrixException("csv_path is set to a missing file or to a directory")

    with open(csv_path, "r", encoding="utf-8") as f:
        # the reader lazily consumes f, so all reading happens inside the with
        reader = csv.reader(f)
        parameters = models.Parameters()

        try:
            first = next(reader)
            second = next(reader)
        except StopIteration:
            # suppress the StopIteration context; it adds nothing for the caller
            raise CombinatrixException("Empty settings csv") from None
        except UnicodeDecodeError as e:
            raise CombinatrixException(e) from e

        if first[0] != constants.FIELD:
            raise CombinatrixException("First CSV row must be a 'field' row")
        if second[0] != constants.TYPE:
            raise CombinatrixException("Second CSV row must be a 'type' row")

        # register every field with its declared type (column 0 holds the row label)
        for i in range(1, len(first)):
            parameters.add_field(first[i], second[i])

        for row in reader:
            if row[0] == "":
                continue
            elif row[0] == constants.DEFAULT:
                _read_defaults(row[1:], parameters)
            elif row[0] == constants.VALUES:
                _read_values(row[1:], parameters)
            elif row[0].startswith(constants.CONSTRAINT):
                # label is e.g. "constraint <field>"; NOTE(review): assumes a space
                # follows the keyword — an unlabelled row would raise IndexError
                key = row[0].split(" ", 1)[1].strip()
                _read_constraint(row[1:], key, parameters)
            elif row[0].startswith(constants.CONDITION):
                key = row[0].split(" ", 1)[1].strip()
                _read_condition(row[1:], key, parameters)

    return parameters
61def _read_defaults(default_list, parameters):
62 for i in range(len(default_list)):
63 parameters.set_default(i, default_list[i])
66def _read_values(value_list, parameters):
67 for i in range(len(value_list)):
68 parameters.add_value(i, value_list[i])
def _read_constraint(constraint_set, field, parameters):
    """
    Read one constraint row for the given field and register the resulting
    or/nor rules on the parameters object.

    :param constraint_set: the row's cells, excluding the column-0 label
    :param field: the field this constraint row applies to
    :param parameters: the models.Parameters instance being built
    """
    headers = parameters.field_names()
    key_idx = headers.index(field)
    key_value = None

    # map of other-field name -> {"or": [...]} or {"nor": [...]} from this row
    constraint_map = {}
    for i, cell in enumerate(constraint_set):
        # constraints only apply to generated fields
        if parameters.get(i).get("type") not in [constants.GENERATED]:
            continue
        # wildcard cells place no constraint
        if cell in constants.ANY:
            continue

        if i == key_idx:
            key_value = cell
            continue

        cleaned = cell.strip()
        negated = cleaned.startswith(constants.NOT)
        if negated:
            cleaned = cleaned[1:]
        bits = [b.strip() for b in cleaned.split(constants.OR)]
        rule_key = "nor" if negated else "or"
        constraint_map.setdefault(headers[i], {})[rule_key] = bits

    # nothing to register if the row held no constraints
    if not constraint_map:
        return

    for other_field, rules in constraint_map.items():
        parameters.add_constraint(field, key_value, other_field, or_values=rules.get("or"), nor_values=rules.get("nor"))
def _read_condition(condition_set, field, parameters):
    """
    Read one condition row for the given field and register the resulting
    or/nor match rules as a condition set on the parameters object.

    :param condition_set: the row's cells, excluding the column-0 label
    :param field: the (conditional) field this row applies to
    :param parameters: the models.Parameters instance being built
    """
    headers = parameters.field_names()
    key_idx = headers.index(field)
    key_value = None

    # map of other-field name -> {"or": [...]} or {"nor": [...]} from this row
    condition_map = {}
    for i, cell in enumerate(condition_set):
        # conditions may reference generated or conditional fields
        if parameters.get(i).get("type") not in [constants.GENERATED, constants.CONDITIONAL]:
            continue
        # wildcard cells are not part of the condition
        if cell in constants.ANY:
            continue

        if i == key_idx:
            key_value = cell
            continue

        cleaned = cell.strip()
        negated = cleaned.startswith(constants.NOT)
        if negated:
            cleaned = cleaned[1:]
        bits = [b.strip() for b in cleaned.split(constants.OR)]
        rule_key = "nor" if negated else "or"
        condition_map.setdefault(headers[i], {})[rule_key] = bits

    # nothing to register if the row held no conditions
    if not condition_map:
        return

    parameters.add_condition_set(field, key_value, condition_map)
def fromcsv(csv_path, combos_out_path, params_out_path=None):
    """
    Convert a settings CSV to parameters, then write every allowed
    combination to combos_out_path.

    :param csv_path: path to the settings CSV
    :param combos_out_path: path to write the combinations CSV to
    :param params_out_path: optional path to also write the parameters JSON
    :return: the list of combination dicts from combine()
    """
    return combine(convert_csv(csv_path, params_out_path), combos_out_path)
def fromjsonfile(json_path, out_path):
    """
    Load parameters from a JSON file and write every allowed combination
    to out_path.

    :param json_path: path to a JSON file of parameters
    :param out_path: path to write the combinations CSV to
    :return: the list of combination dicts from combine()
    """
    with open(json_path, "r", encoding="utf-8") as f:
        # json.load streams from the file handle directly
        j = json.load(f)
    return combine(j, out_path)
def combine(parameters, out_path):
    """
    Generate every allowed combination of the parameters and optionally
    write them out as a CSV.

    :param parameters: a models.Parameters instance, or raw data to build one from
    :param out_path: path to write the combinations CSV to
    :return: the list of combination dicts
    :raises CombinatrixException: if either argument is None, or the output
        directory does not exist
    """
    # validate the input
    if parameters is None or out_path is None:
        raise CombinatrixException("parameters and out_path must be set")

    if not isinstance(parameters, models.Parameters):
        parameters = models.Parameters(parameters)

    # a bare filename means the current directory
    out_dir = os.path.dirname(out_path) or "."
    if not os.path.exists(out_dir):
        raise CombinatrixException("Directory {x} does not exist for output path".format(x=out_dir))

    # the counter iterates over the allowable value combinations
    counter = models.ComboIterator(parameters)

    # partition the field names by how each gets populated
    generated_fields = parameters.field_names(types=[constants.GENERATED])
    conditional_fields = parameters.field_names(types=[constants.CONDITIONAL])
    index_fields = parameters.field_names(types=[constants.INDEX])
    current_index = 1

    # build the final combo set from every allowable combination
    combinations = []
    while next(counter):
        combo = _generate_current(generated_fields, parameters, counter)
        _add_conditionals(combo, conditional_fields, parameters)
        current_index = _add_index(combo, current_index, index_fields, parameters)
        combinations.append(combo)

    if out_path:
        header = parameters.field_names()
        with open(out_path, "w", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows([str(combo.get(name, "")) for name in header] for combo in combinations)

    return combinations
213def _generate_current(fields, parameters, counter):
214 """
215 For each field, select the value from the list of values (which are consistently ordered)
216 that corresponds to the current counter position
217 :param fields:
218 :param counter:
219 :return:
220 """
221 record = {}
222 for name in fields:
223 values = parameters.get_values(name)
224 current = counter.get_current(name)
225 record[name] = values[current]
226 return record
229def _filter(combo, fields, parameters):
230 for name in fields:
231 cval = combo[name]
232 constraints = parameters.get_constraints(name, cval)
233 if (len(list(constraints.keys()))) == 0:
234 continue
236 for cfield, ors_and_nors in constraints.items():
237 if "or" in ors_and_nors and "nor" in ors_and_nors:
238 raise CombinatrixException("You cannot define both 'or' and 'nor' in your constraints")
240 if "or" in ors_and_nors:
241 if combo[cfield] not in ors_and_nors["or"]:
242 return False
243 if "nor" in ors_and_nors:
244 if combo[cfield] in ors_and_nors["nor"]:
245 return False
247 return True
def _add_conditionals(combo, fields, parameters):
    """
    Populate each conditional field on combo: a value is possible if any of
    its condition match-groups is satisfied by the combo so far.

    Falls back to the field's default when no value matches; raises if more
    than one value matches (the conditions are ambiguous).

    :param combo: the combination dict, mutated in place
    :param fields: names of the conditional fields
    :param parameters: the models.Parameters instance
    :raises CombinatrixException: if multiple values are possible for a field
    """
    for name in fields:
        possible_values = []
        for val in parameters.get_values(name):
            conditions = parameters.get_conditions(name, val)
            if conditions is None:
                continue
            # a value is possible as soon as one of its match groups matches
            if any(_conditions_match(combo, match_group) for match_group in conditions):
                possible_values.append(val)

        # removed a commented-out Python-2 (iteritems) copy of the matching loop
        possible_values = list(set(possible_values))
        if len(possible_values) == 0:
            combo[name] = parameters.get_default(name)
        elif len(possible_values) == 1:
            combo[name] = possible_values[0]
        else:
            raise CombinatrixException("More than one possible value for '{x}'. For combination: {y} the possible values are: {z}".format(
                x=name, y=combo, z=possible_values))
    return
285def _conditions_match(combo, match_group):
286 trips = 0
287 for other_field, match_conditions in match_group.items():
288 if "or" in match_conditions:
289 if combo[other_field] in match_conditions.get("or", []):
290 trips += 1
291 elif "nor" in match_conditions:
292 if combo[other_field] not in match_conditions.get("nor", []):
293 trips += 1
294 else:
295 raise ValidationException("Expected 'or' or 'nor' in match group")
296 return trips == len(list(match_group.keys()))
299def _add_index(combo, current_index, indices, parameters):
300 for name in indices:
301 combo[name] = str(current_index)
302 return current_index + 1
def load_matrix(source_path):
    """
    Load a combinations CSV into a list of row dicts keyed by the header row.

    :param source_path: path to the CSV file to read
    :return: list of dicts, one per data row
    """
    # newline="" is the documented way to open files for the csv module
    with open(source_path, "r", encoding="utf-8", newline="") as f:
        return list(csv.DictReader(f))