Coverage for portality/decorators.py: 52%

92 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-20 16:12 +0100

1import json, signal, datetime 

2from functools import wraps 

3from flask import request, abort, redirect, flash, url_for, render_template, make_response 

4from flask_login import login_user, current_user 

5 

6from portality.api.common import Api401Error 

7 

8from portality.core import app 

9from portality.models import Account 

10from portality.models.harvester import HarvesterProgressReport as Report 

11 

12 

def swag(swag_summary, swag_spec):
    """
    ~~Swagger:Feature~~
    Build a decorator that attaches swagger information to an API function.

    :param swag_summary: value stored on the function as both ``summary`` and ``description``
    :param swag_spec: value stored on the function as ``swag``
    :return: a decorator which annotates the function in place and hands back
        that same function object (no wrapper is created)
    """
    # attribute name -> value, applied in this order to the decorated function
    metadata = (
        ("summary", swag_summary),
        ("swag", swag_spec),
        ("description", swag_summary),
    )

    def decorator(f):
        for attribute, value in metadata:
            setattr(f, attribute, value)
        return f

    return decorator

25 

26 

def api_key_required(fn):
    """
    ~~APIKey:Feature~~
    Decorator for API functions which only lets the call through when the
    request carries an ``api_key`` value that resolves to an account which can
    be logged in.

    :param fn: the view function to protect
    :return: the wrapped view
    :raises Api401Error: (from the wrapped view) when no key was supplied, no
        account matches the key, or ``login_user`` refuses the account
    """
    @wraps(fn)
    def decorated_view(*args, **kwargs):
        key = request.values.get("api_key", None)

        # Only hit the account lookup when a key was actually sent
        account = Account.pull_by_api_key(key) if key is not None else None

        # The session is deliberately not remembered: the key is checked on every call
        if account is not None and login_user(account, remember=False):
            return fn(*args, **kwargs)

        raise Api401Error("An API Key is required to access this.")

    return decorated_view

44 

45 

def api_key_optional(fn):
    """
    ~~APIKey:Feature~~
    Decorator for API functions where an API key is optional: a request without
    a key goes straight through, but if a key is provided it must resolve to an
    account which can be logged in.

    :param fn: the view function to wrap
    :return: the wrapped view; it calls ``abort(401)`` when a key is supplied
        but no account matches it or ``login_user`` refuses the account
    """
    @wraps(fn)
    def decorated_view(*args, **kwargs):
        key = request.values.get("api_key", None)

        # no api key (or an empty one), which is ok
        if not key:
            return fn(*args, **kwargs)

        account = Account.pull_by_api_key(key)
        if account is None or not login_user(account, remember=False):
            abort(401)

        return fn(*args, **kwargs)

    return decorated_view

66 

67 

def ssl_required(fn):
    """
    ~~SSLRequired:Feature~~
    Decorator for when a view f() should be served only over SSL

    When the ``SSL`` config option is truthy and the current request is not
    secure, the client is redirected to the same URL with its scheme upgraded
    to ``https``. In every other case the view is called as normal.

    :param fn: the view function to wrap
    :return: the wrapped view
    """
    insecure_scheme = "http://"

    @wraps(fn)
    def decorated_view(*args, **kwargs):
        if not app.config.get("SSL") or request.is_secure:
            return fn(*args, **kwargs)

        # Only upgrade the scheme at the very start of the URL. A blanket
        # str.replace would also rewrite any "http://" that appears later on,
        # e.g. inside a query-string parameter such as ?next=http://..., and
        # so corrupt the URL the user is sent to.
        target = request.url
        if target.startswith(insecure_scheme):
            target = "https://" + target[len(insecure_scheme):]
        return redirect(target)

    return decorated_view

84 

85 

def restrict_to_role(role):
    """
    ~~Authorisation:Feature~~
    Check that the current user may access an area restricted to ``role``.

    Note that this is a plain function rather than a decorator: it inspects
    ``current_user`` at the moment it is called.

    :param role: the role passed to ``current_user.has_role``
    :return: a redirect response (with an error message flashed) when the user
        is anonymous or lacks the role; ``None`` when access is allowed
    """
    def _turn_away(message, endpoint, **url_values):
        # Flash the explanation, then send the user to the given endpoint
        flash(message, 'error')
        return redirect(url_for(endpoint, **url_values))

    if current_user.is_anonymous:
        # Send the user to the login page, passing the current URL as ``next``
        return _turn_away(
            'You are trying to access a protected area. Please log in first.',
            'account.login',
            next=request.url,
        )

    if current_user.has_role(role):
        return None

    return _turn_away(
        'You do not have permission to access this area of the site.',
        'doaj.home',
    )

99 

100 

def write_required(script=False, api=False):
    """
    ~~ReadOnlyMode:Feature~~
    Decorator factory which blocks the wrapped function while the
    ``READ_ONLY_MODE`` config option is switched on.

    :param script: if True, the wrapped function raises ``RuntimeError`` in read-only mode
    :param api: if True (and ``script`` is False), a JSON message is returned
        with a 503 status in read-only mode
    :return: a decorator; with neither flag set, the wrapped view renders the
        ``doaj/readonly.html`` template in read-only mode
    """
    def decorator(fn):
        @wraps(fn)
        def decorated_view(*args, **kwargs):
            # Normal operation: nothing to enforce
            if not app.config.get("READ_ONLY_MODE", False):
                return fn(*args, **kwargs)

            # TODO remove "script" argument from decorator.
            # Should be possible to detect if this is run in a web context or not.
            if script:
                raise RuntimeError('This task cannot run since the system is in read-only mode.')

            if api:
                payload = json.dumps({"message" : "We are currently carrying out essential maintenance, and this route is temporarily unavailable"})
                resp = make_response(payload, 503)
                resp.mimetype = "application/json"
                return resp

            return render_template("doaj/readonly.html")

        return decorated_view

    return decorator

127 

128 

class CaughtTermException(Exception):
    """Raised by the SIGTERM handler so that the termination signal can be caught as an ordinary exception."""

131 

132 

def _term_handler(signum, frame):
    """
    Signal handler: log the signal number as a warning, then raise
    CaughtTermException so the interrupted code can unwind and clean up.

    :param signum: the number of the signal that was received
    :param frame: the interrupted stack frame (unused)
    :raises CaughtTermException: always
    """
    message = "Harvester terminated with signal " + str(signum)
    app.logger.warning(message)
    raise CaughtTermException()

136 

137 

def capture_sigterm(fn):
    """
    ~~CaptureSigterm:Feature~~
    Decorator which allows graceful exit on SIGTERM

    Applying the decorator installs ``_term_handler`` as the SIGTERM handler
    (this happens when the decorator is applied, not when the function is
    called). If the wrapped function is then interrupted by
    CaughtTermException or KeyboardInterrupt, a report is written, optionally
    emailed, logged, and the process exits with status 1.

    :param fn: the function to protect
    :return: the wrapped function, which returns whatever ``fn`` returns when
        it completes normally
    :raises SystemExit: (from the wrapped function) with code 1 after an
        interruption has been handled
    """

    # Register the SIGTERM handler to raise an exception, allowing graceful exit.
    signal.signal(signal.SIGTERM, _term_handler)

    @wraps(fn)
    def decorated_fn(*args, **kwargs):
        try:
            # Pass the result through so decorating a function does not
            # silently turn its return value into None.
            return fn(*args, **kwargs)
        except (CaughtTermException, KeyboardInterrupt):
            app.logger.warning(u"Harvester caught SIGTERM. Exiting.")
            report = Report.write_report()

            if app.config.get("HARVESTER_EMAIL_ON_EVENT", False):
                to = app.config.get("HARVESTER_EMAIL_RECIPIENTS", None)
                fro = app.config.get("SYSTEM_EMAIL_FROM")

                # Without recipients there is nobody to notify
                if to is not None:
                    # Imported here rather than at module level, as in the original
                    from portality import app_email as mail
                    mail.send_mail(
                        to=to,
                        fro=fro,
                        subject="DOAJ Harvester caught SIGTERM at {0}".format(
                            datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")),
                        msg_body=report
                    )

            app.logger.info(report)

            # Raise SystemExit directly: the builtin exit() is injected by the
            # site module for interactive use and is not guaranteed to exist
            # (e.g. when the interpreter is started with -S).
            raise SystemExit(1)

    return decorated_fn