From 9c4b497ed91d65c0b2bbe35a5b2c08b86f4c62f6 Mon Sep 17 00:00:00 2001 From: chris Date: Mon, 10 Mar 2025 23:13:49 +0000 Subject: [PATCH] Upload files to "user_managment" Add user_management module to initial commit --- user_managment/__init__.py | Bin 0 -> 1024 bytes user_managment/models.py | 85 +++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 user_managment/__init__.py create mode 100644 user_managment/models.py diff --git a/user_managment/__init__.py b/user_managment/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..06d7405020018ddf3cacee90fd4af10487da3d20 GIT binary patch literal 1024 ScmZQz7zLvtFd70QH3R?z00031 literal 0 HcmV?d00001 diff --git a/user_managment/models.py b/user_managment/models.py new file mode 100644 index 0000000..3589ad3 --- /dev/null +++ b/user_managment/models.py @@ -0,0 +1,85 @@ +import jwt +from uuid import uuid4 +from flask_mongoengine import Document, DoesNotExist +from mongoengine import StringField, BooleanField, DateField, DateTimeField, EmailField +from flask_login import UserMixin +from time import time +from werkzeug.security import generate_password_hash, check_password_hash + + +_JWT_ALGO = 'HSA256' + + +class JWTTokenExpiredException(Exception): + pass + + +class UserAlreadyVerified(Exception): + pass + + +class CurrentEmailNotVerified(Exception): + pass + + +class User(UserMixin, Document): + user_name = StringField(required=True, unique=True) + pass_hash = StringField(required=True) + email = EmailField(required=True, unique=True) + first_name = StringField(required=True) + last_name = StringField(required=True) + join_date = DateField(required=False) + last_login = DateTimeField(required=False) + email_verified = BooleanField(default=False) + + def set_password(self, password): + self.pass_hash = generate_password_hash(password) + + def check_password(self, password): + return check_password_hash(self.pass_hash, password) + + def update_email(self, new_email): + if not self.email_verified: + raise CurrentEmailNotVerified() + + self.email_verified = False + self.email = new_email + + def get_activation_token(self, expire_secs=1200, secret_key=None): + payload = { + 'exp': time() + expire_secs, + 'activate': self.user_name + } + + return jwt.encode(payload, secret_key, algorithm=_JWT_ALGO) + + @staticmethod + def from_activation_token(token, secret_key=None): + try: + jwt_data = jwt.decode(token, secret_key, algorithms=_JWT_ALGO) + user_name = jwt_data['activate'] + except jwt.ExpiredSignatureError: + raise JWTTokenExpiredException('The JWT token has expired!') + user = User.objects(user_name=user_name).first() + if not user: + raise DoesNotExist('The given user does not exist') + elif user.email_verified: + raise UserAlreadyVerified('The given user has already verified their email') + user.email_verified = True + + def get_password_reset_token(self, expire_secs=600, secret_key=None): + payload = { + 'exp': time() + expire_secs, + 'reset_password': self.user_name, + 'jti': str(uuid4()) + } + return jwt.encode(payload, secret_key, algorithm=_JWT_ALGO) + + @staticmethod + def from_password_reset_token(token, secret_key=None): + try: + jwt_data = jwt.decode(token, secret_key, algorithms=_JWT_ALGO) + user_name = jwt_data['reset_password'] + except jwt.ExpiredSignatureError: + raise JWTTokenExpiredException('The JWT token has expired') + return User.objects(user_name=user_name).first()