Skip to content

認証(bcrypt)

データベースに存在するユーザーだけが CRUD 処理できるようにする。
※コードはデータベースからの続き


(準備)データベースにパスワード列の追加

パスワードハッシュ化アルゴリズム

ここではパスワードのハッシュ化に bcrypt を使うことにする。
アルゴリズムを選択する時は OWASP の Password Storage Cheat Sheet が参考になる。

前のページまで使ってた app.db は削除しておいて、 models.py を編集する。

# models.py
from sqlmodel import SQLModel, Field


class User(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    email: str
    hashed_password: str

パスワードのハッシュ化と検証をするコードを作成。まずは bcrypt をインストールしてから。

uv add bcrypt

# security.py
import bcrypt


def hash_password(password: str) -> str:
    pwd_bytes = password.encode("utf-8")
    salt = bcrypt.gensalt()
    return bcrypt.hashpw(pwd_bytes, salt).decode("utf-8")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return bcrypt.checkpw(
        plain_password.encode("utf-8"),
        hashed_password.encode("utf-8"),
    )

データベースを新しく作るための db_init.py の編集。

# db_init.py
from sqlmodel import Session

from db import engine, create_db_and_tables
from models import User
from security import hash_password

create_db_and_tables()

with Session(engine) as session:
    user = User(
        name="alice",
        email="alice@example.com",
        hashed_password=hash_password("secret_password"),
    )
    session.add(user)
    session.commit()
    session.refresh(user)

db_init.py を作成したら実行しておく。

uv run python db_init.py

Info

sqlite3 コマンドでデータベースを確認すると、 hashed_password が追加されてる。

% sqlite3 app.db 
SQLite version 3.51.0 2025-06-12 13:14:41
Enter ".help" for usage hints.
sqlite> .header on
sqlite> SELECT * FROM user;
id|name|email|hashed_password
1|alice|alice@example.com|$2b$12$dqIPrPgcnQn4eASah7eHB.5.xc0eZy8pTxPis8H0IUFE3PDrqqhoy


JWTを発行するエンドポイントの作成

JWTの署名アルゴリズム

ここでは JWT の署名アルゴリズムには HS256 を使うことにする。
PyJWT を使う時は、 https://pyjwt.readthedocs.io/en/stable/algorithms.html で使えるアルゴリズムを確認できる。
アルゴリズムの強さを確認する時は、 https://nvlpubs.nist.gov/nistpubs/SpecialPublications/NIST.SP.800-57pt1r5.pdf が参考になる。
RS256 / EdDSA との比較は JWT(RS256/EdDSA) を参照。

security.py にJWTを生成するコードを追加。まずは pyjwt をインストールしてから。

uv add pyjwt

# security.py
from datetime import datetime, timedelta, timezone

import bcrypt
import jwt

# 本番では openssl rand -hex 32 とかで生成して環境変数から読み込む
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


def hash_password(password: str) -> str:
    pwd_bytes = password.encode("utf-8")
    salt = bcrypt.gensalt()
    return bcrypt.hashpw(pwd_bytes, salt).decode("utf-8")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return bcrypt.checkpw(
        plain_password.encode("utf-8"),
        hashed_password.encode("utf-8"),
    )


def create_access_token(data: dict) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

JWT を生成するエンドポイントを main.py に追加する。

# main.py
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import select

from db import SessionDep
from models import User
from security import (
    verify_password,
    create_access_token,
)
from users import router as users_router

app = FastAPI()

app.include_router(users_router)


@app.get("/")
def read_root():
    return {"Hello": "World"}


@app.post("/token")
def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    session: SessionDep,
):
    user = session.exec(select(User).where(User.email == form_data.username)).first()
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=401,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = create_access_token(data={"sub": user.email})
    return {"access_token": access_token, "token_type": "bearer"}

curl コマンドでJWTが発行されるか確認できるし、 Swagger UI でも確認できる。

% curl -X 'POST' \
  'http://localhost:8000/token' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'grant_type=password&username=alice%40example.com&password=secret_password'                             
{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhbGljZUBleGFtcGxlLmNvbSIsImV4cCI6MTc4MDI5OTQxMn0.B1WmYhBwz7klf8boMM89IX5QBJaep7ymFnHnSKQmDmo","token_type":"bearer"}


CRUD 処理をするエンドポイントを JWT 認証必須にする

まずは送られてきた JWT を検証するために、 security.pydecode_access_token を追加しておく。

# security.py
from datetime import datetime, timedelta, timezone

import bcrypt
import jwt

# 本番では openssl rand -hex 32 とかで生成して環境変数から読み込む
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


def hash_password(password: str) -> str:
    pwd_bytes = password.encode("utf-8")
    salt = bcrypt.gensalt()
    return bcrypt.hashpw(pwd_bytes, salt).decode("utf-8")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return bcrypt.checkpw(
        plain_password.encode("utf-8"),
        hashed_password.encode("utf-8"),
    )


def create_access_token(data: dict) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def decode_access_token(token: str) -> dict:
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

Note

jwt.decode は署名や有効期限が無効だとエラーになるので、 JWT を検証したらエラーになる = 認証失敗、 JWT を検証してエラーにならない = 認証処理の続き(データベースにユーザーが存在するかとか)をやる、っていうふうにできる。

次に users.py で、 有効な JWT じゃなかったら 401 を返す get_current_user 関数を追加する。

# users.py
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from sqlmodel import select

from db import SessionDep
from models import User
from security import decode_access_token

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    session: SessionDep,
) -> User:
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = decode_access_token(token)
        email = payload.get("sub")
        if email is None:
            raise credentials_exception
    except InvalidTokenError:
        raise credentials_exception

    user = session.exec(select(User).where(User.email == email)).first()
    if user is None:
        raise credentials_exception
    return user


router = APIRouter(
    prefix="/users",
    tags=["users"],
    dependencies=[Depends(get_current_user)],
)


@router.post("/")
def create_user(user: User, session: SessionDep):
    session.add(user)
    session.commit()
    session.refresh(user)
    return user


@router.get("/{user_id}")
def read_user(user_id: int, session: SessionDep):
    user = session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user


@router.patch("/{user_id}")
def update_user(user_id: int, user: User, session: SessionDep):
    db_user = session.get(User, user_id)
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")
    user_data = user.model_dump(exclude_unset=True)
    for key, value in user_data.items():
        if key == "id":
            continue
        setattr(db_user, key, value)
    session.add(db_user)
    session.commit()
    session.refresh(db_user)
    return db_user


@router.delete("/{user_id}")
def delete_user(user_id: int, session: SessionDep):
    user = session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    session.delete(user)
    session.commit()
    return {"message": "User deleted successfully"}

curl コマンドで JWT 認証が効いてるか確認できるし、 Swagger UI でも確認できる。

% curl -X 'GET' \
  'http://localhost:8000/users/1' \
  -H 'accept: application/json'
{"detail":"Not authenticated"}

% curl -X 'GET' \
  'http://localhost:8000/users/1' \
  -H 'accept: application/json' \
  -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhbGljZUBleGFtcGxlLmNvbSIsImV4cCI6MTc4MDMwMDYwMH0.sbkZ1_sVoljTmT6VicTXx7NDD-2rypTJR8WdW-pARfg'
{"name":"alice","email":"alice@example.com","id":1,"hashed_password":"$2b$12$dqIPrPgcnQn4eASah7eHB.5.xc0eZy8pTxPis8H0IUFE3PDrqqhoy"

認証機能自体はできたけど・・・

User クラスしか作っていないせいで、セキュリティ的に良くない感じになってる。
例えば、レスポンスに hashed_password が含まれてるので、ハッシュ化されてるとはいえパスワードをいちいち通信に乗せている・・・
あと、 create_user 関数では引数で User 型のデータを求めてるので、ユーザーにハッシュ化した状態のパスワードを送らせるっていう・・・
なので、本番ではモデル定義のところからちゃんとやらないと。 Full Stack FastAPI Template のモデル定義 はかなり参考になる。