Coverage for portality / util.py: 53%
113 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1import json
2import urllib.request
3from functools import wraps
5import werkzeug.routing
6from flask import request, current_app, flash, make_response, url_for as flask_url_for
7from urllib.parse import urlparse, urljoin
8from portality.core import app
def is_safe_url(target):
    """
    Return ``target`` if it resolves to an http(s) URL on the current request's host, otherwise ``'/'``.

    ``target`` is resolved relative to ``request.host_url`` before its scheme and
    network location are compared with those of the host URL.

    :param target: the (possibly relative) URL to check
    :return: ``target`` unchanged when it passes the check, ``'/'`` when it does not
    """
    host = urlparse(request.host_url)
    candidate = urlparse(urljoin(request.host_url, target))
    same_host = host.netloc == candidate.netloc
    if candidate.scheme not in ('http', 'https') or not same_host:
        return '/'
    return target
def jsonp(f):
    """
    Decorator: wrap the decorated view's output for JSONP.

    When the request has a non-empty ``callback`` query argument, the view's response
    data is decoded as UTF-8 and returned as ``callback(<data>)`` with the
    ``application/javascript`` mimetype. Otherwise the view's response is returned untouched.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', False)
        if not callback:
            return f(*args, **kwargs)
        callback_name = str(callback)
        body = str(f(*args, **kwargs).data.decode("utf-8"))
        content = callback_name + '(' + body + ')'
        return current_app.response_class(content, mimetype='application/javascript')
    return decorated_function
33# derived from http://flask.pocoo.org/snippets/45/ (pd) and customised
def request_wants_json():
    """
    Decide whether the current request should be answered with JSON.

    :return: True if the request's ``format`` value is ``json`` (case-insensitive), if the path
        ends in ``.json``, or if ``application/json`` is the best Accept match and is rated
        strictly higher than ``text/html``; False otherwise
    """
    accepted = request.accept_mimetypes
    best = accepted.best_match(['application/json', 'text/html'])
    prefers_json = best == 'application/json' and accepted[best] > accepted['text/html']
    # an explicit format request overrides whatever the Accept header says
    if request.values.get('format', '').lower() == 'json' or request.path.endswith(".json"):
        return True
    return prefers_json
def flash_with_url(message, category=''):
    """
    Flash ``message`` with ``'+contains-url'`` appended to the given category.

    :param message: the message to flash
    :param category: the base flash category (defaults to the empty string)
    """
    tagged_category = category + '+contains-url'
    flash(message, tagged_category)
def listpop(l, default=None):
    """
    Return the first element of ``l``, or ``default`` if ``l`` is empty or otherwise falsy.

    Despite the name, nothing is removed from ``l``.
    """
    if not l:
        return default
    return l[0]
def normalise_issn(issn):
    """
    Upper-case an ISSN and left-pad it with zeros into a hyphenated form.

    - Values longer than 8 characters are returned upper-cased but otherwise unchanged.
    - Values containing a hyphen are zero-padded on the left to 9 characters.
    - Values without a hyphen are zero-padded on the left to 8 characters and a
      hyphen is inserted after the fourth character.

    :param issn: the ISSN string to normalise
    :return: the normalised ISSN string
    """
    issn = issn.upper()
    length = len(issn)
    if length > 8:
        return issn
    if "-" in issn:
        # hyphen already present: pad up to the 9-character form
        return ("0" * (9 - length)) + issn
    padded = ("0" * (8 - length)) + issn
    return padded[:4] + "-" + padded[4:]
def load_file(filename):
    """
    Read a file in text mode and return its entire content.

    :param filename: path of the file to read
    :return: the file's content as a string
    """
    with open(filename, 'r') as handle:
        return handle.read()
def make_json_resp(data, status_code, json_dumps_kwargs=None):
    """
    Build a response whose body is ``data`` serialised as JSON.

    :param data: the object to serialise with ``json.dumps``
    :param status_code: the HTTP status code to set on the response
    :param json_dumps_kwargs: optional dict of extra keyword arguments passed to ``json.dumps``
    :return: the response, with status code set and mimetype ``application/json``
    """
    dumps_kwargs = {} if json_dumps_kwargs is None else json_dumps_kwargs
    body = json.dumps(data, **dumps_kwargs)
    resp = make_response(body)
    resp.status_code = status_code
    resp.mimetype = "application/json"
    return resp
def get_web_json_payload():
    """
    Attempts to decode request.data as UTF-8 and parse it as JSON.

    If valid, returns the decoded JSON payload to the caller.
    If a ValueError is raised while decoding or parsing, returns a JSON response with
    a 400 Bad Request status instead, so callers receive either the payload or a response object.
    """
    try:
        return json.loads(request.data.decode("utf-8"))
    except ValueError:
        error_body = {'error': "Invalid JSON payload from request.data .\n{}".format(request.data)}
        return make_json_resp(error_body, status_code=400)
def validate_json(payload, fields_must_be_present=None, fields_must_not_be_present=None, error_to_raise=None):
    """
    Check that ``payload`` contains all required fields and none of the forbidden ones.

    Required fields are checked first, in order, then forbidden fields; the first
    violation found determines the outcome.

    :param payload: the container to check (membership is tested with ``in``)
    :param fields_must_be_present: iterable of fields that must be in ``payload``
    :param fields_must_not_be_present: iterable of fields that must not be in ``payload``
    :param error_to_raise: optional exception class; if given, it is raised with a
        descriptive message on the first violation instead of returning False
    :return: True if the payload passes, False on a violation when ``error_to_raise`` is not given
    """
    required = fields_must_be_present or []
    forbidden = fields_must_not_be_present or []

    for name in required:
        if name in payload:
            continue
        if not error_to_raise:
            return False
        raise error_to_raise('Invalid JSON. The field {} was missing and is required.'.format(name))

    for name in forbidden:
        if name not in payload:
            continue
        if not error_to_raise:
            return False
        raise error_to_raise('Invalid JSON. The field {} was present and must not be present.'.format(name))

    return True
def batch_up(long_list, batch_size):
    """
    Yield successive slices of ``long_list``, each at most ``batch_size`` items long.

    :param long_list: a sliceable sequence with a length
    :param batch_size: step between slice starts, and maximum length of each slice
    """
    # http://stackoverflow.com/a/312464/1154882
    yield from (
        long_list[start:start + batch_size]
        for start in range(0, len(long_list), batch_size)
    )
def ipt_prefix(type):
    """
    Prepend the configured index prefix to ``type`` when index-per-type is enabled.

    :param type: the type name
    :return: ``ELASTIC_SEARCH_DB_PREFIX + type`` if ``ELASTIC_SEARCH_INDEX_PER_TYPE`` is
        truthy in the app config, otherwise ``type`` unchanged
    """
    # ~~Elasticsearch:Technology~~
    if not app.config['ELASTIC_SEARCH_INDEX_PER_TYPE']:
        return type
    return app.config['ELASTIC_SEARCH_DB_PREFIX'] + type
def url_for(*args, **kwargs):
    """
    This function is a hack to allow us to use url_for where we may or may not have the
    right request context.

    HACK: this bit of code is required because notifications called from huey using the shortcircuit event
    dispatcher do not have the correct request context, and I was unable to figure out how to set the correct
    one in the framework above. So instead this is a dirty workaround which pushes the right test context
    if needed.

    :param args: positional arguments passed straight through to flask's url_for
    :param kwargs: keyword arguments passed straight through to flask's url_for
    :return: the generated URL
    """
    try:
        return flask_url_for(*args, **kwargs)
    except Exception:
        # Only ordinary exceptions trigger the fallback; KeyboardInterrupt, SystemExit and
        # GeneratorExit are allowed to propagate instead of being swallowed and retried.
        # NOTE(review): imported here rather than at module level, presumably to avoid a
        # circular import with portality.app - confirm.
        from portality.app import app as doajapp

        # retry inside a test request context; any error raised here propagates to the caller
        with doajapp.test_request_context("/"):
            return flask_url_for(*args, **kwargs)
def get_full_url_by_endpoint(endpoint):
    """
    Build a full URL for ``endpoint`` by prefixing its path with the configured ``BASE_URL``
    (falling back to ``https://doaj.org`` when that is not configured).

    werkzeug.routing.BuildError will be thrown if the route endpoint is not found
    """
    base_url = app.config.get("BASE_URL", "https://doaj.org")
    return base_url + url_for(endpoint)
def get_full_url_safe(endpoint):
    """
    Like get_full_url_by_endpoint, but never raises for an unknown endpoint.

    :param endpoint: the route endpoint name
    :return: the full URL, or None (after logging a warning) if a
        werkzeug.routing.BuildError is raised while building it
    """
    try:
        full_url = get_full_url_by_endpoint(endpoint)
    except werkzeug.routing.BuildError:
        app.logger.warning(f'endpoint not found -- [{endpoint}]')
        full_url = None
    return full_url
def no_op(*args, **kwargs):
    """ Accept any arguments, do nothing, and return None (no operation) """
    return None
def patch_config(inst, properties):
    """
    Overwrite entries in ``inst.config`` and report what they were before.

    :param inst: an object with a dict-like ``config`` attribute
    :param properties: mapping of config keys to the new values to set
    :return: dict mapping each key in ``properties`` to its previous value in
        ``inst.config`` (None where the key was not previously set)
    """
    config = inst.config
    previous = {}
    for key in properties:
        previous[key] = config.get(key)
        config[key] = properties[key]
    return previous