forked from HSE_team/BetterCallPraskovia
746 lines
32 KiB
Python
746 lines
32 KiB
Python
from aiogram import Router, F
|
||
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
|
||
from aiogram.filters import Command, StateFilter
|
||
from aiogram.fsm.context import FSMContext
|
||
import aiohttp
|
||
from tg_bot.config.settings import settings
|
||
from tg_bot.infrastructure.http_client import create_http_session
|
||
from tg_bot.infrastructure.telegram.states.collection_states import (
|
||
CollectionAccessStates,
|
||
CollectionEditStates
|
||
)
|
||
|
||
router = Router()
|
||
|
||
|
||
async def get_user_collections(telegram_id: str):
|
||
try:
|
||
async with create_http_session() as session:
|
||
async with session.get(
|
||
f"{settings.BACKEND_URL}/collections/",
|
||
headers={"X-Telegram-ID": telegram_id}
|
||
) as response:
|
||
if response.status == 200:
|
||
return await response.json()
|
||
return []
|
||
except Exception as e:
|
||
print(f"Error getting collections: {e}")
|
||
return []
|
||
|
||
|
||
async def get_collection_documents(collection_id: str, telegram_id: str):
|
||
try:
|
||
collection_id = str(collection_id).strip()
|
||
url = f"{settings.BACKEND_URL}/documents/collection/{collection_id}"
|
||
print(f"DEBUG get_collection_documents: URL={url}, collection_id={collection_id}, telegram_id={telegram_id}")
|
||
|
||
async with create_http_session() as session:
|
||
async with session.get(
|
||
url,
|
||
headers={"X-Telegram-ID": telegram_id}
|
||
) as response:
|
||
if response.status == 200:
|
||
return await response.json()
|
||
elif response.status == 422:
|
||
error_text = await response.text()
|
||
print(f"Validation error getting documents: {response.status} - {error_text}, collection_id: {collection_id}, URL: {url}")
|
||
return []
|
||
else:
|
||
error_text = await response.text()
|
||
print(f"Error getting documents: {response.status} - {error_text}, collection_id: {collection_id}, URL: {url}")
|
||
return []
|
||
except Exception as e:
|
||
print(f"Exception getting documents: {e}, collection_id: {collection_id}, type: {type(collection_id)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return []
|
||
|
||
|
||
async def search_in_collection(collection_id: str, query: str, telegram_id: str):
|
||
try:
|
||
async with create_http_session() as session:
|
||
async with session.get(
|
||
f"{settings.BACKEND_URL}/documents/collection/{collection_id}",
|
||
params={"search": query},
|
||
headers={"X-Telegram-ID": telegram_id}
|
||
) as response:
|
||
if response.status == 200:
|
||
return await response.json()
|
||
return []
|
||
except Exception as e:
|
||
print(f"Error searching: {e}")
|
||
return []
|
||
|
||
|
||
async def get_collection_info(collection_id: str, telegram_id: str):
|
||
"""Получить информацию о коллекции"""
|
||
try:
|
||
collection_id = str(collection_id).strip()
|
||
url = f"{settings.BACKEND_URL}/collections/{collection_id}"
|
||
print(f"DEBUG get_collection_info: URL={url}, collection_id={collection_id}, telegram_id={telegram_id}")
|
||
|
||
async with create_http_session() as session:
|
||
async with session.get(
|
||
url,
|
||
headers={"X-Telegram-ID": telegram_id}
|
||
) as response:
|
||
if response.status == 200:
|
||
return await response.json()
|
||
elif response.status == 422:
|
||
error_text = await response.text()
|
||
print(f"Validation error getting collection info: {response.status} - {error_text}, collection_id: {collection_id}, URL: {url}")
|
||
return None
|
||
else:
|
||
error_text = await response.text()
|
||
print(f"Error getting collection info: {response.status} - {error_text}, collection_id: {collection_id}, URL: {url}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"Exception getting collection info: {e}, collection_id: {collection_id}, type: {type(collection_id)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
|
||
async def get_collection_access_list(collection_id: str, telegram_id: str):
|
||
"""Получить список пользователей с доступом к коллекции"""
|
||
try:
|
||
async with create_http_session() as session:
|
||
async with session.get(
|
||
f"{settings.BACKEND_URL}/collections/{collection_id}/access",
|
||
headers={"X-Telegram-ID": telegram_id}
|
||
) as response:
|
||
if response.status == 200:
|
||
return await response.json()
|
||
return []
|
||
except Exception as e:
|
||
print(f"Error getting access list: {e}")
|
||
return []
|
||
|
||
|
||
async def grant_collection_access(collection_id: str, telegram_id: str, owner_telegram_id: str):
|
||
"""Предоставить доступ к коллекции"""
|
||
try:
|
||
url = f"{settings.BACKEND_URL}/collections/{collection_id}/access/telegram/{telegram_id}"
|
||
print(f"DEBUG grant_collection_access: URL={url}, target_telegram_id={telegram_id}, owner_telegram_id={owner_telegram_id}")
|
||
|
||
async with create_http_session() as session:
|
||
async with session.post(
|
||
url,
|
||
headers={"X-Telegram-ID": owner_telegram_id}
|
||
) as response:
|
||
if response.status == 201:
|
||
result = await response.json()
|
||
print(f"DEBUG: Access granted successfully: {result}")
|
||
return result
|
||
else:
|
||
error_text = await response.text()
|
||
print(f"ERROR granting access: status={response.status}, error={error_text}, target_telegram_id={telegram_id}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"Exception granting access: {e}, target_telegram_id={telegram_id}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
|
||
async def revoke_collection_access(collection_id: str, telegram_id: str, owner_telegram_id: str):
|
||
"""Отозвать доступ к коллекции"""
|
||
try:
|
||
async with create_http_session() as session:
|
||
async with session.delete(
|
||
f"{settings.BACKEND_URL}/collections/{collection_id}/access/telegram/{telegram_id}",
|
||
headers={"X-Telegram-ID": owner_telegram_id}
|
||
) as response:
|
||
return response.status == 204
|
||
except Exception as e:
|
||
print(f"Error revoking access: {e}")
|
||
return False
|
||
|
||
|
||
@router.message(Command("mycollections"))
|
||
async def cmd_mycollections(message: Message):
|
||
telegram_id = str(message.from_user.id)
|
||
collections = await get_user_collections(telegram_id)
|
||
|
||
if not collections:
|
||
await message.answer(
|
||
"<b>У вас пока нет коллекций</b>\n\n"
|
||
"Обратитесь к администратору для создания коллекций и добавления документов.",
|
||
parse_mode="HTML"
|
||
)
|
||
return
|
||
|
||
response = "<b>Ваши коллекции документов:</b>\n\n"
|
||
keyboard_buttons = []
|
||
|
||
for i, collection in enumerate(collections[:10], 1):
|
||
name = collection.get("name", "Без названия")
|
||
description = collection.get("description", "")
|
||
collection_id = collection.get("collection_id")
|
||
|
||
response += f"{i}. <b>{name}</b>\n"
|
||
if description:
|
||
response += f" <i>{description[:50]}...</i>\n"
|
||
response += f" ID: <code>{collection_id}</code>\n\n"
|
||
|
||
keyboard_buttons.append([
|
||
InlineKeyboardButton(
|
||
text=f"{name}",
|
||
callback_data=f"collection:{collection_id}"
|
||
)
|
||
])
|
||
|
||
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_buttons)
|
||
|
||
response += "<i>Нажмите на коллекцию, чтобы посмотреть документы</i>"
|
||
|
||
await message.answer(response, parse_mode="HTML", reply_markup=keyboard)
|
||
|
||
|
||
@router.message(Command("search"))
|
||
async def cmd_search(message: Message):
|
||
parts = message.text.split(maxsplit=2)
|
||
if len(parts) < 3:
|
||
telegram_id = str(message.from_user.id)
|
||
collections = await get_user_collections(telegram_id)
|
||
|
||
if not collections:
|
||
await message.answer(
|
||
"<b>Использование:</b> /search <collection_id> <запрос>\n\n"
|
||
"У вас пока нет коллекций. Обратитесь к администратору.",
|
||
parse_mode="HTML"
|
||
)
|
||
return
|
||
|
||
response = "<b>Выберите коллекцию для поиска:</b>\n\n"
|
||
response += "Использование: /search <collection_id> <запрос>\n\n"
|
||
response += "<b>Доступные коллекции:</b>\n"
|
||
for collection in collections[:5]:
|
||
name = collection.get("name", "Без названия")
|
||
collection_id = collection.get("collection_id")
|
||
response += f"• <b>{name}</b>\n <code>{collection_id}</code>\n\n"
|
||
response += "Пример: /search " + collections[0].get("collection_id", "")[:8] + "... Как оформить договор?"
|
||
|
||
await message.answer(response, parse_mode="HTML")
|
||
return
|
||
|
||
collection_id = parts[1]
|
||
query = parts[2]
|
||
telegram_id = str(message.from_user.id)
|
||
|
||
await message.bot.send_chat_action(message.chat.id, "typing")
|
||
|
||
results = await search_in_collection(collection_id, query, telegram_id)
|
||
|
||
if not results:
|
||
await message.answer(
|
||
f"<b>Ничего не найдено</b>\n\n"
|
||
f"По запросу <i>\"{query}\"</i> в коллекции ничего не найдено.\n\n"
|
||
f"Попробуйте другой запрос или используйте /mycollections для просмотра доступных коллекций.",
|
||
parse_mode="HTML"
|
||
)
|
||
return
|
||
|
||
response = f"<b>Результаты поиска:</b> \"{query}\"\n\n"
|
||
for i, doc in enumerate(results[:5], 1):
|
||
title = doc.get("title", "Без названия")
|
||
content = doc.get("content", "")[:200]
|
||
response += f"{i}. <b>{title}</b>\n"
|
||
response += f" <i>{content}...</i>\n\n"
|
||
|
||
await message.answer(response, parse_mode="HTML")
|
||
|
||
|
||
@router.callback_query(lambda c: c.data.startswith("collection:") and not c.data.startswith("collection:documents:") and not c.data.startswith("collection:edit:") and not c.data.startswith("collection:access:") and not c.data.startswith("collection:view_access:"))
|
||
async def show_collection_menu(callback: CallbackQuery):
|
||
"""Показать меню коллекции с опциями в зависимости от прав"""
|
||
parts = callback.data.split(":", 1)
|
||
if len(parts) < 2:
|
||
await callback.message.answer(
|
||
"<b>Ошибка</b>\n\nНеверный формат данных.",
|
||
parse_mode="HTML"
|
||
)
|
||
await callback.answer()
|
||
return
|
||
|
||
collection_id = parts[1]
|
||
telegram_id = str(callback.from_user.id)
|
||
|
||
print(f"DEBUG: collection_id from callback (menu): {collection_id}, callback_data: {callback.data}")
|
||
|
||
await callback.answer("Загружаю информацию...")
|
||
|
||
collection_info = await get_collection_info(collection_id, telegram_id)
|
||
if not collection_info:
|
||
await callback.message.answer(
|
||
"<b>Ошибка</b>\n\nНе удалось загрузить информацию о коллекции.",
|
||
parse_mode="HTML"
|
||
)
|
||
return
|
||
|
||
owner_id = collection_info.get("owner_id")
|
||
collection_name = collection_info.get("name", "Коллекция")
|
||
|
||
try:
|
||
async with create_http_session() as session:
|
||
async with session.get(
|
||
f"{settings.BACKEND_URL}/users/telegram/{telegram_id}"
|
||
) as response:
|
||
if response.status == 200:
|
||
user_info = await response.json()
|
||
current_user_id = user_info.get("user_id")
|
||
is_owner = str(owner_id) == str(current_user_id)
|
||
else:
|
||
is_owner = False
|
||
except:
|
||
is_owner = False
|
||
|
||
keyboard_buttons = []
|
||
|
||
collection_id_str = str(collection_id)
|
||
|
||
if is_owner:
|
||
keyboard_buttons = [
|
||
[InlineKeyboardButton(text="Просмотр документов", callback_data=f"collection:documents:{collection_id_str}")],
|
||
[InlineKeyboardButton(text="Редактировать коллекцию", callback_data=f"collection:edit:{collection_id_str}")],
|
||
[InlineKeyboardButton(text="Управление доступом", callback_data=f"collection:access:{collection_id_str}")],
|
||
[InlineKeyboardButton(text="Загрузить документ", callback_data=f"document:upload:{collection_id_str}")],
|
||
[InlineKeyboardButton(text="Назад к коллекциям", callback_data="collections:list")]
|
||
]
|
||
else:
|
||
keyboard_buttons = [
|
||
[InlineKeyboardButton(text="Просмотр документов", callback_data=f"collection:documents:{collection_id_str}")],
|
||
[InlineKeyboardButton(text="Просмотр доступа", callback_data=f"collection:view_access:{collection_id_str}")],
|
||
[InlineKeyboardButton(text="Назад к коллекциям", callback_data="collections:list")]
|
||
]
|
||
|
||
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_buttons)
|
||
|
||
role_text = "<b>Владелец</b>" if is_owner else "<b>Доступ</b>"
|
||
response = f"<b>{collection_name}</b>\n\n"
|
||
response += f"{role_text}\n\n"
|
||
response += f"ID: <code>{collection_id}</code>\n\n"
|
||
response += "Выберите действие:"
|
||
|
||
await callback.message.answer(response, parse_mode="HTML", reply_markup=keyboard)
|
||
|
||
|
||
@router.callback_query(lambda c: c.data.startswith("collection:documents:"))
|
||
async def show_collection_documents(callback: CallbackQuery):
|
||
"""Показать документы коллекции"""
|
||
try:
|
||
parts = callback.data.split(":", 2)
|
||
if len(parts) < 3:
|
||
raise ValueError("Неверный формат callback_data")
|
||
|
||
collection_id = parts[2]
|
||
telegram_id = str(callback.from_user.id)
|
||
|
||
print(f"DEBUG: collection_id from callback: {collection_id}, callback_data: {callback.data}")
|
||
|
||
await callback.answer("Загружаю документы...")
|
||
|
||
collection_info = await get_collection_info(collection_id, telegram_id)
|
||
if not collection_info:
|
||
await callback.message.answer(
|
||
"<b>Ошибка</b>\n\nНе удалось загрузить информацию о коллекции. Проверьте, что у вас есть доступ к этой коллекции.",
|
||
parse_mode="HTML"
|
||
)
|
||
return
|
||
|
||
documents = await get_collection_documents(collection_id, telegram_id)
|
||
|
||
if not documents:
|
||
await callback.message.answer(
|
||
f"<b>Коллекция пуста</b>\n\n"
|
||
f"В этой коллекции пока нет документов.",
|
||
parse_mode="HTML"
|
||
)
|
||
return
|
||
except IndexError:
|
||
await callback.message.answer(
|
||
"<b>Ошибка</b>\n\nНеверный формат данных.",
|
||
parse_mode="HTML"
|
||
)
|
||
await callback.answer()
|
||
return
|
||
except Exception as e:
|
||
print(f"Error in show_collection_documents: {e}")
|
||
await callback.message.answer(
|
||
f"<b>Ошибка</b>\n\nПроизошла ошибка при загрузке документов: {str(e)}",
|
||
parse_mode="HTML"
|
||
)
|
||
await callback.answer()
|
||
return
|
||
|
||
response = f"<b>Документы в коллекции:</b>\n\n"
|
||
keyboard_buttons = []
|
||
|
||
for i, doc in enumerate(documents[:10], 1):
|
||
doc_id = doc.get("document_id")
|
||
title = doc.get("title", "Без названия")
|
||
content_preview = doc.get("content", "")[:100]
|
||
response += f"{i}. <b>{title}</b>\n"
|
||
if content_preview:
|
||
response += f" <i>{content_preview}...</i>\n"
|
||
response += "\n"
|
||
|
||
keyboard_buttons.append([
|
||
InlineKeyboardButton(
|
||
text=f"{title[:30]}",
|
||
callback_data=f"document:view:{doc_id}"
|
||
)
|
||
])
|
||
|
||
if len(documents) > 10:
|
||
response += f"\n<i>Показано 10 из {len(documents)} документов</i>"
|
||
|
||
|
||
collection_id_for_back = str(collection_info.get("collection_id", collection_id))
|
||
keyboard_buttons.append([
|
||
InlineKeyboardButton(text="Назад", callback_data=f"collection:{collection_id_for_back}")
|
||
])
|
||
|
||
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_buttons)
|
||
await callback.message.answer(response, parse_mode="HTML", reply_markup=keyboard)
|
||
|
||
|
||
@router.callback_query(lambda c: c.data.startswith("collection:access:"))
|
||
async def show_access_management(callback: CallbackQuery):
|
||
"""Показать меню управления доступом (только для владельца)"""
|
||
collection_id = callback.data.split(":")[2]
|
||
telegram_id = str(callback.from_user.id)
|
||
|
||
await callback.answer("Загружаю список доступа...")
|
||
|
||
access_list = await get_collection_access_list(collection_id, telegram_id)
|
||
|
||
response = "<b>Управление доступом</b>\n\n"
|
||
response += "<b>Пользователи с доступом:</b>\n\n"
|
||
|
||
keyboard_buttons = []
|
||
|
||
if access_list:
|
||
for i, access in enumerate(access_list[:10], 1):
|
||
user = access.get("user", {})
|
||
user_telegram_id = user.get("telegram_id", "N/A")
|
||
role = user.get("role", "user")
|
||
response += f"{i}. <code>{user_telegram_id}</code> ({role})\n"
|
||
|
||
keyboard_buttons.append([
|
||
InlineKeyboardButton(
|
||
text=f" Удалить {user_telegram_id}",
|
||
callback_data=f"access:remove:{collection_id}:{user_telegram_id}"
|
||
)
|
||
])
|
||
else:
|
||
response += "<i>Нет пользователей с доступом</i>\n\n"
|
||
|
||
keyboard_buttons.extend([
|
||
[InlineKeyboardButton(text="Добавить доступ", callback_data=f"access:add:{collection_id}")],
|
||
[InlineKeyboardButton(text="Назад", callback_data=f"collection:{collection_id}")]
|
||
])
|
||
|
||
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_buttons)
|
||
await callback.message.answer(response, parse_mode="HTML", reply_markup=keyboard)
|
||
|
||
|
||
@router.callback_query(lambda c: c.data.startswith("collection:view_access:"))
|
||
async def show_access_list(callback: CallbackQuery):
|
||
"""Показать список пользователей с доступом (read-only для пользователей с доступом)"""
|
||
collection_id = callback.data.split(":")[2]
|
||
telegram_id = str(callback.from_user.id)
|
||
|
||
await callback.answer("Загружаю список доступа...")
|
||
|
||
access_list = await get_collection_access_list(collection_id, telegram_id)
|
||
|
||
response = "<b>Пользователи с доступом</b>\n\n"
|
||
|
||
if access_list:
|
||
for i, access in enumerate(access_list[:20], 1):
|
||
user = access.get("user", {})
|
||
user_telegram_id = user.get("telegram_id", "N/A")
|
||
role = user.get("role", "user")
|
||
response += f"{i}. <code>{user_telegram_id}</code> ({role})\n"
|
||
else:
|
||
response += "<i>Нет пользователей с доступом</i>\n"
|
||
|
||
keyboard = InlineKeyboardMarkup(inline_keyboard=[[
|
||
InlineKeyboardButton(text="Назад", callback_data=f"collection:{collection_id}")
|
||
]])
|
||
await callback.message.answer(response, parse_mode="HTML", reply_markup=keyboard)
|
||
|
||
|
||
@router.callback_query(lambda c: c.data.startswith("access:add:"))
|
||
async def add_access_prompt(callback: CallbackQuery, state: FSMContext):
|
||
"""Запросить пересылку сообщения для добавления доступа"""
|
||
collection_id = callback.data.split(":")[2]
|
||
telegram_id = str(callback.from_user.id)
|
||
|
||
await state.update_data(collection_id=collection_id)
|
||
await state.set_state(CollectionAccessStates.waiting_for_username)
|
||
|
||
await callback.message.answer(
|
||
"<b>Добавить доступ</b>\n\n"
|
||
"Перешлите любое сообщение от пользователя, которому нужно предоставить доступ.\n\n"
|
||
"<i>Просто перешлите сообщение от нужного пользователя.</i>",
|
||
parse_mode="HTML"
|
||
)
|
||
await callback.answer()
|
||
|
||
|
||
@router.message(StateFilter(CollectionAccessStates.waiting_for_username))
|
||
async def process_add_access(message: Message, state: FSMContext):
|
||
"""Обработать добавление доступа через пересылку сообщения"""
|
||
telegram_id = str(message.from_user.id)
|
||
data = await state.get_data()
|
||
collection_id = data.get("collection_id")
|
||
|
||
if not collection_id:
|
||
await message.answer("Ошибка: не указана коллекция")
|
||
await state.clear()
|
||
return
|
||
|
||
target_telegram_id = None
|
||
|
||
if message.forward_from:
|
||
target_telegram_id = str(message.forward_from.id)
|
||
elif message.forward_from_chat:
|
||
await message.answer(
|
||
"<b>Ошибка</b>\n\n"
|
||
"Пожалуйста, перешлите сообщение от пользователя, а не из группы или канала.",
|
||
parse_mode="HTML"
|
||
)
|
||
await state.clear()
|
||
return
|
||
elif message.forward_date:
|
||
await message.answer(
|
||
"<b>Информация о пересылке скрыта</b>\n\n"
|
||
"Пользователь скрыл информацию о пересылке в настройках приватности Telegram.\n\n"
|
||
"Попросите пользователя временно разрешить пересылку сообщений.",
|
||
parse_mode="HTML"
|
||
)
|
||
await state.clear()
|
||
return
|
||
else:
|
||
await message.answer(
|
||
"<b>Ошибка</b>\n\n"
|
||
"Пожалуйста, перешлите сообщение от пользователя, которому нужно предоставить доступ.\n\n"
|
||
"<i>Просто перешлите любое сообщение от нужного пользователя.</i>",
|
||
parse_mode="HTML"
|
||
)
|
||
await state.clear()
|
||
return
|
||
|
||
if not target_telegram_id:
|
||
await message.answer(
|
||
"<b>Ошибка</b>\n\n"
|
||
"Не удалось определить Telegram ID пользователя.",
|
||
parse_mode="HTML"
|
||
)
|
||
await state.clear()
|
||
return
|
||
|
||
print(f"DEBUG: Attempting to grant access: collection_id={collection_id}, target_telegram_id={target_telegram_id}, owner_telegram_id={telegram_id}")
|
||
result = await grant_collection_access(collection_id, target_telegram_id, telegram_id)
|
||
|
||
if result:
|
||
user_info = ""
|
||
if message.forward_from:
|
||
user_name = message.forward_from.first_name or ""
|
||
user_username = f"@{message.forward_from.username}" if message.forward_from.username else ""
|
||
user_info = f"{user_name} {user_username}".strip() or target_telegram_id
|
||
else:
|
||
user_info = target_telegram_id
|
||
|
||
await message.answer(
|
||
f"<b>Доступ предоставлен</b>\n\n"
|
||
f"Пользователю <code>{target_telegram_id}</code> предоставлен доступ к коллекции.\n\n"
|
||
f"Пользователь: {user_info}\n\n"
|
||
f"<i>Примечание: Если пользователь еще не взаимодействовал с ботом, он был автоматически создан в системе.</i>",
|
||
parse_mode="HTML"
|
||
)
|
||
else:
|
||
await message.answer(
|
||
"<b>Ошибка</b>\n\n"
|
||
"Не удалось предоставить доступ. Возможно:\n"
|
||
"• Доступ уже предоставлен\n"
|
||
"• Произошла ошибка на сервере\n"
|
||
"• Вы не являетесь владельцем коллекции\n\n"
|
||
"Проверьте логи сервера для получения подробной информации.",
|
||
parse_mode="HTML"
|
||
)
|
||
|
||
await state.clear()
|
||
|
||
|
||
@router.callback_query(lambda c: c.data.startswith("access:remove:"))
|
||
async def remove_access(callback: CallbackQuery):
|
||
"""Удалить доступ пользователя"""
|
||
parts = callback.data.split(":")
|
||
collection_id = parts[2]
|
||
target_telegram_id = parts[3]
|
||
owner_telegram_id = str(callback.from_user.id)
|
||
|
||
await callback.answer("Удаляю доступ...")
|
||
|
||
result = await revoke_collection_access(collection_id, target_telegram_id, owner_telegram_id)
|
||
|
||
if result:
|
||
await callback.message.answer(
|
||
f"<b>Доступ отозван</b>\n\n"
|
||
f"Доступ пользователя <code>{target_telegram_id}</code> отозван.",
|
||
parse_mode="HTML"
|
||
)
|
||
else:
|
||
await callback.message.answer(
|
||
"<b>Ошибка</b>\n\n"
|
||
"Не удалось отозвать доступ.",
|
||
parse_mode="HTML"
|
||
)
|
||
|
||
|
||
@router.callback_query(lambda c: c.data.startswith("collection:edit:"))
|
||
async def edit_collection_prompt(callback: CallbackQuery, state: FSMContext):
|
||
"""Запросить данные для редактирования коллекции"""
|
||
collection_id = callback.data.split(":")[2]
|
||
telegram_id = str(callback.from_user.id)
|
||
|
||
collection_info = await get_collection_info(collection_id, telegram_id)
|
||
if not collection_info:
|
||
await callback.message.answer(
|
||
"<b>Ошибка</b>\n\nНе удалось загрузить информацию о коллекции.",
|
||
parse_mode="HTML"
|
||
)
|
||
await callback.answer()
|
||
return
|
||
|
||
await state.update_data(collection_id=collection_id)
|
||
await state.set_state(CollectionEditStates.waiting_for_name)
|
||
|
||
await callback.message.answer(
|
||
"<b>Редактирование коллекции</b>\n\n"
|
||
"Отправьте новое название коллекции или /skip чтобы оставить текущее.\n\n"
|
||
f"Текущее название: <b>{collection_info.get('name', 'Без названия')}</b>",
|
||
parse_mode="HTML"
|
||
)
|
||
await callback.answer()
|
||
|
||
|
||
@router.message(StateFilter(CollectionEditStates.waiting_for_name))
|
||
async def process_edit_collection_name(message: Message, state: FSMContext):
|
||
"""Обработать новое название коллекции"""
|
||
telegram_id = str(message.from_user.id)
|
||
data = await state.get_data()
|
||
collection_id = data.get("collection_id")
|
||
|
||
if message.text and message.text.strip() == "/skip":
|
||
new_name = None
|
||
else:
|
||
new_name = message.text.strip() if message.text else None
|
||
|
||
await state.update_data(name=new_name)
|
||
await state.set_state(CollectionEditStates.waiting_for_description)
|
||
|
||
collection_info = await get_collection_info(collection_id, telegram_id)
|
||
current_description = collection_info.get("description", "") if collection_info else ""
|
||
|
||
await message.answer(
|
||
"<b>Описание коллекции</b>\n\n"
|
||
"Отправьте новое описание коллекции или /skip чтобы оставить текущее.\n\n"
|
||
f"Текущее описание: <i>{current_description[:100] if current_description else 'Нет описания'}...</i>",
|
||
parse_mode="HTML"
|
||
)
|
||
|
||
|
||
@router.message(StateFilter(CollectionEditStates.waiting_for_description))
|
||
async def process_edit_collection_description(message: Message, state: FSMContext):
|
||
"""Обработать новое описание коллекции"""
|
||
telegram_id = str(message.from_user.id)
|
||
data = await state.get_data()
|
||
collection_id = data.get("collection_id")
|
||
name = data.get("name")
|
||
|
||
if message.text and message.text.strip() == "/skip":
|
||
new_description = None
|
||
else:
|
||
new_description = message.text.strip() if message.text else None
|
||
|
||
try:
|
||
update_data = {}
|
||
if name:
|
||
update_data["name"] = name
|
||
if new_description:
|
||
update_data["description"] = new_description
|
||
|
||
async with create_http_session() as session:
|
||
async with session.put(
|
||
f"{settings.BACKEND_URL}/collections/{collection_id}",
|
||
json=update_data,
|
||
headers={"X-Telegram-ID": telegram_id}
|
||
) as response:
|
||
if response.status == 200:
|
||
await message.answer(
|
||
"<b>Коллекция обновлена</b>\n\n"
|
||
"Изменения сохранены.",
|
||
parse_mode="HTML"
|
||
)
|
||
else:
|
||
error_text = await response.text()
|
||
await message.answer(
|
||
f"<b>Ошибка</b>\n\n"
|
||
f"Не удалось обновить коллекцию: {error_text}",
|
||
parse_mode="HTML"
|
||
)
|
||
except Exception as e:
|
||
await message.answer(
|
||
f"<b>Ошибка</b>\n\n"
|
||
f"Произошла ошибка: {str(e)}",
|
||
parse_mode="HTML"
|
||
)
|
||
|
||
await state.clear()
|
||
|
||
|
||
@router.callback_query(lambda c: c.data == "collections:list")
|
||
async def back_to_collections(callback: CallbackQuery):
|
||
"""Вернуться к списку коллекций"""
|
||
telegram_id = str(callback.from_user.id)
|
||
collections = await get_user_collections(telegram_id)
|
||
|
||
if not collections:
|
||
await callback.message.answer(
|
||
"<b>У вас пока нет коллекций</b>\n\n"
|
||
"Обратитесь к администратору для создания коллекций и добавления документов.",
|
||
parse_mode="HTML"
|
||
)
|
||
return
|
||
|
||
response = "<b>Ваши коллекции документов:</b>\n\n"
|
||
keyboard_buttons = []
|
||
|
||
for i, collection in enumerate(collections[:10], 1):
|
||
name = collection.get("name", "Без названия")
|
||
description = collection.get("description", "")
|
||
collection_id = collection.get("collection_id")
|
||
|
||
response += f"{i}. <b>{name}</b>\n"
|
||
if description:
|
||
response += f" <i>{description[:50]}...</i>\n"
|
||
response += f" ID: <code>{collection_id}</code>\n\n"
|
||
|
||
keyboard_buttons.append([
|
||
InlineKeyboardButton(
|
||
text=f"{name}",
|
||
callback_data=f"collection:{collection_id}"
|
||
)
|
||
])
|
||
|
||
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_buttons)
|
||
response += "<i>Нажмите на коллекцию, чтобы посмотреть документы</i>"
|
||
|
||
await callback.message.answer(response, parse_mode="HTML", reply_markup=keyboard)
|
||
|
||
|