Upload files to "user_managment"

Add user_management module to initial commit
This commit is contained in:
chris 2025-03-10 23:13:49 +00:00
parent 85162101c2
commit 9c4b497ed9
2 changed files with 85 additions and 0 deletions

BIN
user_managment/__init__.py Normal file

Binary file not shown.

85
user_managment/models.py Normal file
View File

@ -0,0 +1,85 @@
import jwt
from uuid import uuid4
from flask_mongoengine import Document, DoesNotExist
from mongoengine import StringField, BooleanField, DateField, DateTimeField, EmailField
from flask_login import UserMixin
from time import time
from werkzeug.security import generate_password_hash, check_password_hash
_JWT_ALGO = 'HSA256'
class JWTTokenExpiredException(Exception):
pass
class UserAlreadyVerified(Exception):
pass
class CurrentEmailNotVerified(Exception):
pass
class User(UserMixin, Document):
user_name = StringField(required=True, unique=True)
pass_hash = StringField(required=True)
email = EmailField(required=True, unique=True)
first_name = StringField(required=True)
last_name = StringField(required=True)
join_date = DateField(required=False)
last_login = DateTimeField(required=False)
email_verified = BooleanField(default=False)
def set_password(self, password):
self.pass_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.pass_hash, password)
def update_email(self, new_email):
if not self.email_verified:
raise CurrentEmailNotVerified()
self.email_verified = False
self.email = new_email
def get_activation_token(self, expire_secs=1200, secret_key=None):
payload = {
'exp': time() + expire_secs,
'activate': self.user_name
}
return jwt.encode(payload, secret_key, algorithm=_JWT_ALGO)
@staticmethod
def from_activation_token(token, secret_key=None):
try:
jwt_data = jwt.decode(token, secret_key, algorithms=_JWT_ALGO)
user_name = jwt_data['activate']
except jwt.ExpiredSignatureError:
raise JWTTokenExpiredException('The JWT token has expired!')
user = User.objects(user_name=user_name).first()
if not user:
raise DoesNotExist('The given user does not exist')
elif user.email_verified:
raise UserAlreadyVerified('The given user has already verified their email')
user.email_verified = True
def get_password_reset_token(self, expire_secs=600, secret_key=None):
payload = {
'exp': time() + expire_secs,
'reset_password': self.user_name,
'jti': str(uuid4())
}
return jwt.encode(payload, secret_key, algorithm=_JWT_ALGO)
@staticmethod
def from_password_reset_token(token, secret_key=None):
try:
jwt_data = jwt.decode(token, secret_key, algorithms=_JWT_ALGO)
user_name = jwt_data['reset_password']
except jwt.ExpiredSignatureError:
raise JWTTokenExpiredException('The JWT token has expired')
return User.objects(user_name=user_name).first()