Coverage for portality / view / account.py: 44%

252 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1import uuid, json 

2 

3from flask import Blueprint, request, url_for, flash, redirect, make_response 

4from flask import render_template, abort 

5from flask_login import login_user, logout_user, current_user, login_required 

6from wtforms import StringField, HiddenField, PasswordField, DecimalField, validators, Form 

7 

8from portality import util 

9from portality import constants 

10from portality.core import app 

11from portality.decorators import ssl_required, write_required 

12from portality.models import Account, Event 

13from portality.forms.validate import DataOptional, EmailAvailable, ReservedUsernames, IdAvailable, IgnoreUnchanged 

14from portality.bll import DOAJ 

15from portality.ui.messages import Messages 

16 

17from portality.ui import templates 

18 

# Flask blueprint under which every account-related route in this module is registered.
blueprint = Blueprint("account", __name__)

20 

@blueprint.url_value_preprocessor
def pull_lang(endpoint, values):
    """Drop the ``lang`` URL value before the view function is called.

    The value is popped from ``values`` and discarded, so it is not passed
    on to the view function as a keyword argument.
    """
    if not values:
        return
    values.pop('lang', None)

26 

@blueprint.route('/')
@login_required
@ssl_required
def index():
    """Render the user list page.

    Aborts with 401 unless the current user holds the ``list_users`` role.
    """
    may_list_users = current_user.has_role("list_users")
    if not may_list_users:
        abort(401)
    return render_template(templates.USER_LIST)

34 

35 

class UserEditForm(Form):
    """Form backing the account edit page (name, email, roles and password change)."""

    # Let's not allow anyone to change IDs - there lies madness and destruction (referential integrity)
    # id = StringField('ID', [IgnoreUnchanged(), ReservedUsernames(), IdAvailable()])

    name = StringField(
        'Account name',
        validators=[DataOptional(), validators.Length(min=3, max=64)],
    )

    # The email must be well-formed, not already taken, and match the confirmation field.
    email = StringField(
        'Email address',
        validators=[
            IgnoreUnchanged(),
            validators.Length(min=3, max=254),
            validators.Email(message='Must be a valid email address'),
            EmailAvailable(),
            validators.EqualTo('email_confirm', message='Email confirmation must match'),
        ],
    )
    email_confirm = StringField('Confirm email address')

    roles = StringField('User roles')

    # A new password is only checked against its confirmation field.
    password_change = PasswordField(
        'Change password',
        validators=[validators.EqualTo('password_confirm', message='Passwords must match')],
    )
    password_confirm = PasswordField('Confirm password')

55 

56 

@blueprint.route('/<username>', methods=['GET', 'POST', 'DELETE'])
@login_required
@ssl_required
@write_required()
def username(username):
    """View, update or delete the account identified by ``username``.

    * ``DELETE`` (or a ``POST`` whose ``submit`` value is ``Delete``) removes the account once
      ``delete_confirm`` has been supplied, then redirects to the user list.
    * ``POST`` validates ``UserEditForm`` and applies the submitted changes. The changes are read
      from a JSON object body when one was sent, otherwise from the form/query values. When a
      user changes their own email address their password is cleared, a reset token is issued,
      a password-reset event is triggered and they are logged out.
    * ``GET`` returns the account data as JSON when the client asks for JSON, otherwise the
      edit page.

    Aborts with 404 when the account does not exist, and with 401 when a non-super user tries
    to delete or modify an account other than their own.

    :param username: id of the account to operate on
    """
    acc = Account.pull(username)

    # Pick the edit template according to the privileges of the *current* user
    template = templates.PUBLIC_EDIT_USER
    if current_user.is_super:
        template = templates.ADMIN_EDIT_USER
    elif current_user.has_role(constants.ROLE_ASSOCIATE_EDITOR) or current_user.has_role(constants.ROLE_EDITOR):
        template = templates.EDITOR_EDIT_USER

    if acc is None:
        abort(404)
    if (request.method == 'DELETE' or
            (request.method == 'POST' and request.values.get('submit', False) == 'Delete')):
        if current_user.id != acc.id and not current_user.is_super:
            abort(401)
        else:
            conf = request.values.get("delete_confirm")
            if conf is None or conf != "delete_confirm":
                flash('Check the box to confirm you really mean it!', "error")
                return render_template(template, account=acc, form=UserEditForm(obj=acc))
            acc.delete()
            flash('Account ' + acc.id + ' deleted')
            return redirect(url_for('.index'))

    elif request.method == 'POST':
        if current_user.id != acc.id and not current_user.is_super:
            abort(401)

        form = UserEditForm(obj=acc, formdata=request.form)

        if not form.validate():
            return render_template(template, account=acc, form=form)

        # Prefer a JSON object body when one was sent, otherwise use the form/query values.
        # get_json(silent=True) yields None (rather than raising) for missing or malformed JSON,
        # so no blanket exception handler is needed and newdata can never end up as None.
        json_payload = request.get_json(silent=True)
        newdata = json_payload if isinstance(json_payload, dict) else request.values

        if request.values.get('submit', False) == 'Generate a new API Key':
            acc.generate_api_key()

        if 'name' in newdata:
            acc.set_name(newdata['name'])
        if 'password_change' in newdata and len(newdata['password_change']) > 0 and not newdata['password_change'].startswith('sha1'):
            acc.set_password(newdata['password_change'])

        # only super users can re-write roles
        if "roles" in newdata and current_user.is_super:
            new_roles = [r.strip() for r in newdata.get("roles").split(",")]
            acc.set_role(new_roles)

        if "marketing_consent" in newdata:
            acc.set_marketing_consent(newdata["marketing_consent"] == "true")

        if 'email' in newdata and len(newdata['email']) > 0 and newdata['email'] != acc.email:
            acc.set_email(newdata['email'])

            # If the user updated their own email address, invalidate the password and require verification again.
            if current_user.id == acc.id:
                acc.clear_password()
                reset_token = uuid.uuid4().hex
                acc.set_reset_token(reset_token, app.config.get("PASSWORD_RESET_TIMEOUT", 86400))
                acc.save()

                events_svc = DOAJ.eventsService()
                events_svc.trigger(Event(constants.EVENT_ACCOUNT_PASSWORD_RESET, acc.id, context={"account" : acc.data}))
                flash("Email address updated. You have been logged out for email address verification.")

                logout_user()

                if app.config.get('DEBUG', False):
                    reset_url = url_for('account.reset', reset_token=acc.reset_token)
                    util.flash_with_url('Debug mode - url for reset is <a href={0}>{0}</a>'.format(reset_url))

                return redirect(url_for('doaj.home'))

        acc.save()
        flash("Record updated")
        return render_template(template, account=acc, form=form)

    else:  # GET
        # NOTE(review): unlike DELETE/POST, this branch performs no ownership or super-user check,
        # so any logged-in user can read any account's data - confirm this is intended.
        if util.request_wants_json():
            resp = make_response(
                json.dumps(acc.data, sort_keys=True, indent=4))
            resp.mimetype = "application/json"
            return resp
        else:
            form = UserEditForm(obj=acc)
            return render_template(template, account=acc, form=form)

152 

153 

def get_redirect_target(form=None, acc=None):
    """Work out where the user should be sent next.

    Candidates are tried in order: the data of the form's ``next`` field (if the form has one),
    then the ``next`` query argument. A candidate is returned only when ``util.is_safe_url``
    hands it back unchanged.

    If no candidate qualifies and an account is supplied, the destination of the first entry in
    the ``ROLE_LOGIN_DESTINATIONS`` setting whose role the account holds is used, falling back
    to the ``DEFAULT_LOGIN_DESTINATION`` setting.

    :param form: optional form object; its ``next`` field is consulted when present
    :param acc: optional account used to choose a role-based destination
    :return: the target URL, or an empty string when there is no safe target and no account
    """
    form_target = ''
    if form and getattr(form, 'next', None):
        form_target = form.next.data

    for target in (form_target, request.args.get('next', '')):
        if not target:
            continue
        if target == util.is_safe_url(target):
            return target

    if acc is None:
        return ""

    # A missing setting must not break login: treat it as "no role-specific destinations"
    destinations = app.config.get("ROLE_LOGIN_DESTINATIONS") or []
    for role, dest in destinations:
        if acc.has_role(role):
            return url_for(dest)

    return url_for(app.config.get("DEFAULT_LOGIN_DESTINATION"))

174 

175 

class RedirectForm(Form):
    """Base form carrying a hidden ``next`` field that records where to go after submission."""

    next = HiddenField()

    def __init__(self, *args, **kwargs):
        """Build the form, filling ``next`` from ``get_redirect_target()`` when it has no data."""
        super().__init__(*args, **kwargs)
        if self.next.data:
            return
        self.next.data = get_redirect_target() or ''

    def redirect(self, endpoint='index', **values):
        """Redirect to ``next`` if ``util.is_safe_url`` returns it unchanged.

        Otherwise redirect to ``get_redirect_target()``, or to ``url_for(endpoint, **values)``
        when that yields nothing.
        """
        requested = self.next.data
        if util.is_safe_url(requested) == requested:
            return redirect(requested)
        fallback = get_redirect_target()
        return redirect(fallback or url_for(endpoint, **values))

189 

190 

class LoginForm(RedirectForm):
    """Login form: an identifier (email address or username) plus a password, both required."""

    user = StringField(
        'Email address or username',
        validators=[validators.DataRequired()],
    )
    password = PasswordField(
        'Password',
        validators=[validators.DataRequired()],
    )

194 

195 

@blueprint.route('/login', methods=['GET', 'POST'])
@ssl_required
def login():
    """Show the login page and process login attempts.

    On a valid POST the account is looked up (by id first when ``LOGIN_VIA_ACCOUNT_ID`` is set,
    otherwise by email only) and the password checked. A successful login redirects to the
    target chosen by ``get_redirect_target``; failures attach an error to the relevant field
    and re-render the page. A ``KeyError`` during the attempt is treated as an account with no
    password set, and the user is sent home with instructions to request a reset link.
    """
    form = LoginForm(request.form, csrf_enabled=False, next=request.args.get('next', ''))

    if request.method == 'POST' and form.validate():
        supplied_password = form.password.data
        identifier = form.user.data

        # If our settings allow, try getting the user account by ID first, then by email address
        if app.config.get('LOGIN_VIA_ACCOUNT_ID', False):
            user = Account.pull(identifier) or Account.pull_by_email(identifier)
        else:
            user = Account.pull_by_email(identifier)

        # If we have a verified user account, proceed to attempt login
        try:
            if user is None:
                form.user.errors.append('Account not recognised. If you entered an email address, try your username instead.')
            elif user.check_password(supplied_password):
                login_user(user, remember=True)
                flash('Welcome back.', 'success')
                return redirect(get_redirect_target(form=form, acc=user))
            else:
                form.password.errors.append('The password you entered is incorrect. Try again or <a href="{0}">reset your password</a>.'.format(url_for(".forgot")))
        except KeyError:
            # Account has no password set, the user needs to reset or use an existing valid reset link
            forgot_instr = '<a href="{url}">&lt;click here&gt;</a> to send a new reset link.'.format(url=url_for('.forgot'))
            util.flash_with_url('Account verification is incomplete. Check your emails for the link or ' + forgot_instr,
                                'error')
            return redirect(url_for('doaj.home'))

    # Users bounced here from the application form get a dedicated page and are sent back afterwards
    if request.args.get("redirected") == "apply":
        form['next'].data = url_for("apply.public_application")
        return render_template(templates.LOGIN_TO_APPLY, form=form)

    return render_template(templates.GLOBAL_LOGIN, form=form)

233 

@blueprint.route('/forgot', methods=['GET', 'POST'])
@ssl_required
@write_required()
def forgot():
    """Show the forgotten-password page and, on POST, issue a password reset.

    The account is looked up from the ``un`` form value (by id first when
    ``LOGIN_VIA_ACCOUNT_ID`` is set, otherwise by email only). If it exists and has an email
    address, a new reset token is stored and a password-reset event is triggered. Every path
    ends by rendering the forgotten-password template.
    """
    contact_instr = ' Please <a href="{url}">contact us.</a>'.format(url=url_for('doaj.contact'))

    if request.method != 'POST':
        return render_template(templates.FORGOT_PASSWORD)

    # get hold of the user account
    identifier = request.form.get('un', "")
    if app.config.get('LOGIN_VIA_ACCOUNT_ID', False):
        account = Account.pull(identifier) or Account.pull_by_email(identifier)
    else:
        account = Account.pull_by_email(identifier)

    if account is None:
        util.flash_with_url('Error - your account username / email address is not recognised.' + contact_instr,
                            'error')
        return render_template(templates.FORGOT_PASSWORD)

    if not account.data.get('email'):
        util.flash_with_url('Error - your account does not have an associated email address.' + contact_instr,
                            'error')
        return render_template(templates.FORGOT_PASSWORD)

    # if we get to here, we have a user account to reset
    token = uuid.uuid4().hex
    timeout = app.config.get("PASSWORD_RESET_TIMEOUT", 86400)
    account.set_reset_token(token, timeout)
    account.save()

    reset_event = Event(constants.EVENT_ACCOUNT_PASSWORD_RESET, account.id, context={"account": account.data})
    DOAJ.eventsService().trigger(reset_event)
    flash('Instructions to reset your password have been sent to you. Please check your emails.')

    if app.config.get('DEBUG', False):
        reset_url = url_for('account.reset', reset_token=account.reset_token)
        util.flash_with_url('Debug mode - url for reset is <a href={0}>{0}</a>'.format(reset_url))

    return render_template(templates.FORGOT_PASSWORD)

271 

272 

class ResetForm(Form):
    """Password reset form: a required password that must equal its confirmation."""

    password = PasswordField(
        'Password',
        validators=[
            validators.DataRequired(),
            validators.EqualTo('confirm', message='Passwords must match'),
        ],
    )
    confirm = PasswordField('Repeat Password')

279 

280 

@blueprint.route("/reset/<reset_token>", methods=["GET", "POST"])
@ssl_required
@write_required()
def reset(reset_token):
    """Let the holder of a reset token choose a new password.

    Aborts with 404 when no account matches the token. On a valid POST the new password is
    stored, the token is removed, the user is logged in and redirected home; otherwise the
    reset page is rendered.

    :param reset_token: the token taken from the reset URL
    """
    form = ResetForm(request.form, csrf_enabled=False)
    account = Account.get_by_reset_token(reset_token)
    if account is None:
        abort(404)

    # Anything other than a valid POST just (re)displays the form
    if request.method != "POST" or not form.validate():
        return render_template(templates.RESET_PASSWORD, account=account, form=form)

    # check that the passwords match, and bounce if not
    new_password = request.values.get("password")
    confirmation = request.values.get("confirm")
    if new_password != confirmation:
        flash("Passwords do not match - please try again", "error")
        return render_template(templates.RESET_PASSWORD, account=account, form=form)

    # update the user's account
    account.set_password(new_password)
    account.remove_reset_token()
    account.save()
    flash("New password has been set and you're now logged in.", "success")

    # log the user in
    login_user(account, remember=True)
    return redirect(url_for('doaj.home'))

309 

310 

@blueprint.route('/logout', methods=['POST'])
@ssl_required
def logout():
    """Log the current user out, flash a confirmation and redirect to the site root."""
    logout_user()
    flash('You are now logged out', 'success')
    site_root = '/'
    return redirect(site_root)

317 

318 

class RegisterForm(RedirectForm):
    """Account registration form, including honeypot fields used to detect bots."""

    identifier = StringField('ID', validators=[ReservedUsernames(), IdAvailable()])
    name = StringField(
        'Name',
        validators=[validators.Optional(), validators.Length(min=3, max=64)],
    )
    sender_email = StringField(
        'Email address',
        validators=[
            validators.DataRequired(),
            validators.Length(min=3, max=254),
            validators.Email(message='Must be a valid email address'),
            EmailAvailable(message="That email address is already in use. Please <a href='/account/forgot'>reset your password</a>. If you still cannot login, <a href='/contact'>contact us</a>."),
        ],
    )
    roles = StringField('Roles')

    # These are honeypot (bot-trap) fields
    email = StringField('email')
    hptimer = DecimalField('hptimer', validators=[validators.Optional()])

    def is_bot(self):
        """
        Checks honeypot fields and determines whether the form was submitted by a bot.

        A bot is suspected when the trap ``email`` field is not the empty string, when the
        timer value is missing, or when the timer is below ``HONEYPOT_TIMER_THRESHOLD``.

        :return: True, if bot suspected; False, if human
        """
        if self.email.data != "":
            return True

        elapsed = self.hptimer.data
        if elapsed is None:
            return True

        return elapsed < app.config.get("HONEYPOT_TIMER_THRESHOLD", 5000)

339 

@blueprint.route('/register', methods=['GET', 'POST'])
@ssl_required
@write_required()
def register(template=templates.REGISTER):
    """Show the registration form and create an account from a valid submission.

    Access: authenticated users need the ``create_user`` role; anonymous users are only
    admitted when the ``PUBLIC_REGISTER`` setting is enabled. Anything else aborts with 401.

    Anonymous submissions are screened with the form's honeypot check. On success the account
    is saved and an account-created event is triggered; authenticated creators are redirected
    to the user list, public registrants to the home page.

    :param template: template used to render the form
    """
    # ~~-> Honeypot:Feature ~~
    # 3rd-party registration only for those with create_user role, only allow public registration when configured
    if (current_user.is_authenticated and not current_user.has_role("create_user")) \
            or (current_user.is_anonymous and app.config.get('PUBLIC_REGISTER', False) is False):
        abort(401)     # todo: we may need a template to explain this since it's linked from the application form

    default_roles = 'api,publisher'
    form = RegisterForm(request.form, csrf_enabled=False, roles=default_roles, identifier=Account.new_short_uuid())

    if request.method == 'POST':

        if not current_user.is_authenticated and form.is_bot():
            flash(Messages.ARE_YOU_A_HUMAN, "error")
            return render_template(template, form=form)

        if form.validate():
            # Submitted form data takes precedence over the defaults given to the form, so the
            # "roles" value is attacker-controlled for public sign-ups. Only authenticated callers
            # (who passed the create_user check above) may choose roles; everyone else gets the defaults.
            roles_csv = form.roles.data if current_user.is_authenticated else default_roles
            account = Account.make_account(email=form.sender_email.data, username=form.identifier.data, name=form.name.data,
                                           roles=[r.strip() for r in roles_csv.split(',')])
            account.save()

            event_svc = DOAJ.eventsService()
            event_svc.trigger(Event(constants.EVENT_ACCOUNT_CREATED, account.id, context={"account" : account.data}))
            # send_account_created_email(account)

            if app.config.get('DEBUG', False):
                util.flash_with_url('Debug mode - url for verify is <a href={0}>{0}</a>'.format(url_for('account.reset', reset_token=account.reset_token)))

            if current_user.is_authenticated:
                util.flash_with_url('Account created for {0}. View Account: <a href={1}>{1}</a>'.format(account.email, url_for('.username', username=account.id)))
                return redirect(url_for('.index'))
            else:
                flash('Thank you, please verify email address ' + form.sender_email.data + ' to set your password and login.',
                      'success')

            # We must redirect home because the user now needs to verify their email address.
            return redirect(url_for('doaj.home'))
        else:
            flash('Please correct the errors', 'error')

    return render_template(template, form=form)

383 

@blueprint.route('/create/', methods=['GET', 'POST'])
@write_required()
def create():
    """Run the registration flow, rendered with the ``CREATE_USER`` template."""
    create_template = templates.CREATE_USER
    return register(template=create_template)