Coverage for portality/lib/edges.py: 92%
37 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-08-30 11:09 +0100
1# ~~EdgesIntegration:Library~~
2# ~~-> Edges:Technology~~
3# ~~-> Elasticsearch:Technology~~
5from urllib import parse
6import json
def make_url_query(**params):
    """Build a search query from ``params`` and return it URL-encoded.

    The keyword arguments are forwarded unchanged to ``make_query_json``;
    the JSON string it returns is escaped with ``urllib.parse.quote_plus``.
    """
    return parse.quote_plus(make_query_json(**params))
def make_query_json(**params):
    """Build a search query from ``params`` and return it as a JSON string.

    The keyword arguments are forwarded unchanged to ``make_query`` and the
    resulting object is serialised with ``json.dumps``.
    """
    return json.dumps(make_query(**params))
def make_query(**params):
    """Build a search query object from ``params``.

    The keyword arguments are forwarded unchanged to ``GeneralSearchQuery``
    and the value of its ``query()`` method is returned.
    """
    return GeneralSearchQuery(**params).query()
class GeneralSearchQuery(object):
    # ~~-> Edges:Query~~
    """Builder for a simple search query body.

    Combines any number of ``terms`` clauses, ``term`` clauses and an
    optional query string into a single ``bool``/``must`` query, with an
    optional ``sort`` section.
    """

    def __init__(self, term=None, terms=None, query_string=None, sort=None):
        """Store the query components.

        :param term: a single ``term`` clause body, or a list/tuple of them
        :param terms: a single ``terms`` clause body, or a list of them
        :param query_string: text for a ``query_string`` clause
        :param sort: a single sort specification, or a list of them
        """
        self.terms = None if terms is None else terms if isinstance(terms, list) else [terms]

        # A lone clause must be wrapped, as ``terms`` is above: query()
        # iterates this value, and iterating a bare dict would yield only
        # its keys, producing {"term": "<field name>"} and losing the value.
        # Lists and tuples are kept as given so existing callers are
        # unaffected.
        if term is None or isinstance(term, (list, tuple)):
            self.term = term
        else:
            self.term = [term]

        self.query_string = query_string
        self.sort = sort

    def query(self):
        """Return the query as a dict with a ``query`` key and, when a sort
        was supplied, a ``sort`` key holding a list.

        With no clauses at all the query is ``{"match_all": {}}``; otherwise
        every clause is placed in a ``bool``/``must`` list, in the order
        ``terms``, ``term``, ``query_string``.
        """
        musts = []

        if self.terms is not None:
            musts.extend({"terms": clause} for clause in self.terms)

        if self.term is not None:
            musts.extend({"term": clause} for clause in self.term)

        if self.query_string is not None:
            musts.append({"query_string": {"default_operator": "AND", "query": self.query_string}})

        query = {"bool": {"must": musts}} if musts else {"match_all": {}}

        obj = {"query": query}
        if self.sort:
            # The sort section is always emitted as a list.
            obj["sort"] = self.sort if isinstance(self.sort, list) else [self.sort]

        return obj