-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
213 lines (170 loc) · 6.81 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import os
import base64
from io import BytesIO
from flask import Flask, render_template, redirect, url_for, flash, session,abort, request
from werkzeug.security import generate_password_hash, check_password_hash
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user,current_user
from flask_bootstrap import Bootstrap
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField
from wtforms.validators import Required, Length, EqualTo
from flask_mail import Mail, Message
from itsdangerous import URLSafeTimedSerializer, SignatureExpired
import onetimepass
import pyqrcode
# create application instance
app = Flask(__name__)
app.config.from_object('config')
app.config.from_pyfile('config.cfg')
# initialize extensions
bootstrap = Bootstrap(app)
db = SQLAlchemy(app)
lm = LoginManager(app)
mail = Mail(app)
s = URLSafeTimedSerializer('Thisisasecret!')
class User(UserMixin, db.Model):
"""User model."""
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), index=True)
email = db.Column(db.String(120), index=True, unique=True)
password_hash = db.Column(db.String(128))
otp_secret = db.Column(db.String(16))
emailv = db.Column(db.Boolean, default=False, nullable=False)
def __init__(self, **kwargs):
super(User, self).__init__(**kwargs)
if self.otp_secret is None:
# generate a random secret
self.otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8')
@property
def password(self):
raise AttributeError('password is not a readable attribute')
@password.setter
def password(self, password):
self.password_hash = generate_password_hash(password)
def verify_password(self, password):
return check_password_hash(self.password_hash, password)
def get_totp_uri(self):
return 'otpauth://totp/2FA-Demo:{0}?secret={1}&issuer=2FA-Demo' \
.format(self.username, self.otp_secret)
def verify_totp(self, token):
return onetimepass.valid_totp(token, self.otp_secret)
@lm.user_loader
def load_user(user_id):
"""User loader callback for Flask-Login."""
return User.query.get(int(user_id))
class RegisterForm(FlaskForm):
"""Registration form."""
username = StringField('Username', validators=[Required(), Length(1, 64)])
email = StringField('Email', validators=[Required(), Length(1, 120)])
password = PasswordField('Password', validators=[Required()])
password_again = PasswordField('Password again',validators=[Required(), EqualTo('password')])
submit = SubmitField('Register')
class LoginForm(FlaskForm):
"""Login form."""
username = StringField('Username', validators=[Required(), Length(1, 64)])
password = PasswordField('Password', validators=[Required()])
token = StringField('Token', validators=[Required(), Length(6, 6)])
submit = SubmitField('Login')
@app.route('/')
def index():
return render_template('index.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
"""User registration route."""
if current_user.is_authenticated:
# if user is logged in we get out of here
return redirect(url_for('index'))
form = RegisterForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user is not None:
flash('Username already exists.')
return redirect(url_for('register'))
# add new user to the database
user = User(username=form.username.data, password=form.password.data, email=form.email.data)
db.session.add(user)
db.session.commit()
# redirect to the two-factor auth page, passing username in session
session['username'] = user.username
return redirect(url_for('two_factor_setup'))
return render_template('register.html', form=form)
@app.route('/twofactor')
def two_factor_setup():
if 'username' not in session:
return redirect(url_for('index'))
user = User.query.filter_by(username=session['username']).first()
if user is None:
return redirect(url_for('index'))
# since this page contains the sensitive qrcode, make sure the browser
# does not cache it
return render_template('two-factor-setup.html'), 200, {
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Pragma': 'no-cache',
'Expires': '0'}
@app.route('/qrcode')
def qrcode():
if 'username' not in session:
abort(404)
user = User.query.filter_by(username=session['username']).first()
if user is None:
abort(404)
# for added security, remove username from session
del session['username']
# render qrcode for FreeTOTP
url = pyqrcode.create(user.get_totp_uri())
stream = BytesIO()
url.svg(stream, scale=3)
return stream.getvalue(), 200, {
'Content-Type': 'image/svg+xml',
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Pragma': 'no-cache',
'Expires': '0'}
@app.route('/login', methods=['GET', 'POST'])
def login():
"""User login route."""
if current_user.is_authenticated:
# if user is logged in we get out of here
return redirect(url_for('index'))
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user is None or not user.verify_password(form.password.data) or \
not user.verify_totp(form.token.data):
flash('Invalid username, password or token.')
return redirect(url_for('login'))
# log user in
login_user(user)
flash('You are now logged in!')
return redirect(url_for('index'))
return render_template('login.html', form=form)
@app.route('/emailv/<email>', methods=['GET', 'POST'])
def emailv(email):
token = s.dumps(email, salt='email-confirm')
msg = Message('Confirm Email', sender='test@test.com', recipients=[email])
link = url_for('confirm_email', token=token, _external=True)
msg.body = 'Your link is {}'.format(link)
mail.send(msg)
flash('Verification link sent to your email!')
return redirect(url_for('index'))
@app.route('/confirm_email/<token>')
def confirm_email(token):
un=current_user.username
user = User.query.filter_by(username=un).first()
try:
email = s.loads(token, salt='email-confirm', max_age=3600)
except SignatureExpired:
return '<h1>The token is expired!</h1>'
user.emailv=True
db.session.commit()
return redirect(url_for('index'))
@app.route('/logout')
def logout():
"""User logout route."""
logout_user()
return redirect(url_for('index'))
# create database tables if they don't exist yet
db.create_all()
if __name__ == '__main__':
app.run(debug=True)