Coverage for portality/util.py: 49%

109 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-11-09 15:10 +0000

1import json 

2import urllib.request 

3from functools import wraps 

4 

5import werkzeug.routing 

6from flask import request, current_app, flash, make_response, url_for as flask_url_for 

7from urllib.parse import urlparse, urljoin 

8from portality.core import app 

9 

10 

def is_safe_url(target):
    """Return ``target`` if it points at this host over http(s), otherwise ``'/'``.

    ``target`` is resolved against ``request.host_url`` before the check, so
    relative paths are accepted. Despite the name, the function returns a URL
    string rather than a boolean.

    :param target: candidate redirect target (absolute or relative URL)
    :return: ``target`` unchanged when it is on the current host, else ``'/'``
    """
    host = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))

    if resolved.scheme not in ('http', 'https'):
        return '/'
    if host.netloc != resolved.netloc:
        return '/'
    return target

18 

19 

def jsonp(f):
    """Decorate a view so that its output is wrapped for JSONP when requested.

    If the current request has a non-empty ``callback`` query argument, the
    wrapped view is called, its ``.data`` is decoded as UTF-8 and the response
    becomes ``<callback>(<data>)`` with mimetype ``application/javascript``.
    Without a callback the view's own return value is passed through untouched.

    :param f: view function whose result exposes a bytes ``.data`` attribute
    :return: the wrapping function
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        callback = request.args.get('callback', False)
        if not callback:
            return f(*args, **kwargs)

        # NOTE(review): the callback name comes straight from the query string
        # and is written into the response without any validation.
        payload = f(*args, **kwargs).data.decode("utf-8")
        content = str(callback) + '(' + str(payload) + ')'
        return current_app.response_class(content, mimetype='application/javascript')

    return wrapper

31 

32 

33# derived from http://flask.pocoo.org/snippets/45/ (pd) and customised 

def request_wants_json():
    """Decide whether the current request is asking for a JSON response.

    JSON is wanted when either:

    * the Accept header's best match between ``application/json`` and
      ``text/html`` is JSON *and* JSON has a strictly higher quality than HTML, or
    * the request explicitly asks for it via ``format=json`` (case-insensitive)
      or a path ending in ``.json``.

    :return: ``True`` if JSON should be served, ``False`` otherwise
    """
    accepted = request.accept_mimetypes
    preferred = accepted.best_match(['application/json', 'text/html'])
    header_prefers_json = (
        preferred == 'application/json'
        and accepted[preferred] > accepted['text/html']
    )

    explicitly_json = (
        request.values.get('format', '').lower() == 'json'
        or request.path.endswith(".json")
    )

    return header_prefers_json or explicitly_json

43 

44 

def flash_with_url(message, category=''):
    """Flash ``message`` with ``'+contains-url'`` appended to its category.

    :param message: text passed to ``flash``
    :param category: base flash category; the suffix is appended to it
    """
    tagged_category = category + '+contains-url'
    flash(message, tagged_category)

47 

48 

def listpop(l, default=None):
    """Return the first element of ``l``, or ``default`` when ``l`` is falsy.

    Nothing is removed from ``l``; despite the name this only reads index 0.

    :param l: a sequence (or ``None`` / any falsy value)
    :param default: value returned when ``l`` is empty or falsy
    :return: ``l[0]`` or ``default``
    """
    if not l:
        return default
    return l[0]

51 

52 

def normalise_issn(issn):
    """Upper-case an ISSN and left-pad it with zeros into ``NNNN-NNNN`` form.

    * Values longer than 8 characters are returned as-is (after upper-casing).
    * Values that already contain a hyphen are zero-padded on the left to
      9 characters.
    * Values without a hyphen are zero-padded on the left to 8 characters and
      a hyphen is inserted after the fourth character.

    :param issn: ISSN string, with or without a hyphen
    :return: the normalised ISSN string
    """
    normalised = issn.upper()

    if len(normalised) > 8:
        return normalised

    if "-" in normalised:
        # rjust (not zfill): zfill would treat a leading "-" as a sign.
        return normalised.rjust(9, "0")

    digits = normalised.rjust(8, "0")
    return digits[:4] + "-" + digits[4:]

67 

68 

def load_file(filename):
    """Read a text file and return its entire content as one string.

    The file is opened in text mode with the platform's default encoding.

    :param filename: path of the file to read
    :return: the file's full content
    """
    with open(filename, 'r') as handle:
        return handle.read()

73 

74 

def make_json_resp(data, status_code, json_dumps_kwargs=None):
    """Build a response whose body is ``data`` serialised as JSON.

    :param data: object passed to ``json.dumps``
    :param status_code: HTTP status code set on the response
    :param json_dumps_kwargs: optional dict of extra keyword arguments for
        ``json.dumps``; ``None`` means no extra arguments
    :return: the response, with mimetype ``application/json``
    """
    dumps_options = {} if json_dumps_kwargs is None else json_dumps_kwargs
    body = json.dumps(data, **dumps_options)

    response = make_response(body)
    response.status_code = status_code
    response.mimetype = "application/json"
    return response

82 

83 

def get_web_json_payload():
    """
    Decode ``request.data`` as UTF-8 and parse it as JSON.

    On success the decoded payload is returned. If decoding or parsing raises
    ``ValueError`` (which covers both ``UnicodeDecodeError`` and
    ``json.JSONDecodeError``), a 400 JSON response describing the problem is
    returned instead, so callers receive either the payload or a response.
    """
    try:
        return json.loads(request.data.decode("utf-8"))
    except ValueError:
        error_body = {
            'error': "Invalid JSON payload from request.data .\n{}".format(request.data),
        }
        return make_json_resp(error_body, status_code=400)

98 

99 

def validate_json(payload, fields_must_be_present=None, fields_must_not_be_present=None, error_to_raise=None):
    """Check that ``payload`` has every required field and no forbidden field.

    Required fields are checked first, in the order given, then forbidden
    fields. The first violation ends the check.

    :param payload: container tested with ``in`` (e.g. a decoded JSON dict)
    :param fields_must_be_present: names that must be in ``payload``
    :param fields_must_not_be_present: names that must not be in ``payload``
    :param error_to_raise: optional exception class; when truthy it is raised
        with a descriptive message on the first violation
    :return: ``True`` when all checks pass, ``False`` on a violation when no
        ``error_to_raise`` was supplied
    """
    required = fields_must_be_present or []
    forbidden = fields_must_not_be_present or []

    # (field names, whether presence is the violation, message template)
    checks = (
        (required, False, 'Invalid JSON. The field {} was missing and is required.'),
        (forbidden, True, 'Invalid JSON. The field {} was present and must not be present.'),
    )

    for names, presence_is_violation, template in checks:
        for name in names:
            if (name in payload) != presence_is_violation:
                continue
            if error_to_raise:
                raise error_to_raise(template.format(name))
            return False

    return True

122 

123 

def batch_up(long_list, batch_size):
    """Yield consecutive slices of ``long_list``, each at most ``batch_size`` long.

    The final slice holds whatever remains and may be shorter.

    :param long_list: a sliceable sequence with a length
    :param batch_size: maximum number of items per yielded slice
    """
    # http://stackoverflow.com/a/312464/1154882
    batch_starts = range(0, len(long_list), batch_size)
    yield from (long_list[start:start + batch_size] for start in batch_starts)

129 

130 

def ipt_prefix(type):
    """Return the index name to use for ``type`` on index-per-type connections.

    When ``ELASTIC_SEARCH_INDEX_PER_TYPE`` is truthy in the app config, the
    configured ``ELASTIC_SEARCH_DB_PREFIX`` is prepended to ``type``; otherwise
    ``type`` is returned unchanged.

    :param type: the bare type name
    :return: the type name, prefixed if index-per-type is enabled
    """
    # ~~Elasticsearch:Technology~~
    if not app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
        return type
    return app.config['ELASTIC_SEARCH_DB_PREFIX'] + type

138 

139 

def verify_recaptcha(g_recaptcha_response):
    """
    ~~ReCAPTCHA:ExternalService~~

    Ask the reCAPTCHA ``siteverify`` endpoint to validate a response token.

    The secret key is read from the ``RECAPTCHA_SECRET_KEY`` app config entry.
    Both values are URL-encoded before being placed in the query string, so a
    token containing characters such as ``&``, ``#``, ``+`` or spaces cannot
    corrupt the request or inject extra parameters.

    :param g_recaptcha_response: the response token submitted by the client
    :return: the decoded JSON body returned by the verification endpoint
    :raises TypeError: if the secret key is not configured or the token is None
    """
    # Function-scope import: the module only imports urlparse/urljoin by name.
    from urllib.parse import urlencode

    secret = app.config.get("RECAPTCHA_SECRET_KEY")
    if secret is None or g_recaptcha_response is None:
        # The previous string concatenation raised TypeError for a missing
        # value; keep that instead of letting urlencode() send the text "None".
        raise TypeError("reCAPTCHA secret key and response token must both be provided")

    query = urlencode({"secret": secret, "response": g_recaptcha_response})
    with urllib.request.urlopen('https://www.recaptcha.net/recaptcha/api/siteverify?' + query) as url:
        return json.loads(url.read().decode())

149 

150 

def url_for(*args, **kwargs):
    """
    Build a URL with Flask's ``url_for`` whether or not a usable request
    context is active.

    HACK: notifications called from huey using the shortcircuit event
    dispatcher do not have the correct request context, and it was not clear
    how to set the correct one in the framework above. As a workaround, if the
    first attempt fails the call is retried inside a test request context for
    "/" on the main application.

    Only ``Exception`` subclasses trigger the retry, so ``KeyboardInterrupt``
    and ``SystemExit`` propagate immediately instead of being swallowed. Any
    error raised by the retry itself (for example an unknown endpoint)
    propagates to the caller.

    :param args: positional arguments forwarded to Flask's ``url_for``
    :param kwargs: keyword arguments forwarded to Flask's ``url_for``
    :return: the generated URL
    """
    try:
        return flask_url_for(*args, **kwargs)
    except Exception:
        # Imported only on the fallback path (presumably to avoid a circular
        # import with portality.app -- confirm before hoisting).
        from portality.app import app as doajapp

        with doajapp.test_request_context("/"):
            return flask_url_for(*args, **kwargs)

174 

175 

def get_full_url_by_endpoint(endpoint):
    """
    Return the absolute URL for ``endpoint``: the configured ``BASE_URL``
    (default "https://doaj.org") followed by the endpoint's path.

    werkzeug.routing.BuildError will be thrown if the route endpoint is not found.
    """
    base_url = app.config.get("BASE_URL", "https://doaj.org")
    path = url_for(endpoint)
    return base_url + path

181 

182 

183def get_full_url_safe(endpoint): 

184 try: 

185 return get_full_url_by_endpoint(endpoint) 

186 except werkzeug.routing.BuildError: 

187 app.logger.warning(f'endpoint not found -- [{endpoint}]') 

188 return None