認証(bcrypt)¶
データベースに存在するユーザーだけが CRUD 処理できるようにする。
※コードはデータベースからの続き
(準備)データベースにパスワード列の追加¶
パスワードハッシュ化アルゴリズム
ここではパスワードのハッシュ化に bcrypt を使うことにする。
アルゴリズムを選択する時は OWASP の Password Storage Cheat Sheet が参考になる。
前のページまで使ってた app.db は削除しておいて、 models.py を編集する。
# models.py
from sqlmodel import SQLModel, Field
class User(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
email: str
hashed_password: str
パスワードのハッシュ化と検証をするコードを作成。まずは bcrypt をインストールしてから。
# security.py
import bcrypt
def hash_password(password: str) -> str:
pwd_bytes = password.encode("utf-8")
salt = bcrypt.gensalt()
return bcrypt.hashpw(pwd_bytes, salt).decode("utf-8")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return bcrypt.checkpw(
plain_password.encode("utf-8"),
hashed_password.encode("utf-8"),
)
データベースを新しく作るための db_init.py の編集。
# db_init.py
from sqlmodel import Session
from db import engine, create_db_and_tables
from models import User
from security import hash_password
create_db_and_tables()
with Session(engine) as session:
user = User(
name="alice",
email="alice@example.com",
hashed_password=hash_password("secret_password"),
)
session.add(user)
session.commit()
session.refresh(user)
db_init.py を作成したら実行しておく。
Info
sqlite3 コマンドでデータベースを確認すると、 hashed_password が追加されてる。
JWTを発行するエンドポイントの作成¶
JWTの署名アルゴリズム
ここでは JWT の署名アルゴリズムには HS256 を使うことにする。
PyJWT を使う時は、 https://pyjwt.readthedocs.io/en/stable/algorithms.html で使えるアルゴリズムを確認できる。
アルゴリズムの強さを確認する時は、 https://nvlpubs.nist.gov/nistpubs/SpecialPublications/NIST.SP.800-57pt1r5.pdf が参考になる。
RS256 / EdDSA との比較は JWT(RS256/EdDSA) を参照。
security.py にJWTを生成するコードを追加。まずは pyjwt をインストールしてから。
# security.py
from datetime import datetime, timedelta, timezone
import bcrypt
import jwt
# 本番では openssl rand -hex 32 とかで生成して環境変数から読み込む
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def hash_password(password: str) -> str:
pwd_bytes = password.encode("utf-8")
salt = bcrypt.gensalt()
return bcrypt.hashpw(pwd_bytes, salt).decode("utf-8")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return bcrypt.checkpw(
plain_password.encode("utf-8"),
hashed_password.encode("utf-8"),
)
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
JWT を生成するエンドポイントを main.py に追加する。
# main.py
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import select
from db import SessionDep
from models import User
from security import (
verify_password,
create_access_token,
)
from users import router as users_router
app = FastAPI()
app.include_router(users_router)
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.post("/token")
def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
session: SessionDep,
):
user = session.exec(select(User).where(User.email == form_data.username)).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=401,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": user.email})
return {"access_token": access_token, "token_type": "bearer"}
curl コマンドでJWTが発行されるか確認できるし、 Swagger UI でも確認できる。
% curl -X 'POST' \
'http://localhost:8000/token' \
-H 'accept: application/json' \
-H 'Content-Type: application/x-www-form-urlencoded' \
-d 'grant_type=password&username=alice%40example.com&password=secret_password'
{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhbGljZUBleGFtcGxlLmNvbSIsImV4cCI6MTc4MDI5OTQxMn0.B1WmYhBwz7klf8boMM89IX5QBJaep7ymFnHnSKQmDmo","token_type":"bearer"}
CRUD 処理をするエンドポイントを JWT 認証必須にする¶
まずは送られてきた JWT を検証するために、 security.py に decode_access_token を追加しておく。
# security.py
from datetime import datetime, timedelta, timezone
import bcrypt
import jwt
# 本番では openssl rand -hex 32 とかで生成して環境変数から読み込む
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def hash_password(password: str) -> str:
pwd_bytes = password.encode("utf-8")
salt = bcrypt.gensalt()
return bcrypt.hashpw(pwd_bytes, salt).decode("utf-8")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return bcrypt.checkpw(
plain_password.encode("utf-8"),
hashed_password.encode("utf-8"),
)
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_access_token(token: str) -> dict:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
Note
jwt.decode は署名や有効期限が無効だとエラーになるので、 JWT を検証したらエラーになる = 認証失敗、 JWT を検証してエラーにならない = 認証処理の続き(データベースにユーザーが存在するかとか)をやる、っていうふうにできる。
次に users.py で、 有効な JWT じゃなかったら 401 を返す get_current_user 関数を追加する。
# users.py
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from sqlmodel import select
from db import SessionDep
from models import User
from security import decode_access_token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
session: SessionDep,
) -> User:
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_access_token(token)
email = payload.get("sub")
if email is None:
raise credentials_exception
except InvalidTokenError:
raise credentials_exception
user = session.exec(select(User).where(User.email == email)).first()
if user is None:
raise credentials_exception
return user
router = APIRouter(
prefix="/users",
tags=["users"],
dependencies=[Depends(get_current_user)],
)
@router.post("/")
def create_user(user: User, session: SessionDep):
session.add(user)
session.commit()
session.refresh(user)
return user
@router.get("/{user_id}")
def read_user(user_id: int, session: SessionDep):
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.patch("/{user_id}")
def update_user(user_id: int, user: User, session: SessionDep):
db_user = session.get(User, user_id)
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
user_data = user.model_dump(exclude_unset=True)
for key, value in user_data.items():
if key == "id":
continue
setattr(db_user, key, value)
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
@router.delete("/{user_id}")
def delete_user(user_id: int, session: SessionDep):
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
session.delete(user)
session.commit()
return {"message": "User deleted successfully"}
curl コマンドで JWT 認証が効いてるか確認できるし、 Swagger UI でも確認できる。
% curl -X 'GET' \
'http://localhost:8000/users/1' \
-H 'accept: application/json'
{"detail":"Not authenticated"}
% curl -X 'GET' \
'http://localhost:8000/users/1' \
-H 'accept: application/json' \
-H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhbGljZUBleGFtcGxlLmNvbSIsImV4cCI6MTc4MDMwMDYwMH0.sbkZ1_sVoljTmT6VicTXx7NDD-2rypTJR8WdW-pARfg'
{"name":"alice","email":"alice@example.com","id":1,"hashed_password":"$2b$12$dqIPrPgcnQn4eASah7eHB.5.xc0eZy8pTxPis8H0IUFE3PDrqqhoy"
認証機能自体はできたけど・・・
User クラスしか作っていないせいで、セキュリティ的に良くない感じになってる。
例えば、レスポンスに hashed_password が含まれてるので、ハッシュ化されてるとはいえパスワードをいちいち通信に乗せている・・・
あと、 create_user 関数では引数で User 型のデータを求めてるので、ユーザーにハッシュ化した状態のパスワードを送らせるっていう・・・
なので、本番ではモデル定義のところからちゃんとやらないと。 Full Stack FastAPI Template のモデル定義 はかなり参考になる。