Coverage for portality/util.py: 49%
109 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-11-09 16:22 +0000
« prev ^ index » next coverage.py v6.4.2, created at 2022-11-09 16:22 +0000
1import json
2import urllib.request
3from functools import wraps
5import werkzeug.routing
6from flask import request, current_app, flash, make_response, url_for as flask_url_for
7from urllib.parse import urlparse, urljoin
8from portality.core import app
11def is_safe_url(target):
12 ref_url = urlparse(request.host_url)
13 test_url = urlparse(urljoin(request.host_url, target))
14 if test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc:
15 return target
16 else:
17 return '/'
20def jsonp(f):
21 """Wraps JSONified output for JSONP"""
22 @wraps(f)
23 def decorated_function(*args, **kwargs):
24 callback = request.args.get('callback', False)
25 if callback:
26 content = str(callback) + '(' + str(f(*args, **kwargs).data.decode("utf-8")) + ')'
27 return current_app.response_class(content, mimetype='application/javascript')
28 else:
29 return f(*args, **kwargs)
30 return decorated_function
33# derived from http://flask.pocoo.org/snippets/45/ (pd) and customised
34def request_wants_json():
35 best = request.accept_mimetypes.best_match(['application/json', 'text/html'])
36 if best == 'application/json' and request.accept_mimetypes[best] > request.accept_mimetypes['text/html']:
37 best = True
38 else:
39 best = False
40 if request.values.get('format','').lower() == 'json' or request.path.endswith(".json"):
41 best = True
42 return best
45def flash_with_url(message, category=''):
46 flash(message, category + '+contains-url')
49def listpop(l, default=None):
50 return l[0] if l else default
53def normalise_issn(issn):
54 issn = issn.upper()
55 if len(issn) > 8:
56 return issn
57 if len(issn) == 8:
58 if "-" in issn:
59 return "0" + issn
60 else: return issn[:4] + "-" + issn[4:]
61 if len(issn) < 8:
62 if "-" in issn:
63 return ("0" * (9 - len(issn))) + issn
64 else:
65 issn = ("0" * (8 - len(issn))) + issn
66 return issn[:4] + "-" + issn[4:]
69def load_file(filename):
70 with open(filename, 'r') as f:
71 content = f.read()
72 return content
75def make_json_resp(data, status_code, json_dumps_kwargs=None):
76 if json_dumps_kwargs is None:
77 json_dumps_kwargs = {}
78 resp = make_response(json.dumps(data, **json_dumps_kwargs))
79 resp.status_code = status_code
80 resp.mimetype = "application/json"
81 return resp
84def get_web_json_payload():
85 """
86 Attempts to load JSON from request.data.
88 If valid, returns the decoded JSON payload to the caller.
89 If invalid returns a JSON response with a 400 Bad Request to the web user.
90 """
91 r = {}
92 try:
93 payload = json.loads(request.data.decode("utf-8"))
94 except ValueError:
95 r['error'] = "Invalid JSON payload from request.data .\n{}".format(request.data)
96 return make_json_resp(r, status_code=400)
97 return payload
100def validate_json(payload, fields_must_be_present=None, fields_must_not_be_present=None, error_to_raise=None):
101 if not fields_must_be_present:
102 fields_must_be_present = []
104 if not fields_must_not_be_present:
105 fields_must_not_be_present = []
107 for f in fields_must_be_present:
108 if f not in payload:
109 if error_to_raise:
110 raise error_to_raise('Invalid JSON. The field {} was missing and is required.'.format(f))
111 else:
112 return False
114 for f in fields_must_not_be_present:
115 if f in payload:
116 if error_to_raise:
117 raise error_to_raise('Invalid JSON. The field {} was present and must not be present.'.format(f))
118 else:
119 return False
121 return True
124def batch_up(long_list, batch_size):
125 """Yield successive n-sized chunks from l (a list)."""
126 # http://stackoverflow.com/a/312464/1154882
127 for i in range(0, len(long_list), batch_size):
128 yield long_list[i:i + batch_size]
131def ipt_prefix(type):
132 """ For IPT connections, prepend the index prefix to the type so we connect to the right index-per-type index. """
133 # ~~Elasticsearch:Technology~~
134 if app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
135 return app.config['ELASTIC_SEARCH_DB_PREFIX'] + type
136 else:
137 return type
140def verify_recaptcha(g_recaptcha_response):
141 """
142 ~~ReCAPTCHA:ExternalService~~
143 :param g_recaptcha_response:
144 :return:
145 """
146 with urllib.request.urlopen('https://www.recaptcha.net/recaptcha/api/siteverify?secret=' + app.config.get("RECAPTCHA_SECRET_KEY") + '&response=' + g_recaptcha_response) as url:
147 data = json.loads(url.read().decode())
148 return data
151def url_for(*args, **kwargs):
152 """
153 This function is a hack to allow us to use url_for where we may nor may not have the
154 right request context.
156 HACK: this bit of code is required because notifications called from huey using the shortcircuit event
157 dispatcher do not have the correct request context, and I was unable to figure out how to set the correct
158 one in the framework above. So instead this is a dirty workaround which pushes the right test context
159 if needed.
161 :param args:
162 :param kwargs:
163 :return:
164 """
165 try:
166 url = flask_url_for(*args, **kwargs)
167 except:
168 from portality.app import app as doajapp
170 with doajapp.test_request_context("/"):
171 url = flask_url_for(*args, **kwargs)
173 return url
176def get_full_url_by_endpoint(endpoint):
177 """
178 werkzeug.routing.BuildError will be throw if rout endpoint not found
179 """
180 return app.config.get("BASE_URL", "https://doaj.org") + url_for(endpoint)
183def get_full_url_safe(endpoint):
184 try:
185 return get_full_url_by_endpoint(endpoint)
186 except werkzeug.routing.BuildError:
187 app.logger.warning(f'endpoint not found -- [{endpoint}]')
188 return None