Coverage for portality / util.py: 53%

113 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1import json 

2import urllib.request 

3from functools import wraps 

4 

5import werkzeug.routing 

6from flask import request, current_app, flash, make_response, url_for as flask_url_for 

7from urllib.parse import urlparse, urljoin 

8from portality.core import app 

9 

10 

def is_safe_url(target):
    """
    Return ``target`` if it points at this host over http(s), otherwise ``'/'``.

    Despite the name this does not return a boolean: the result is always a
    value that can be handed straight to a redirect.

    :param target: the redirect target to check (typically user supplied)
    :return: ``target`` unchanged when it is considered safe, else ``'/'``
    """
    ref_url = urlparse(request.host_url)
    test_url = urlparse(urljoin(request.host_url, target))
    if test_url.scheme not in ('http', 'https') or ref_url.netloc != test_url.netloc:
        return '/'

    if isinstance(target, str):
        # Browsers treat backslashes like forward slashes and collapse runs of
        # leading slashes, so "///evil.com", "/\evil.com" and "\\evil.com" are
        # all followed as protocol-relative URLs to another host, even though
        # urljoin resolves them as paths on the current host. Re-check the
        # target the way a browser would read it.
        browser_view = target.strip().replace('\\', '/')
        if browser_view.startswith('//') and urlparse(browser_view).netloc != ref_url.netloc:
            return '/'

    return target

18 

19 

def jsonp(f):
    """
    Decorator that wraps a view's JSON output for JSONP.

    When the request carries a ``callback`` query argument, the wrapped view's
    response body is emitted as ``callback(body)`` with a JavaScript mimetype;
    otherwise the view's own response is returned untouched.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', False)
        if not callback:
            return f(*args, **kwargs)

        # NOTE(review): callback comes from the query string and is written
        # into the JavaScript response without validation or escaping.
        body = f(*args, **kwargs).data.decode("utf-8")
        content = '{}({})'.format(str(callback), str(body))
        return current_app.response_class(content, mimetype='application/javascript')
    return decorated_function

31 

32 

33# derived from http://flask.pocoo.org/snippets/45/ (pd) and customised 

def request_wants_json():
    """
    Decide whether the current request is asking for a JSON response.

    JSON is wanted when either:
      * the request has ``format=json`` (case-insensitive) in its values, or
        its path ends in ``.json``; or
      * the Accept header's best match is ``application/json`` with a strictly
        higher quality than ``text/html``.

    :return: True if JSON should be returned, False otherwise
    """
    accepted = request.accept_mimetypes
    best = accepted.best_match(['application/json', 'text/html'])
    prefers_json = best == 'application/json' and accepted[best] > accepted['text/html']

    # An explicit format request overrides whatever the Accept header says.
    if request.values.get('format', '').lower() == 'json' or request.path.endswith(".json"):
        return True
    return prefers_json

43 

44 

def flash_with_url(message, category=''):
    """
    Flash ``message`` with ``'+contains-url'`` appended to its category.

    :param message: the message to flash
    :param category: the base flash category; the suffix is added to it
    """
    tagged_category = category + '+contains-url'
    flash(message, tagged_category)

47 

48 

def listpop(l, default=None):
    """
    Return the first element of ``l``, or ``default`` if ``l`` is empty or falsy.

    Note that, despite the name, nothing is removed from ``l``.

    :param l: a sequence (or None)
    :param default: value to return when there is no first element
    :return: ``l[0]`` or ``default``
    """
    if l:
        return l[0]
    return default

51 

52 

def normalise_issn(issn):
    """
    Upper-case an ISSN and left-pad it with zeros towards the ``NNNN-NNNN`` form.

    * Values longer than 8 characters are returned upper-cased, otherwise unchanged.
    * Values containing a hyphen are zero-padded on the left to 9 characters;
      the hyphen is left wherever it already is.
    * Values without a hyphen are zero-padded on the left to 8 characters and a
      hyphen is inserted after the fourth character.

    :param issn: the ISSN string to normalise
    :return: the normalised ISSN string
    """
    issn = issn.upper()
    if len(issn) > 8:
        return issn

    if "-" in issn:
        # rjust rather than zfill: zfill treats a leading "-" as a sign and
        # would insert the zeros after it.
        return issn.rjust(9, "0")

    digits = issn.rjust(8, "0")
    return digits[:4] + "-" + digits[4:]

67 

68 

def load_file(filename):
    """
    Read a file in text mode and return its entire content.

    :param filename: path of the file to read
    :return: the file's content as a single string
    """
    with open(filename, 'r') as handle:
        return handle.read()

73 

74 

def make_json_resp(data, status_code, json_dumps_kwargs=None):
    """
    Build a response whose body is ``data`` serialised as JSON.

    :param data: the object to serialise with ``json.dumps``
    :param status_code: the HTTP status code to set on the response
    :param json_dumps_kwargs: optional dict of extra keyword arguments for ``json.dumps``
    :return: the response, with mimetype ``application/json``
    """
    dumps_kwargs = {} if json_dumps_kwargs is None else json_dumps_kwargs
    body = json.dumps(data, **dumps_kwargs)

    resp = make_response(body)
    resp.status_code = status_code
    resp.mimetype = "application/json"
    return resp

82 

83 

def get_web_json_payload():
    """
    Attempt to load JSON from request.data (decoded as UTF-8).

    If valid, the decoded JSON payload is returned to the caller.
    If invalid, a JSON response with a 400 Bad Request status is returned
    instead, so callers need to tell the two kinds of result apart.
    """
    try:
        # A failed UTF-8 decode raises UnicodeDecodeError, which is a
        # ValueError subclass, so it is handled by the same clause as bad JSON.
        return json.loads(request.data.decode("utf-8"))
    except ValueError:
        error_body = {
            'error': "Invalid JSON payload from request.data .\n{}".format(request.data)
        }
        return make_json_resp(error_body, status_code=400)

98 

99 

def validate_json(payload, fields_must_be_present=None, fields_must_not_be_present=None, error_to_raise=None):
    """
    Check that ``payload`` contains the required fields and none of the forbidden ones.

    Required fields are checked first, in order, then forbidden fields; the
    first violation found decides the outcome.

    :param payload: the container to check (membership is tested with ``in``)
    :param fields_must_be_present: fields that must be in ``payload``
    :param fields_must_not_be_present: fields that must not be in ``payload``
    :param error_to_raise: optional exception class; if given it is raised with
        a descriptive message on the first violation instead of returning False
    :return: True if the payload is valid, False if not (and no error class was given)
    """
    required = fields_must_be_present or []
    forbidden = fields_must_not_be_present or []

    def _fail(template, field):
        # Either raise the caller's exception type or report failure as False.
        if error_to_raise:
            raise error_to_raise(template.format(field))
        return False

    for field in required:
        if field not in payload:
            return _fail('Invalid JSON. The field {} was missing and is required.', field)

    for field in forbidden:
        if field in payload:
            return _fail('Invalid JSON. The field {} was present and must not be present.', field)

    return True

122 

123 

def batch_up(long_list, batch_size):
    """
    Yield successive slices of ``long_list``, each at most ``batch_size`` long.

    The final batch may be shorter than ``batch_size``.

    :param long_list: a sliceable sequence with a length
    :param batch_size: the step between batch start positions
    """
    # http://stackoverflow.com/a/312464/1154882
    yield from (
        long_list[start:start + batch_size]
        for start in range(0, len(long_list), batch_size)
    )

129 

130 

def ipt_prefix(type):
    """
    For IPT (index-per-type) connections, prepend the index prefix to the type.

    :param type: the type name
    :return: ``ELASTIC_SEARCH_DB_PREFIX + type`` when the
        ``ELASTIC_SEARCH_INDEX_PER_TYPE`` setting is truthy, otherwise ``type`` unchanged
    """
    # ~~Elasticsearch:Technology~~
    if not app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
        return type
    return app.config['ELASTIC_SEARCH_DB_PREFIX'] + type

138 

139 

def url_for(*args, **kwargs):
    """
    Build a URL with Flask's url_for, whether or not we have the right request context.

    HACK: this bit of code is required because notifications called from huey using the shortcircuit event
    dispatcher do not have the correct request context, and I was unable to figure out how to set the correct
    one in the framework above. So instead this is a dirty workaround which pushes a test request context
    if the first attempt fails.

    :param args: positional arguments passed straight through to flask's url_for
    :param kwargs: keyword arguments passed straight through to flask's url_for
    :return: the URL built by flask's url_for
    :raises: whatever flask's url_for raises on the second attempt (for example a
        BuildError for an unknown endpoint)
    """
    try:
        return flask_url_for(*args, **kwargs)
    except Exception:
        # Deliberately broad: any ordinary failure of the first attempt triggers
        # the retry. Unlike a bare ``except:``, this lets KeyboardInterrupt and
        # SystemExit propagate instead of being swallowed by the retry.
        from portality.app import app as doajapp

        with doajapp.test_request_context("/"):
            return flask_url_for(*args, **kwargs)

163 

164 

def get_full_url_by_endpoint(endpoint):
    """
    Return the absolute URL for a route endpoint.

    The configured ``BASE_URL`` (default ``https://doaj.org``) is prefixed to
    the endpoint's URL.

    werkzeug.routing.BuildError will be thrown if the route endpoint is not found.
    """
    base_url = app.config.get("BASE_URL", "https://doaj.org")
    return base_url + url_for(endpoint)

170 

171 

def get_full_url_safe(endpoint):
    """
    Like get_full_url_by_endpoint, but never raises for an unknown endpoint.

    :param endpoint: the route endpoint name
    :return: the absolute URL, or None (after logging a warning) when the
        endpoint cannot be built
    """
    try:
        full_url = get_full_url_by_endpoint(endpoint)
    except werkzeug.routing.BuildError:
        app.logger.warning(f'endpoint not found -- [{endpoint}]')
        full_url = None
    return full_url

178 

def no_op(*args, **kwargs):
    """ noop (no operation) function: accepts any arguments, does nothing, returns None """
    return None

182 

183 

def patch_config(inst, properties):
    """
    Overwrite entries in ``inst.config`` and report what was there before.

    :param inst: an object with a dict-like ``config`` attribute
    :param properties: dict of config keys to the new values to set
    :return: dict of the same keys to their previous values; a key that was
        not set before is recorded as None
    """
    config = inst.config
    # Capture the old values before anything is overwritten.
    previous = {key: config.get(key) for key in properties}
    for key, value in properties.items():
        config[key] = value
    return previous