Coverage for portality/lib/edges.py: 92%

37 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-08-04 15:38 +0100

1# ~~EdgesIntegration:Library~~ 

2# ~~-> Edges:Technology~~ 

3# ~~-> Elasticsearch:Technology~~ 

4 

5from urllib import parse 

6import json 

7 

8 

def make_url_query(**params):
    """Build the query from ``params`` and return it as a URL-quoted JSON string.

    The keyword arguments are passed through unchanged to ``make_query_json``;
    the resulting JSON text is escaped with ``urllib.parse.quote_plus``.
    """
    return parse.quote_plus(make_query_json(**params))

12 

13 

def make_query_json(**params):
    """Build the query from ``params`` and return it serialised as a JSON string.

    The keyword arguments are passed through unchanged to ``make_query``.
    """
    return json.dumps(make_query(**params))

17 

18 

def make_query(**params):
    """Return the query object produced by a ``GeneralSearchQuery`` built from ``params``.

    The keyword arguments are forwarded to the ``GeneralSearchQuery``
    constructor and the result of its ``query()`` method is returned.
    """
    return GeneralSearchQuery(**params).query()

22 

23 

class GeneralSearchQuery(object):
    # ~~-> Edges:Query~~
    """Assemble a search query object from optional filter clauses.

    :param term: a single "term" clause or a list of them; each entry becomes
        one ``{"term": entry}`` element of the bool/must list
    :param terms: a single "terms" clause or a list of them; each entry becomes
        one ``{"terms": entry}`` element of the bool/must list
    :param query_string: if not None, added as a ``query_string`` clause with
        ``default_operator`` set to ``AND``
    :param sort: a single sort specification or a list of them; omitted from
        the output when falsy
    """

    def __init__(self, term=None, terms=None, query_string=None, sort=None):
        self.terms = self._as_list(terms)
        # A single clause must be wrapped in a list exactly like ``terms``;
        # otherwise query() would iterate over the clause itself (e.g. the
        # keys of a dict) rather than over a list of clauses.
        self.term = self._as_list(term)
        self.query_string = query_string
        self.sort = sort

    @staticmethod
    def _as_list(value):
        """Return None for None, lists unchanged, and anything else wrapped in a list."""
        if value is None:
            return None
        return value if isinstance(value, list) else [value]

    def query(self):
        """Return the query as a dict with a ``query`` key and, if set, a ``sort`` key.

        When no clause was supplied the query is ``{"match_all": {}}``;
        otherwise all clauses are combined under ``bool``/``must``.
        """
        musts = []
        if self.terms is not None:
            musts.extend({"terms": clause} for clause in self.terms)

        if self.term is not None:
            musts.extend({"term": clause} for clause in self.term)

        if self.query_string is not None:
            musts.append({"query_string": {"default_operator": "AND", "query": self.query_string}})

        query = {"bool": {"must": musts}} if musts else {"match_all": {}}

        obj = {"query": query}
        if self.sort:
            # sort is always emitted as a list, even for a single specification
            obj["sort"] = self.sort if isinstance(self.sort, list) else [self.sort]

        return obj