2025-12-14 22:57:54 +03:00

82 lines
2.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
API роутеры для работы с пользователями
"""
from uuid import UUID
from fastapi import APIRouter, status
from fastapi.responses import JSONResponse
from typing import List
from dishka.integrations.fastapi import FromDishka
from src.presentation.schemas.user_schemas import UserCreate, UserUpdate, UserResponse
from src.application.use_cases.user_use_cases import UserUseCases
from src.domain.entities.user import User
router = APIRouter(prefix="/users", tags=["users"])
@router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
user_data: UserCreate,
use_cases: FromDishka[UserUseCases] = FromDishka()
):
"""Создать пользователя"""
user = await use_cases.create_user(
telegram_id=user_data.telegram_id,
role=user_data.role
)
return UserResponse.from_entity(user)
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(
current_user: FromDishka[User] = FromDishka()
):
"""Получить информацию о текущем пользователе"""
return UserResponse.from_entity(current_user)
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: UUID,
use_cases: FromDishka[UserUseCases] = FromDishka()
):
"""Получить пользователя по ID"""
user = await use_cases.get_user(user_id)
return UserResponse.from_entity(user)
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
user_id: UUID,
user_data: UserUpdate,
use_cases: FromDishka[UserUseCases] = FromDishka()
):
"""Обновить пользователя"""
user = await use_cases.update_user(
user_id=user_id,
telegram_id=user_data.telegram_id,
role=user_data.role
)
return UserResponse.from_entity(user)
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: UUID,
use_cases: FromDishka[UserUseCases] = FromDishka()
):
"""Удалить пользователя"""
await use_cases.delete_user(user_id)
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT, content=None)
@router.get("", response_model=List[UserResponse])
async def list_users(
skip: int = 0,
limit: int = 100,
use_cases: FromDishka[UserUseCases] = FromDishka()
):
"""Получить список пользователей"""
users = await use_cases.list_users(skip=skip, limit=limit)
return [UserResponse.from_entity(user) for user in users]