Software Backend/Python & Django & flask

7.Flask 회원가입, 로그인 암호화 및 JWT + Redis

light_meal 2022. 6. 8. 00:07
728x90

 

flask로 회원가입, 로그인 암호화 및 JWT를 통한 사이트의 안전성 확보를 하겠습니다.

https://github.com/kschoi93/flask-example

 

패키지 설치

pip install flask-jwt-extended
pip install flask-bcrypt
pip install redis

# Ubuntu에서 redis server 설치
sudo apt-get update
sudo apt-get install redis

 

toy.models.example_models.py

테스트 용도의 user_id와 password를 가지고 있는 User 모델 추가합니다.

class User(db.Model):
    __tablename__ = 'user'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    user_id = db.Column(db.String(20), unique=True, nullable=False)
    password = db.Column(db.String(72), nullable=False)

 

Database에 적용 시킵니다

flask db migrate
flask db upgrade

 

환경 설정을 합니다

config.py

jwt 관련 설정 추가합니다.

from datetime import timedelta
from dotenv import load_dotenv
import os

# .env 파일 auto load
load_dotenv()

class Config(object):
    TESTING = False
    DEBUG = False
    SECRET_KEY = os.getenv('SECRET_KEY')
    SQLALCHEMY_TRACK_MODIFICATIONS = False

    # JWT 비밀 키
    # 다음 코드를 통해 비밀 키 생성 - python -c 'import os; print(os.urandom(16))'
    JWT_SECRET_KEY = b"N'\5308]\s7\xceJSB\x9e"
    # 알고리즘 종류
    JWT_DECODE_ALGORITHMS = ['HS256']
    # JWT Token을 점검 할 때 확인할 위치
    JWT_TOKEN_LOCATION = ['cookies']
    # JWT Access token의 만료기간
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=10)
    # JWT refresh token의 만료 기간
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=14)
    # production이 아닐 경우 secure를 해제
    JWT_COOKIE_SECURE = False
    # csrf 보호 활성화
    JWT_COOKIE_CSRF_PROTECT = True
    # CSRF에 대해 검사하는 메소드 지정
    JWT_CSRF_METHODS = ['POST', 'PUT', 'PATCH', 'DELETE']
    # form에 대한 csrf 체크
    JWT_CSRF_CHECK_FORM = True
    # 이중 제출 토큰이 쿠키에 추가 저장되는지 여부를 제어
    JWT_CSRF_IN_COOKIES = True

class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = f"mysql://{os.getenv('DB_USER')}:" \
                              f"{os.getenv('DB_PWD')}@" \
                              f"{os.getenv('DB_HOST')}:" \
                              f"{os.getenv('DB_PORT')}/" \
                              f"{os.getenv('DB_NAME')}?charset=utf8"

class ProductionConfig(Config):
    # https 적용
    JWT_COOKIE_SECURE = True
    SQLALCHEMY_DATABASE_URI = f"mysql://{os.getenv('DB_PRODUCT_USER')}:" \
                              f"{os.getenv('DB_PRODUCT_PWD')}@" \
                              f"{os.getenv('DB_PRODUCT_HOST')}:" \
                              f"{os.getenv('DB_PRODUCT_PORT')}/" \
                              f"{os.getenv('DB_PRODUCT_NAME')}?charset=utf8"

class TestingConfig(Config):
    TESTING = True

 

.env

맨 밑에 redis_host 설정합니다.

# normal
SECRET_KEY='123456789'

# database develop
DB_USER='admin'
DB_PWD='password'
DB_HOST='127.0.0.1'
DB_PORT='3306'
DB_NAME='toy_project'

# database product
DB_PRODUCT_USER='admin'
DB_PRODUCT_PWD='password'
DB_PRODUCT_HOST='127.0.0.1'
DB_PRODUCT_PORT='3306'
DB_PRODUCT_NAME='toy_project'

# Redis Host
REDIS_HOST = '0.0.0.0'

 

toy.init.py

bcrypt와 jwt 선언 및 redis 연결합니다.

from flask import Flask
from flask_restx import Api
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
from flask_jwt_extended import JWTManager

import redis
from dotenv import load_dotenv
import os

# .env 파일 auto load
load_dotenv()

api = Api(
    version='1.0',
    title='toy_project',
    prefix='/api',
    contact='name',
    contact_email='email',
    description="example",
    validate=True
)
db = SQLAlchemy()
migrate = Migrate()
bcrypt = Bcrypt()
jwt = JWTManager()

jwt_redis = redis.StrictRedis(host=os.getenv('REDIS_HOST'), port=6379, db=0, decode_responses=True)

def create_app():
    app = Flask(__name__)

    # rest api
    api.init_app(app)

    # bcrypt
    bcrypt.init_app(app)

    # jwt
    jwt.init_app(app)

    # config
    config = app.config.get('ENV')
    if config == 'production':
        app.config.from_object('config.ProductionConfig')
    elif config == 'testing':
        app.config.from_object('config.TestingConfig')
    else:
        app.config.from_object('config.DevelopmentConfig')

    # database
    from toy.models import example_models
    db.init_app(app)
    migrate.init_app(app, db)

    # routes list
    from .routes import routes_list
    routes_list(api)

    # general error handler
    from .common.errors import error_handle
    error_handle(app)

    return app

 

toy.controllers.example_controllers.py

route를 받는 controller 작성합니다.

from flask import json
from flask import request
from flask_restx import Namespace
from flask_restx import Resource
from flask_restx import fields

from flask_jwt_extended import jwt_required

import toy.services.example_services as example_services

...

example_sign_up_model = example.model('sign-up', {
    'user_id': fields.String(required=True),
    'password': fields.String(required=True),
})

@example.route('/sign-up')
class ExampleSingUp(Resource):
    @example.expect(example_sign_up_model)
    def post(self):
        result = example_services.example_sign_up(request.data)
        return result, 200

example_login_model = example.model('login', {
    'user_id': fields.String(required=True),
    'password': fields.String(required=True),
})

@example.route('/login')
class ExampleLogin(Resource):
    @example.expect(example_login_model)
    def post(self) -> json:
        result = example_services.example_login(request.data)
        return result

@example.route('/logout')
class ExampleLogout(Resource):
    @jwt_required()
    def delete(self):
        return example_services.example_logout()

@example.route('/refresh')
class ExampleRefresh(Resource):
    @jwt_required(refresh=True)
    def get(self):
        return example_services.example_refresh()

 

toy.services.example_services.py

controller를 통해 각 서비스에 연결합니다.

from flask import json
from flask import jsonify
from flask import request

from datetime import datetime
from datetime import timedelta
from datetime import timezone

from toy import bcrypt
from toy import jwt_redis

from flask_jwt_extended import get_jwt
from flask_jwt_extended import get_jwt_identity
from flask_jwt_extended import get_jwt_header
from flask_jwt_extended import create_access_token
from flask_jwt_extended import create_refresh_token
from flask_jwt_extended import set_access_cookies
from flask_jwt_extended import set_refresh_cookies
from flask_jwt_extended import unset_jwt_cookies

from toy.daos.example_dao import ExampleDAO
from toy.daos.example_dao import ExampleSignDAO
from toy.daos.example_dao import ExampleRefreshDAO

...

# 회원가입
def example_sign_up(data: bytes) -> json:
    user = json.loads(data)
    user_id = user['user_id']
    # password를 bcrypt로 암호화 하고 저장한다.
    pw_hash = bcrypt.generate_password_hash(user['password'])
    dao = ExampleSignDAO()
    dao.create(user_id=user_id, password=pw_hash)

    return {"result": "success"}

# 로그인
def example_login(data: bytes) -> json:
    user = json.loads(data)
    user_id = user['user_id']
    password = user['password']

    # 로그인
    dao = ExampleSignDAO()
    dao.get(user_id=user_id, password=password)

    response = jsonify({"msg": "login successful"})

    # token 생성
    access_token = create_access_token(identity=user_id)
    refresh_token = create_refresh_token(identity=user_id)

    # cookie 설정
    set_access_cookies(response=response, encoded_access_token=access_token)
    set_refresh_cookies(response=response, encoded_refresh_token=refresh_token)

    # redis에 설정
    jwt_redis.set(refresh_token, user_id, ex=timedelta(days=14))
    return response

# 로그아웃
def example_logout() -> json:
    response = jsonify({"msg": "logout successful"})

    # redis에 저장되어 있는 refresh token 삭제
    jwt_redis.delete(request.cookies.get('refresh_token_cookie'))

    # jwt로 생성된 cookie 전체 삭제
    unset_jwt_cookies(response)
    return response

# jwt tokent refresh
def example_refresh():
    token = request.cookies.get('refresh_token_cookie')
    # refresh token이 redis에 존재 여부 확인
    is_tk = jwt_redis.get(token)
    # refresh token에 있는 user_id가 유저가 맞는지 확인
    is_user_id = ExampleRefreshDAO()
    if is_tk is None or is_user_id.get(get_jwt_identity()) is None:
        return {'msg': 'refresh failed'}, 400

    # access token 재발급
    user_id = get_jwt_identity()
    response = jsonify({"msg": "refresh successful"})
    access_token = create_access_token(identity=user_id)
    set_access_cookies(response=response, encoded_access_token=access_token)

    # refresh token의 expire 시간이 10시간 이하일 경우 refresh token 재발급
    exp_timestamp = get_jwt()['exp']
    now = datetime.now(timezone.utc)
    target_timestamp = datetime.timestamp(now + timedelta(hours=10))
    if target_timestamp > exp_timestamp:
        # 기존 redis에 존재하는 token 삭제
        jwt_redis.delete(token)
        refresh_token = create_refresh_token(identity=user_id)
        set_refresh_cookies(response=response, encoded_refresh_token=refresh_token)
        # redis에 토큰 저장
        jwt_redis.set(refresh_token, user_id, ex=timedelta(days=14))

    return response

 

toy.daos.example_dao.py

from toy import db
from toy import bcrypt

from toy.common.exceptions import CustomException

from toy.models.example_models import ExampleUser
from toy.models.example_models import User

....

class ExampleSignDAO(object):
    def __init__(self):
        pass

    def get(self, user_id: str, password: str) -> str:
        user = User.query.filter_by(user_id=user_id).one_or_none()
        if not user or not bcrypt.check_password_hash(pw_hash=user.password, password=password):
            raise CustomException()

    def create(self, user_id: str, password: str):
        if User.query.filter_by(user_id=user_id).one_or_none():
            raise CustomException()
        user = User(user_id=user_id, password=password)
        db.session.add(user)
        db.session.commit()

class ExampleRefreshDAO(object):
    def __init__(self):
        pass

    def get(self, user_id: str) -> str:
        return User.query.filter_by(user_id=user_id).one_or_none()

 


 

유저 생성 테스트

포스트맨으로 생성 요청

 

생성 결과 확인

 

동일한 조건으로 생성 테스트

포스트맨으로 생성 요청

 

생성된 동일한 아이디가 존재하기 때문에 에러 발생함을 확인

첫 번째는 성공 200, 두 번째는 실패 601

 

로그인 테스트

access token과 refresh token을 cookie로 받은 것을 확인 할 수 있습니다.

 

Redis 확인

Redis 에 데이터 추가 확인

로그인 함으로써 refresh 토큰이 redis에 저장 된 것을 확인합니다.

redis가 설치된 위치에서 redis-cli를 사용해 확인하면 다음과 같이 잘 들어가 있는 것을 확인 할 수 있습니다.

 

access 토큰 재발급 테스트

테스트를 위해 config에서 설정한 JWT_ACCESS_TOKEN_EXPIRES의 시간을 10초로 설정합니다

 

  • 로그인

 

  • 10초 후 토큰 expire 체크

 

위와 같이 토큰이 expired 되었을 경우 refresh token을 통한 access token 재발급 진행

refresh 토큰은 redis에 저장되어 있으며 refresh 토큰을 통해 access token 재발급 진행 시 redis의 토큰과 현재 요청한 사람의 cookie refresh token을 비교하여 실제로 존재하는지 여부를 파악하고 재발급 합니다

추가적으로 refresh token이 expire 10시간이 남은 상태일 때 refresh를 했을 경우 refresh token도 재 발급 해주는 로직을 추가합니다.

# refresh token의 expire 시간이 10시간 이하일 경우 refresh token 재발급
    exp_timestamp = get_jwt()['exp']
    now = datetime.now(timezone.utc)
    target_timestamp = datetime.timestamp(now + timedelta(hours=10))
    if target_timestamp > exp_timestamp:
        # 기존 redis에 존재하는 token 삭제
        jwt_redis.delete(token)
        refresh_token = create_refresh_token(identity=user_id)
        set_refresh_cookies(response=response, encoded_refresh_token=refresh_token)
        # redis에 토큰 저장
        jwt_redis.set(refresh_token, user_id, ex=timedelta(days=14))

 

 

728x90