728x90
flask로 회원가입, 로그인 암호화 및 JWT를 통한 사이트의 안전성 확보를 하겠습니다.
https://github.com/kschoi93/flask-example
패키지 설치
pip install flask-jwt-extended
pip install flask-bcrypt
pip install redis
# Ubuntu에서 redis server 설치
sudo apt-get update
sudo apt-get install redis
toy.models.example_models.py
테스트 용도의 user_id와 password를 가지고 있는 User 모델 추가합니다.
class User(db.Model):
__tablename__ = 'user'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
user_id = db.Column(db.String(20), unique=True, nullable=False)
password = db.Column(db.String(72), nullable=False)
Database에 적용 시킵니다
flask db migrate
flask db upgrade
환경 설정을 합니다
config.py
jwt 관련 설정 추가합니다.
from datetime import timedelta
from dotenv import load_dotenv
import os
# .env 파일 auto load
load_dotenv()
class Config(object):
TESTING = False
DEBUG = False
SECRET_KEY = os.getenv('SECRET_KEY')
SQLALCHEMY_TRACK_MODIFICATIONS = False
# JWT 비밀 키
# 다음 코드를 통해 비밀 키 생성 - python -c 'import os; print(os.urandom(16))'
JWT_SECRET_KEY = b"N'\5308]\s7\xceJSB\x9e"
# 알고리즘 종류
JWT_DECODE_ALGORITHMS = ['HS256']
# JWT Token을 점검 할 때 확인할 위치
JWT_TOKEN_LOCATION = ['cookies']
# JWT Access token의 만료기간
JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=10)
# JWT refresh token의 만료 기간
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=14)
# production이 아닐 경우 secure를 해제
JWT_COOKIE_SECURE = False
# csrf 보호 활성화
JWT_COOKIE_CSRF_PROTECT = True
# CSRF에 대해 검사하는 메소드 지정
JWT_CSRF_METHODS = ['POST', 'PUT', 'PATCH', 'DELETE']
# form에 대한 csrf 체크
JWT_CSRF_CHECK_FORM = True
# 이중 제출 토큰이 쿠키에 추가 저장되는지 여부를 제어
JWT_CSRF_IN_COOKIES = True
class DevelopmentConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = f"mysql://{os.getenv('DB_USER')}:" \
f"{os.getenv('DB_PWD')}@" \
f"{os.getenv('DB_HOST')}:" \
f"{os.getenv('DB_PORT')}/" \
f"{os.getenv('DB_NAME')}?charset=utf8"
class ProductionConfig(Config):
# https 적용
JWT_COOKIE_SECURE = True
SQLALCHEMY_DATABASE_URI = f"mysql://{os.getenv('DB_PRODUCT_USER')}:" \
f"{os.getenv('DB_PRODUCT_PWD')}@" \
f"{os.getenv('DB_PRODUCT_HOST')}:" \
f"{os.getenv('DB_PRODUCT_PORT')}/" \
f"{os.getenv('DB_PRODUCT_NAME')}?charset=utf8"
class TestingConfig(Config):
TESTING = True
.env
맨 밑에 redis_host 설정합니다.
# normal
SECRET_KEY='123456789'
# database develop
DB_USER='admin'
DB_PWD='password'
DB_HOST='127.0.0.1'
DB_PORT='3306'
DB_NAME='toy_project'
# database product
DB_PRODUCT_USER='admin'
DB_PRODUCT_PWD='password'
DB_PRODUCT_HOST='127.0.0.1'
DB_PRODUCT_PORT='3306'
DB_PRODUCT_NAME='toy_project'
# Redis Host
REDIS_HOST = '0.0.0.0'
toy.init.py
bcrypt와 jwt 선언 및 redis 연결합니다.
from flask import Flask
from flask_restx import Api
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
from flask_jwt_extended import JWTManager
import redis
from dotenv import load_dotenv
import os
# .env 파일 auto load
load_dotenv()
api = Api(
version='1.0',
title='toy_project',
prefix='/api',
contact='name',
contact_email='email',
description="example",
validate=True
)
db = SQLAlchemy()
migrate = Migrate()
bcrypt = Bcrypt()
jwt = JWTManager()
jwt_redis = redis.StrictRedis(host=os.getenv('REDIS_HOST'), port=6379, db=0, decode_responses=True)
def create_app():
app = Flask(__name__)
# rest api
api.init_app(app)
# bcrypt
bcrypt.init_app(app)
# jwt
jwt.init_app(app)
# config
config = app.config.get('ENV')
if config == 'production':
app.config.from_object('config.ProductionConfig')
elif config == 'testing':
app.config.from_object('config.TestingConfig')
else:
app.config.from_object('config.DevelopmentConfig')
# database
from toy.models import example_models
db.init_app(app)
migrate.init_app(app, db)
# routes list
from .routes import routes_list
routes_list(api)
# general error handler
from .common.errors import error_handle
error_handle(app)
return app
toy.controllers.example_controllers.py
route를 받는 controller 작성합니다.
from flask import json
from flask import request
from flask_restx import Namespace
from flask_restx import Resource
from flask_restx import fields
from flask_jwt_extended import jwt_required
import toy.services.example_services as example_services
...
example_sign_up_model = example.model('sign-up', {
'user_id': fields.String(required=True),
'password': fields.String(required=True),
})
@example.route('/sign-up')
class ExampleSingUp(Resource):
@example.expect(example_sign_up_model)
def post(self):
result = example_services.example_sign_up(request.data)
return result, 200
example_login_model = example.model('login', {
'user_id': fields.String(required=True),
'password': fields.String(required=True),
})
@example.route('/login')
class ExampleLogin(Resource):
@example.expect(example_login_model)
def post(self) -> json:
result = example_services.example_login(request.data)
return result
@example.route('/logout')
class ExampleLogout(Resource):
@jwt_required()
def delete(self):
return example_services.example_logout()
@example.route('/refresh')
class ExampleRefresh(Resource):
@jwt_required(refresh=True)
def get(self):
return example_services.example_refresh()
toy.services.example_services.py
controller를 통해 각 서비스에 연결합니다.
from flask import json
from flask import jsonify
from flask import request
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from toy import bcrypt
from toy import jwt_redis
from flask_jwt_extended import get_jwt
from flask_jwt_extended import get_jwt_identity
from flask_jwt_extended import get_jwt_header
from flask_jwt_extended import create_access_token
from flask_jwt_extended import create_refresh_token
from flask_jwt_extended import set_access_cookies
from flask_jwt_extended import set_refresh_cookies
from flask_jwt_extended import unset_jwt_cookies
from toy.daos.example_dao import ExampleDAO
from toy.daos.example_dao import ExampleSignDAO
from toy.daos.example_dao import ExampleRefreshDAO
...
# 회원가입
def example_sign_up(data: bytes) -> json:
user = json.loads(data)
user_id = user['user_id']
# password를 bcrypt로 암호화 하고 저장한다.
pw_hash = bcrypt.generate_password_hash(user['password'])
dao = ExampleSignDAO()
dao.create(user_id=user_id, password=pw_hash)
return {"result": "success"}
# 로그인
def example_login(data: bytes) -> json:
user = json.loads(data)
user_id = user['user_id']
password = user['password']
# 로그인
dao = ExampleSignDAO()
dao.get(user_id=user_id, password=password)
response = jsonify({"msg": "login successful"})
# token 생성
access_token = create_access_token(identity=user_id)
refresh_token = create_refresh_token(identity=user_id)
# cookie 설정
set_access_cookies(response=response, encoded_access_token=access_token)
set_refresh_cookies(response=response, encoded_refresh_token=refresh_token)
# redis에 설정
jwt_redis.set(refresh_token, user_id, ex=timedelta(days=14))
return response
# 로그아웃
def example_logout() -> json:
response = jsonify({"msg": "logout successful"})
# redis에 저장되어 있는 refresh token 삭제
jwt_redis.delete(request.cookies.get('refresh_token_cookie'))
# jwt로 생성된 cookie 전체 삭제
unset_jwt_cookies(response)
return response
# jwt tokent refresh
def example_refresh():
token = request.cookies.get('refresh_token_cookie')
# refresh token이 redis에 존재 여부 확인
is_tk = jwt_redis.get(token)
# refresh token에 있는 user_id가 유저가 맞는지 확인
is_user_id = ExampleRefreshDAO()
if is_tk is None or is_user_id.get(get_jwt_identity()) is None:
return {'msg': 'refresh failed'}, 400
# access token 재발급
user_id = get_jwt_identity()
response = jsonify({"msg": "refresh successful"})
access_token = create_access_token(identity=user_id)
set_access_cookies(response=response, encoded_access_token=access_token)
# refresh token의 expire 시간이 10시간 이하일 경우 refresh token 재발급
exp_timestamp = get_jwt()['exp']
now = datetime.now(timezone.utc)
target_timestamp = datetime.timestamp(now + timedelta(hours=10))
if target_timestamp > exp_timestamp:
# 기존 redis에 존재하는 token 삭제
jwt_redis.delete(token)
refresh_token = create_refresh_token(identity=user_id)
set_refresh_cookies(response=response, encoded_refresh_token=refresh_token)
# redis에 토큰 저장
jwt_redis.set(refresh_token, user_id, ex=timedelta(days=14))
return response
toy.daos.example_dao.py
from toy import db
from toy import bcrypt
from toy.common.exceptions import CustomException
from toy.models.example_models import ExampleUser
from toy.models.example_models import User
....
class ExampleSignDAO(object):
def __init__(self):
pass
def get(self, user_id: str, password: str) -> str:
user = User.query.filter_by(user_id=user_id).one_or_none()
if not user or not bcrypt.check_password_hash(pw_hash=user.password, password=password):
raise CustomException()
def create(self, user_id: str, password: str):
if User.query.filter_by(user_id=user_id).one_or_none():
raise CustomException()
user = User(user_id=user_id, password=password)
db.session.add(user)
db.session.commit()
class ExampleRefreshDAO(object):
def __init__(self):
pass
def get(self, user_id: str) -> str:
return User.query.filter_by(user_id=user_id).one_or_none()
유저 생성 테스트
포스트맨으로 생성 요청
생성 결과 확인
동일한 조건으로 생성 테스트
포스트맨으로 생성 요청
생성된 동일한 아이디가 존재하기 때문에 에러 발생함을 확인
첫 번째는 성공 200, 두 번째는 실패 601
로그인 테스트
access token과 refresh token을 cookie로 받은 것을 확인 할 수 있습니다.
Redis 확인
Redis 에 데이터 추가 확인
로그인 함으로써 refresh 토큰이 redis에 저장 된 것을 확인합니다.
redis가 설치된 위치에서 redis-cli를 사용해 확인하면 다음과 같이 잘 들어가 있는 것을 확인 할 수 있습니다.
access 토큰 재발급 테스트
테스트를 위해 config에서 설정한 JWT_ACCESS_TOKEN_EXPIRES의 시간을 10초로 설정합니다
- 로그인
- 10초 후 토큰 expire 체크
위와 같이 토큰이 expired 되었을 경우 refresh token을 통한 access token 재발급 진행
refresh 토큰은 redis에 저장되어 있으며 refresh 토큰을 통해 access token 재발급 진행 시 redis의 토큰과 현재 요청한 사람의 cookie refresh token을 비교하여 실제로 존재하는지 여부를 파악하고 재발급 합니다
추가적으로 refresh token이 expire 10시간이 남은 상태일 때 refresh를 했을 경우 refresh token도 재 발급 해주는 로직을 추가합니다.
# refresh token의 expire 시간이 10시간 이하일 경우 refresh token 재발급
exp_timestamp = get_jwt()['exp']
now = datetime.now(timezone.utc)
target_timestamp = datetime.timestamp(now + timedelta(hours=10))
if target_timestamp > exp_timestamp:
# 기존 redis에 존재하는 token 삭제
jwt_redis.delete(token)
refresh_token = create_refresh_token(identity=user_id)
set_refresh_cookies(response=response, encoded_refresh_token=refresh_token)
# redis에 토큰 저장
jwt_redis.set(refresh_token, user_id, ex=timedelta(days=14))
728x90
'Programming > Backend' 카테고리의 다른 글
6.Flask error, exception general handling (0) | 2022.05.29 |
---|---|
5.Flask sqlalchemy - get, create (0) | 2022.05.29 |
4.Flask restx 적용 + Swagger (0) | 2022.05.29 |
3.Flask Database 연결 + Config 설정 (0) | 2022.04.30 |
2.Flask MVC 패턴 환경 구축과 Blueprint (0) | 2022.04.22 |