86 lines
2.8 KiB
Python
86 lines
2.8 KiB
Python
import jwt
|
|
from uuid import uuid4
|
|
from flask_mongoengine import Document, DoesNotExist
|
|
from mongoengine import StringField, BooleanField, DateField, DateTimeField, EmailField
|
|
from flask_login import UserMixin
|
|
from time import time
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
|
|
_JWT_ALGO = 'HSA256'
|
|
|
|
|
|
class JWTTokenExpiredException(Exception):
    """Raised when a JWT handed to one of the token helpers has expired."""
|
|
|
|
|
|
class UserAlreadyVerified(Exception):
    """Raised when activation is attempted for an already-verified user."""
|
|
|
|
|
|
class CurrentEmailNotVerified(Exception):
    """Raised when an email change is requested before the current one is verified."""
|
|
|
|
|
|
class User(UserMixin, Document):
    """User account document with password and JWT token helpers.

    None of the mutating helpers on this class call ``save()``; persisting
    the document is left to the caller.
    """

    # Login name; also embedded in the JWT payloads to identify the user.
    user_name = StringField(required=True, unique=True)
    # Hash produced by ``generate_password_hash``; see ``set_password``.
    pass_hash = StringField(required=True)
    email = EmailField(required=True, unique=True)
    first_name = StringField(required=True)
    last_name = StringField(required=True)
    join_date = DateField(required=False)
    last_login = DateTimeField(required=False)
    # Reset to False by ``update_email`` and set to True by
    # ``from_activation_token``.
    email_verified = BooleanField(default=False)

    def set_password(self, password):
        """Hash ``password`` and store the result in ``pass_hash``.

        The document is not saved.
        """
        self.pass_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return whether ``password`` matches the stored ``pass_hash``."""
        return check_password_hash(self.pass_hash, password)

    def update_email(self, new_email):
        """Replace the user's email and mark it as unverified.

        Raises:
            CurrentEmailNotVerified: if the current email has not been
                verified yet.

        The document is not saved.
        """
        if not self.email_verified:
            raise CurrentEmailNotVerified()

        self.email_verified = False
        self.email = new_email

    def get_activation_token(self, expire_secs=1200, secret_key=None):
        """Return a signed JWT that activates this user's email.

        Args:
            expire_secs: token lifetime in seconds from now (default 1200).
            secret_key: key passed to ``jwt.encode`` for signing.

        Returns:
            The encoded token as returned by ``jwt.encode``.
        """
        payload = {
            'exp': time() + expire_secs,
            'activate': self.user_name,
        }
        return jwt.encode(payload, secret_key, algorithm=_JWT_ALGO)

    @staticmethod
    def from_activation_token(token, secret_key=None):
        """Resolve an activation token to its user and mark the email verified.

        Args:
            token: JWT produced by ``get_activation_token``.
            secret_key: key passed to ``jwt.decode`` for verification.

        Returns:
            The matching ``User`` with ``email_verified`` set to True. The
            document is not saved here; the caller must persist it.

        Raises:
            JWTTokenExpiredException: if the token has expired.
            KeyError: if the decoded payload has no ``'activate'`` claim.
            DoesNotExist: if no user matches the name in the token.
            UserAlreadyVerified: if the user's email is already verified.

        Other errors raised by ``jwt.decode`` are not caught here.
        """
        try:
            # ``algorithms`` is given as a list so the allowed-algorithm check
            # is list membership rather than a substring test on a string.
            jwt_data = jwt.decode(token, secret_key, algorithms=[_JWT_ALGO])
        except jwt.ExpiredSignatureError as exc:
            raise JWTTokenExpiredException('The JWT token has expired!') from exc

        user_name = jwt_data['activate']
        user = User.objects(user_name=user_name).first()
        if not user:
            raise DoesNotExist('The given user does not exist')
        if user.email_verified:
            raise UserAlreadyVerified('The given user has already verified their email')

        user.email_verified = True
        # Return the modified document; without this the change above would be
        # discarded together with the local object.
        return user

    def get_password_reset_token(self, expire_secs=600, secret_key=None):
        """Return a signed JWT that authorises a password reset for this user.

        Args:
            expire_secs: token lifetime in seconds from now (default 600).
            secret_key: key passed to ``jwt.encode`` for signing.

        Returns:
            The encoded token as returned by ``jwt.encode``.
        """
        payload = {
            'exp': time() + expire_secs,
            'reset_password': self.user_name,
            # Random identifier so every issued token is distinct.
            'jti': str(uuid4()),
        }
        return jwt.encode(payload, secret_key, algorithm=_JWT_ALGO)

    @staticmethod
    def from_password_reset_token(token, secret_key=None):
        """Resolve a password-reset token to its user.

        Args:
            token: JWT produced by ``get_password_reset_token``.
            secret_key: key passed to ``jwt.decode`` for verification.

        Returns:
            The first ``User`` whose ``user_name`` matches the token, or
            whatever ``.first()`` yields when there is no match.

        Raises:
            JWTTokenExpiredException: if the token has expired.
            KeyError: if the decoded payload has no ``'reset_password'`` claim.

        Other errors raised by ``jwt.decode`` are not caught here.
        """
        try:
            jwt_data = jwt.decode(token, secret_key, algorithms=[_JWT_ALGO])
        except jwt.ExpiredSignatureError as exc:
            raise JWTTokenExpiredException('The JWT token has expired') from exc

        user_name = jwt_data['reset_password']
        return User.objects(user_name=user_name).first()
|