Coverage for portality/decorators.py: 68%
92 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
1import json, signal, datetime
2from functools import wraps
3from flask import request, abort, redirect, flash, url_for, render_template, make_response
4from flask_login import login_user, current_user
6from portality.api.common import Api401Error
8from portality.core import app
9from portality.models import Account
10from portality.models.harvester import HarvesterProgressReport as Report
13def swag(swag_summary, swag_spec):
14 """
15 ~~Swagger:Feature~~
16 Decorator for API functions, adding swagger info to the swagger spec.
17 """
18 def decorator(f):
19 f.summary = swag_summary
20 f.swag = swag_spec
21 f.description = swag_summary
22 return f
24 return decorator
27def api_key_required(fn):
28 """
29 ~~APIKey:Feature~~
30 Decorator for API functions, requiring a valid key to find a user
31 """
32 @wraps(fn)
33 def decorated_view(*args, **kwargs):
34 api_key = request.values.get("api_key", None)
35 if api_key is not None:
36 user = Account.pull_by_api_key(api_key)
37 if user is not None:
38 if login_user(user, remember=False):
39 return fn(*args, **kwargs)
40 # else
41 raise Api401Error("An API Key is required to access this.")
43 return decorated_view
46def api_key_optional(fn):
47 """
48 ~~APIKey:Feature~~
49 Decorator for API functions, requiring a valid key to find a user if a key is provided. OK if none provided.
50 """
51 @wraps(fn)
52 def decorated_view(*args, **kwargs):
53 api_key = request.values.get("api_key", None)
54 if api_key:
55 user = Account.pull_by_api_key(api_key)
56 if user is not None:
57 if login_user(user, remember=False):
58 return fn(*args, **kwargs)
59 # else
60 abort(401)
62 # no api key, which is ok
63 return fn(*args, **kwargs)
65 return decorated_view
68def ssl_required(fn):
69 """
70 ~~SSLRequired:Feature~~
71 Decorator for when a view f() should be served only over SSL
72 """
73 @wraps(fn)
74 def decorated_view(*args, **kwargs):
75 if app.config.get("SSL"):
76 if request.is_secure:
77 return fn(*args, **kwargs)
78 else:
79 return redirect(request.url.replace("http://", "https://"))
81 return fn(*args, **kwargs)
83 return decorated_view
86def restrict_to_role(role):
87 """
88 ~~Authorisation:Feature~~
89 :param role:
90 :return:
91 """
92 if current_user.is_anonymous:
93 flash('You are trying to access a protected area. Please log in first.', 'error')
94 return redirect(url_for('account.login', next=request.url))
96 if not current_user.has_role(role):
97 flash('You do not have permission to access this area of the site.', 'error')
98 return redirect(url_for('doaj.home'))
101def write_required(script=False, api=False):
102 """
103 ~~ReadOnlyMode:Feature~~
104 :param script:
105 :param api:
106 :return:
107 """
108 def decorator(fn):
109 @wraps(fn)
110 def decorated_view(*args, **kwargs):
111 if app.config.get("READ_ONLY_MODE", False):
112 # TODO remove "script" argument from decorator.
113 # Should be possible to detect if this is run in a web context or not.
114 if script:
115 raise RuntimeError('This task cannot run since the system is in read-only mode.')
116 elif api:
117 resp = make_response(json.dumps({"message" : "We are currently carrying out essential maintenance, and this route is temporarily unavailable"}), 503)
118 resp.mimetype = "application/json"
119 return resp
120 else:
121 return render_template("doaj/readonly.html")
123 return fn(*args, **kwargs)
125 return decorated_view
126 return decorator
129class CaughtTermException(Exception):
130 pass
133def _term_handler(signum, frame):
134 app.logger.warning("Harvester terminated with signal " + str(signum))
135 raise CaughtTermException
138def capture_sigterm(fn):
139 """
140 ~~CaptureSigterm:Feature~~
141 Decorator which allows graceful exit on SIGTERM
142 """
144 # Register the SIGTERM handler to raise an exception, allowing graceful exit.
145 signal.signal(signal.SIGTERM, _term_handler)
147 @wraps(fn)
148 def decorated_fn(*args, **kwargs):
149 try:
150 fn(*args, **kwargs)
151 except (CaughtTermException, KeyboardInterrupt):
152 app.logger.warning(u"Harvester caught SIGTERM. Exiting.")
153 report = Report.write_report()
154 if app.config.get("HARVESTER_EMAIL_ON_EVENT", False):
155 to = app.config.get("HARVESTER_EMAIL_RECIPIENTS", None)
156 fro = app.config.get("SYSTEM_EMAIL_FROM")
158 if to is not None:
159 from portality import app_email as mail
160 mail.send_mail(
161 to=to,
162 fro=fro,
163 subject="DOAJ Harvester caught SIGTERM at {0}".format(
164 datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")),
165 msg_body=report
166 )
167 app.logger.info(report)
168 exit(1)
170 return decorated_fn