142 lines
6.1 KiB
Python
142 lines
6.1 KiB
Python
"""
|
||
Use cases для работы с коллекциями
|
||
"""
|
||
from uuid import UUID
|
||
from typing import Optional
|
||
from src.domain.entities.collection import Collection
|
||
from src.domain.entities.collection_access import CollectionAccess
|
||
from src.domain.repositories.collection_repository import ICollectionRepository
|
||
from src.domain.repositories.collection_access_repository import ICollectionAccessRepository
|
||
from src.domain.repositories.user_repository import IUserRepository
|
||
from src.shared.exceptions import NotFoundError, ForbiddenError
|
||
|
||
|
||
class CollectionUseCases:
|
||
"""Use cases для коллекций"""
|
||
|
||
def __init__(
|
||
self,
|
||
collection_repository: ICollectionRepository,
|
||
access_repository: ICollectionAccessRepository,
|
||
user_repository: IUserRepository
|
||
):
|
||
self.collection_repository = collection_repository
|
||
self.access_repository = access_repository
|
||
self.user_repository = user_repository
|
||
|
||
async def create_collection(
|
||
self,
|
||
name: str,
|
||
owner_id: UUID,
|
||
description: str = "",
|
||
is_public: bool = False
|
||
) -> Collection:
|
||
"""Создать коллекцию"""
|
||
owner = await self.user_repository.get_by_id(owner_id)
|
||
if not owner:
|
||
raise NotFoundError(f"Пользователь {owner_id} не найден")
|
||
|
||
collection = Collection(
|
||
name=name,
|
||
owner_id=owner_id,
|
||
description=description,
|
||
is_public=is_public
|
||
)
|
||
return await self.collection_repository.create(collection)
|
||
|
||
async def get_collection(self, collection_id: UUID) -> Collection:
|
||
"""Получить коллекцию по ID"""
|
||
collection = await self.collection_repository.get_by_id(collection_id)
|
||
if not collection:
|
||
raise NotFoundError(f"Коллекция {collection_id} не найдена")
|
||
return collection
|
||
|
||
async def update_collection(
|
||
self,
|
||
collection_id: UUID,
|
||
user_id: UUID,
|
||
name: str | None = None,
|
||
description: str | None = None,
|
||
is_public: bool | None = None
|
||
) -> Collection:
|
||
"""Обновить коллекцию"""
|
||
collection = await self.get_collection(collection_id)
|
||
|
||
if collection.owner_id != user_id:
|
||
raise ForbiddenError("Только владелец может изменять коллекцию")
|
||
|
||
if name is not None:
|
||
collection.name = name
|
||
if description is not None:
|
||
collection.description = description
|
||
if is_public is not None:
|
||
collection.is_public = is_public
|
||
|
||
return await self.collection_repository.update(collection)
|
||
|
||
async def delete_collection(self, collection_id: UUID, user_id: UUID) -> bool:
|
||
"""Удалить коллекцию"""
|
||
collection = await self.get_collection(collection_id)
|
||
|
||
if collection.owner_id != user_id:
|
||
raise ForbiddenError("Только владелец может удалять коллекцию")
|
||
|
||
return await self.collection_repository.delete(collection_id)
|
||
|
||
async def grant_access(self, collection_id: UUID, user_id: UUID, owner_id: UUID) -> CollectionAccess:
|
||
"""Предоставить доступ пользователю к коллекции"""
|
||
collection = await self.get_collection(collection_id)
|
||
|
||
if collection.owner_id != owner_id:
|
||
raise ForbiddenError("Только владелец может предоставлять доступ")
|
||
|
||
user = await self.user_repository.get_by_id(user_id)
|
||
if not user:
|
||
raise NotFoundError(f"Пользователь {user_id} не найден")
|
||
|
||
existing_access = await self.access_repository.get_by_user_and_collection(user_id, collection_id)
|
||
if existing_access:
|
||
return existing_access
|
||
|
||
access = CollectionAccess(user_id=user_id, collection_id=collection_id)
|
||
return await self.access_repository.create(access)
|
||
|
||
async def revoke_access(self, collection_id: UUID, user_id: UUID, owner_id: UUID) -> bool:
|
||
"""Отозвать доступ пользователя к коллекции"""
|
||
collection = await self.get_collection(collection_id)
|
||
|
||
if collection.owner_id != owner_id:
|
||
raise ForbiddenError("Только владелец может отзывать доступ")
|
||
|
||
return await self.access_repository.delete_by_user_and_collection(user_id, collection_id)
|
||
|
||
async def check_access(self, collection_id: UUID, user_id: UUID) -> bool:
|
||
"""Проверить доступ пользователя к коллекции"""
|
||
collection = await self.get_collection(collection_id)
|
||
|
||
if collection.owner_id == user_id:
|
||
return True
|
||
|
||
if collection.is_public:
|
||
return True
|
||
|
||
access = await self.access_repository.get_by_user_and_collection(user_id, collection_id)
|
||
return access is not None
|
||
|
||
async def list_user_collections(self, user_id: UUID, skip: int = 0, limit: int = 100) -> list[Collection]:
|
||
"""Получить коллекции, доступные пользователю"""
|
||
owned = await self.collection_repository.list_by_owner(user_id, skip=skip, limit=limit)
|
||
|
||
public = await self.collection_repository.list_public(skip=skip, limit=limit)
|
||
|
||
accesses = await self.access_repository.list_by_user(user_id)
|
||
accessed_collections = []
|
||
for access in accesses:
|
||
collection = await self.collection_repository.get_by_id(access.collection_id)
|
||
if collection:
|
||
accessed_collections.append(collection)
|
||
|
||
all_collections = {c.collection_id: c for c in owned + public + accessed_collections}
|
||
return list(all_collections.values())[skip:skip+limit]
|
||
|