Skip to content

Latest commit

 

History

History
325 lines (232 loc) · 8.25 KB

Day20.md

File metadata and controls

325 lines (232 loc) · 8.25 KB

[Day20] OAuth2 實例:實作總結

本次的程式碼與目錄結構可以參考 FastAPI Tutorial : Day20 branch

回顧

我們在

  • Day17 完成 hash password 的實作
  • Day18 完成 OAuth2 login 和
    Bearer token Schema 的實作
    與 JWT token 的實作
  • Day19 完成 Authorize Dependency 和 權限管理 的實作

今天我們會完成整體 OAuth2 Authentication 的實作
把各個部分串起來
並修好細節 !

Hash Password 修正

單一 Password 更新 Router

PUT /user/{user_id}PUT /user/{user_id}/password 的時候
這兩個 API 都會更新密碼
應該改為 PUT /user/{user_id}/password 這個 API 才會更新密碼比較合適

schemas/user.py

# ...   

class UserUpdate(UserBase):
    #password: Optional[str] = Field(min_length=6) # <------ 移除 
    avatar: Optional[str] = None
    age: Optional[int] = Field(gt=0,lt=100)
    birthday: Optional[date] = Field()

# ...

api/user.py

@router.put("/users/{user_id}" , response_model=UserSchema.UserUpdateResponse )
    # ...

    # newUser.password = get_password_hash(newUser.password) # <------ 移除

    await UserCrud.update_user(newUser,user_id)
    return newUser

crud/user.py

# ...
    async def update_user(self,newUser: UserSchema.UserUpdate,user_id:int, db_session:AsyncSession=None):
        stmt = update(UserModel).where(UserModel.id == user_id).values(
            # password=newUser.password, # <------ 新增
            name=newUser.name,
            age=newUser.age,
            birthday=newUser.birthday,
            avatar=newUser.avatar
        )

Hash Password 注入點

create_userupdate_user_password 這兩個 API 中
將明碼密碼改成 hash 過的密碼都是寫在 api/user.py
應該把這部分的邏輯寫在 crud/user.py


api/users.py

# ... 
async def create_user(newUser: UserSchema.UserCreate ):

    # ...

    # newUser.password = get_password_hash(newUser.password) # <------ 移除
    user = await UserCrud.create_user(newUser)
    return vars(user)


async def update_user_password(newUser:UserSchema.UserUpdatePassword,user_id:int=Depends(check_user_id)):

    # newUser.password = get_password_hash(newUser.password) # <------ 移除
    await UserCrud.update_user_password(newUser,user_id)
    return 

crud/user.py

    async def create_user(self,newUser: UserSchema.UserCreate, db_session:AsyncSession=None ):
        user = UserModel(
            name=newUser.name,
            password=get_password_hash(newUser.password), # <------ 新增
            # ...
        )
        # ...
        return user

    # ...

    async def update_user_password(self,newUser: UserSchema.UserUpdatePassword,user_id:int, db_session:AsyncSession=None):
        stmt = update(UserModel).where(UserModel.id == user_id).values(
            password=get_password_hash(newUser.password), # <------ 新增
        )
        # ...

驗證權限

我們在 Day19 只有為 update_user 這個 API 加上了權限驗證


接下來加上 CurrentUser Schema 確保傳遞正確
並為所有需要驗證權限的 API 加上 get_current_user dependency

  • PUT /user/{user_id}
  • PUT /user/{user_id}/password
  • DELETE /user/{user_id}

schemas/user.py

class CurrentUser(BaseModel):
    id: int
    name: str
    email: str

api/user.py

# ...
async def update_user_password(
    newUser:UserSchema.UserUpdatePassword,
    user_id:int=Depends(check_user_id),
    current_user:UserSchema.CurrentUser=Depends(get_current_user) # <------ 新增
):
    # ...
    if current_user.id != user_id: # <------ 新增
        raise Exception403
    # ...

# ...
async def delete_users(
    user_id:int = Depends(check_user_id),
    user:UserSchema.CurrentUser = Depends(get_current_user) # <------ 新增
):

    # ...
    if user.id != user_id:
        raise Exception403
    # ...

Item 相關功能

Item Schema

我們為 GET /items/{item_id} 加上 ItemInfor Schema
再新增 UpdateItem Schema


schemas/item.py

# ...

class ItemInfor(ItemRead):
    brand: str
    description: Optional[str] = None

class ItemUpdate(ItemBase):
    name: Optional[str] = None
    price: Optional[float] = None
    brand: Optional[str] = None
    description: Optional[str] = None

# ...

Item CRUD

我們從 CRUD 開始的範例都以 User 為主
所以 Item 的 CRUD 只需要照著 User 的 CRUD 做一些修改就可以了


先 import 相關的 Schema 、 Model
和 SQLAlchemy 的操作、AsyncSession
crud_class_decorator
crud/item.py

@crud_class_decorator
class ItemCrudManager:

    async def get_item(self, item_id: int):
        stmt = select(ItemModel).where(ItemModel.id == item_id)
        result = await self.db_session.execute(stmt)
        return result.scalars().first()

    async def get_items(self, skip: int = 0, limit: int = 100):
        stmt = select(ItemModel).offset(skip).limit(limit)
        result = await self.db_session.execute(stmt)
        return result.scalars().all()

    async def create_item(self, newItem: ItemSchema.ItemCreate):
        item = ItemModel(
            name=newItem.name,
            price=newItem.price,
            brand=newItem.brand,
            description=newItem.description
        )
        self.db_session.add(item)
        await self.db_session.commit()
        await self.db_session.refresh(item)
        return item

    async def update_item(self, newItem: ItemSchema.ItemUpdate, item_id: int):
        stmt = update(ItemModel).where(ItemModel.id == item_id).values(
            name=newItem.name,
            price=newItem.price,
            brand=newItem.brand,
            description=newItem.description
        )
        await self.db_session.execute(stmt)
        await self.db_session.commit()
        return

    async def delete_item(self, item_id: int):
        stmt = delete(ItemModel).where(ItemModel.id == item_id)
        await self.db_session.execute(stmt)
        await self.db_session.commit()
        return

Item 權限管理

接下來要為 Item 加上權限管理
避免 Item 被不屬於自己的 User 修改、刪除


所以一樣可以透過我們的 get_current_user 來取得當前的 User
check_item_id 來確認 Item 是否存在
來檢查 item.user_id 是否等於 current_user.id


api/item.py

@router.put("/items/{item_id}" , response_model=ItemUpdate)
async def update_items(
    updateItem: ItemUpdate, 
    item:CurrentItem = Depends(check_item_id),
    user:CurrentUser = Depends(get_current_user)):
    
    if item.user_id != user.id:
        raise HTTPException(status_code=403, detail="Forbidden")
    
    item = await ItemCrud.update_item_by_id(item.id,updateItem)
    return item


@router.delete("/items/{item_id}")
async def delete_items(
    item:CurrentItem = Depends(check_item_id),
    user:CurrentUser = Depends(get_current_user)):

    if item.user_id != user.id:
        raise HTTPException(status_code=403, detail="Forbidden")
        
    await ItemCrud.delete_item_by_id(item.id)

透過 Depends 來取得 itemuser
只需要額外加上一行判斷就可以完成權限管理!


當我們以 user1 的身份來修改 test 的 Item 時

403

就會回傳 403 Permissison Denied


總結

延續著 User 與 Item 的 CRUD
我們完成了整個 OAuth2 Authentication 的實作
並且加上了權限管理

swagger

快看到需要驗證權限的 API 都有 lock 的圖示了!