from aiogram import Router, F
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
import aiohttp
from urllib.parse import unquote
from tg_bot.config.settings import settings
from tg_bot.infrastructure.http_client import create_http_session
from tg_bot.infrastructure.telegram.states.collection_states import (
CollectionAccessStates,
CollectionEditStates
)
def decode_title(title: str) -> str:
if not title:
return "Без названия"
try:
decoded = unquote(title)
if decoded != title or '%' not in title:
return decoded
return title
except Exception:
return title
router = Router()
async def get_user_collections(telegram_id: str):
try:
async with create_http_session() as session:
async with session.get(
f"{settings.BACKEND_URL}/collections/",
headers={"X-Telegram-ID": telegram_id}
) as response:
if response.status == 200:
return await response.json()
return []
except Exception as e:
print(f"Error getting collections: {e}")
return []
async def get_collection_documents(collection_id: str, telegram_id: str):
try:
collection_id = str(collection_id).strip()
url = f"{settings.BACKEND_URL}/documents/collection/{collection_id}"
print(f"DEBUG get_collection_documents: URL={url}, collection_id={collection_id}, telegram_id={telegram_id}")
async with create_http_session() as session:
async with session.get(
url,
headers={"X-Telegram-ID": telegram_id}
) as response:
if response.status == 200:
return await response.json()
elif response.status == 422:
error_text = await response.text()
print(f"Validation error getting documents: {response.status} - {error_text}, collection_id: {collection_id}, URL: {url}")
return []
else:
error_text = await response.text()
print(f"Error getting documents: {response.status} - {error_text}, collection_id: {collection_id}, URL: {url}")
return []
except Exception as e:
print(f"Exception getting documents: {e}, collection_id: {collection_id}, type: {type(collection_id)}")
import traceback
traceback.print_exc()
return []
async def search_in_collection(collection_id: str, query: str, telegram_id: str):
try:
async with create_http_session() as session:
async with session.get(
f"{settings.BACKEND_URL}/documents/collection/{collection_id}",
params={"search": query},
headers={"X-Telegram-ID": telegram_id}
) as response:
if response.status == 200:
return await response.json()
return []
except Exception as e:
print(f"Error searching: {e}")
return []
async def get_collection_info(collection_id: str, telegram_id: str):
"""Получить информацию о коллекции"""
try:
collection_id = str(collection_id).strip()
url = f"{settings.BACKEND_URL}/collections/{collection_id}"
print(f"DEBUG get_collection_info: URL={url}, collection_id={collection_id}, telegram_id={telegram_id}")
async with create_http_session() as session:
async with session.get(
url,
headers={"X-Telegram-ID": telegram_id}
) as response:
if response.status == 200:
return await response.json()
elif response.status == 422:
error_text = await response.text()
print(f"Validation error getting collection info: {response.status} - {error_text}, collection_id: {collection_id}, URL: {url}")
return None
else:
error_text = await response.text()
print(f"Error getting collection info: {response.status} - {error_text}, collection_id: {collection_id}, URL: {url}")
return None
except Exception as e:
print(f"Exception getting collection info: {e}, collection_id: {collection_id}, type: {type(collection_id)}")
import traceback
traceback.print_exc()
return None
async def get_collection_access_list(collection_id: str, telegram_id: str):
"""Получить список пользователей с доступом к коллекции"""
try:
async with create_http_session() as session:
async with session.get(
f"{settings.BACKEND_URL}/collections/{collection_id}/access",
headers={"X-Telegram-ID": telegram_id}
) as response:
if response.status == 200:
return await response.json()
return []
except Exception as e:
print(f"Error getting access list: {e}")
return []
async def grant_collection_access(collection_id: str, telegram_id: str, owner_telegram_id: str):
"""Предоставить доступ к коллекции"""
try:
url = f"{settings.BACKEND_URL}/collections/{collection_id}/access/telegram/{telegram_id}"
print(f"DEBUG grant_collection_access: URL={url}, target_telegram_id={telegram_id}, owner_telegram_id={owner_telegram_id}")
async with create_http_session() as session:
async with session.post(
url,
headers={"X-Telegram-ID": owner_telegram_id}
) as response:
if response.status == 201:
result = await response.json()
print(f"DEBUG: Access granted successfully: {result}")
return result
else:
error_text = await response.text()
print(f"ERROR granting access: status={response.status}, error={error_text}, target_telegram_id={telegram_id}")
return None
except Exception as e:
print(f"Exception granting access: {e}, target_telegram_id={telegram_id}")
import traceback
traceback.print_exc()
return None
async def revoke_collection_access(collection_id: str, telegram_id: str, owner_telegram_id: str):
"""Отозвать доступ к коллекции"""
try:
async with create_http_session() as session:
async with session.delete(
f"{settings.BACKEND_URL}/collections/{collection_id}/access/telegram/{telegram_id}",
headers={"X-Telegram-ID": owner_telegram_id}
) as response:
return response.status == 204
except Exception as e:
print(f"Error revoking access: {e}")
return False
@router.message(Command("mycollections"))
async def cmd_mycollections(message: Message):
telegram_id = str(message.from_user.id)
collections = await get_user_collections(telegram_id)
if not collections:
await message.answer(
"У вас пока нет коллекций\n\n"
"Обратитесь к администратору для создания коллекций и добавления документов.",
parse_mode="HTML"
)
return
response = "Ваши коллекции документов:\n\n"
keyboard_buttons = []
for i, collection in enumerate(collections[:10], 1):
name = collection.get("name", "Без названия")
description = collection.get("description", "")
collection_id = collection.get("collection_id")
response += f"{i}. {name}\n"
if description:
response += f" {description[:50]}...\n"
response += f" ID: {collection_id}\n\n"
keyboard_buttons.append([
InlineKeyboardButton(
text=f"{name}",
callback_data=f"collection:{collection_id}"
)
])
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_buttons)
response += "Нажмите на коллекцию, чтобы посмотреть документы"
await message.answer(response, parse_mode="HTML", reply_markup=keyboard)
@router.message(Command("search"))
async def cmd_search(message: Message):
parts = message.text.split(maxsplit=2)
if len(parts) < 3:
telegram_id = str(message.from_user.id)
collections = await get_user_collections(telegram_id)
if not collections:
await message.answer(
"Использование: /search <collection_id> <запрос>\n\n"
"У вас пока нет коллекций. Обратитесь к администратору.",
parse_mode="HTML"
)
return
response = "Выберите коллекцию для поиска:\n\n"
response += "Использование: /search <collection_id> <запрос>\n\n"
response += "Доступные коллекции:\n"
for collection in collections[:5]:
name = collection.get("name", "Без названия")
collection_id = collection.get("collection_id")
response += f"• {name}\n {collection_id}\n\n"
response += "Пример: /search " + collections[0].get("collection_id", "")[:8] + "... Как оформить договор?"
await message.answer(response, parse_mode="HTML")
return
collection_id = parts[1]
query = parts[2]
telegram_id = str(message.from_user.id)
await message.bot.send_chat_action(message.chat.id, "typing")
results = await search_in_collection(collection_id, query, telegram_id)
if not results:
await message.answer(
f"Ничего не найдено\n\n"
f"По запросу \"{query}\" в коллекции ничего не найдено.\n\n"
f"Попробуйте другой запрос или используйте /mycollections для просмотра доступных коллекций.",
parse_mode="HTML"
)
return
response = f"Результаты поиска: \"{query}\"\n\n"
for i, doc in enumerate(results[:5], 1):
title = decode_title(doc.get("title", "Без названия"))
content = doc.get("content", "")[:200]
response += f"{i}. {title}\n"
response += f" {content}...\n\n"
await message.answer(response, parse_mode="HTML")
@router.callback_query(lambda c: c.data.startswith("collection:") and not c.data.startswith("collection:documents:") and not c.data.startswith("collection:edit:") and not c.data.startswith("collection:access:") and not c.data.startswith("collection:view_access:"))
async def show_collection_menu(callback: CallbackQuery):
"""Показать меню коллекции с опциями в зависимости от прав"""
parts = callback.data.split(":", 1)
if len(parts) < 2:
await callback.message.answer(
"Ошибка\n\nНеверный формат данных.",
parse_mode="HTML"
)
await callback.answer()
return
collection_id = parts[1]
telegram_id = str(callback.from_user.id)
print(f"DEBUG: collection_id from callback (menu): {collection_id}, callback_data: {callback.data}")
await callback.answer("Загружаю информацию...")
collection_info = await get_collection_info(collection_id, telegram_id)
if not collection_info:
await callback.message.answer(
"Ошибка\n\nНе удалось загрузить информацию о коллекции.",
parse_mode="HTML"
)
return
owner_id = collection_info.get("owner_id")
collection_name = collection_info.get("name", "Коллекция")
try:
async with create_http_session() as session:
async with session.get(
f"{settings.BACKEND_URL}/users/telegram/{telegram_id}"
) as response:
if response.status == 200:
user_info = await response.json()
current_user_id = user_info.get("user_id")
is_owner = str(owner_id) == str(current_user_id)
else:
is_owner = False
except:
is_owner = False
keyboard_buttons = []
collection_id_str = str(collection_id)
if is_owner:
keyboard_buttons = [
[InlineKeyboardButton(text="Просмотр документов", callback_data=f"collection:documents:{collection_id_str}")],
[InlineKeyboardButton(text="Редактировать коллекцию", callback_data=f"collection:edit:{collection_id_str}")],
[InlineKeyboardButton(text="Управление доступом", callback_data=f"collection:access:{collection_id_str}")],
[InlineKeyboardButton(text="Загрузить документ", callback_data=f"document:upload:{collection_id_str}")],
[InlineKeyboardButton(text="Назад к коллекциям", callback_data="collections:list")]
]
else:
keyboard_buttons = [
[InlineKeyboardButton(text="Просмотр документов", callback_data=f"collection:documents:{collection_id_str}")],
[InlineKeyboardButton(text="Просмотр доступа", callback_data=f"collection:view_access:{collection_id_str}")],
[InlineKeyboardButton(text="Назад к коллекциям", callback_data="collections:list")]
]
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_buttons)
role_text = "Владелец" if is_owner else "Доступ"
response = f"{collection_name}\n\n"
response += f"{role_text}\n\n"
response += f"ID: {collection_id}\n\n"
response += "Выберите действие:"
await callback.message.answer(response, parse_mode="HTML", reply_markup=keyboard)
@router.callback_query(lambda c: c.data.startswith("collection:documents:"))
async def show_collection_documents(callback: CallbackQuery):
"""Показать документы коллекции"""
try:
parts = callback.data.split(":", 2)
if len(parts) < 3:
raise ValueError("Неверный формат callback_data")
collection_id = parts[2]
telegram_id = str(callback.from_user.id)
print(f"DEBUG: collection_id from callback: {collection_id}, callback_data: {callback.data}")
await callback.answer("Загружаю документы...")
collection_info = await get_collection_info(collection_id, telegram_id)
if not collection_info:
await callback.message.answer(
"Ошибка\n\nНе удалось загрузить информацию о коллекции. Проверьте, что у вас есть доступ к этой коллекции.",
parse_mode="HTML"
)
return
documents = await get_collection_documents(collection_id, telegram_id)
if not documents:
await callback.message.answer(
f"Коллекция пуста\n\n"
f"В этой коллекции пока нет документов.",
parse_mode="HTML"
)
return
except IndexError:
await callback.message.answer(
"Ошибка\n\nНеверный формат данных.",
parse_mode="HTML"
)
await callback.answer()
return
except Exception as e:
print(f"Error in show_collection_documents: {e}")
await callback.message.answer(
f"Ошибка\n\nПроизошла ошибка при загрузке документов: {str(e)}",
parse_mode="HTML"
)
await callback.answer()
return
response = f"Документы в коллекции:\n\n"
keyboard_buttons = []
for i, doc in enumerate(documents[:10], 1):
doc_id = doc.get("document_id")
title = decode_title(doc.get("title", "Без названия"))
content_preview = doc.get("content", "")[:100]
response += f"{i}. {title}\n"
if content_preview:
response += f" {content_preview}...\n"
response += "\n"
keyboard_buttons.append([
InlineKeyboardButton(
text=f"{title[:30]}",
callback_data=f"document:view:{doc_id}"
)
])
if len(documents) > 10:
response += f"\nПоказано 10 из {len(documents)} документов"
collection_id_for_back = str(collection_info.get("collection_id", collection_id))
keyboard_buttons.append([
InlineKeyboardButton(text="Назад", callback_data=f"collection:{collection_id_for_back}")
])
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_buttons)
await callback.message.answer(response, parse_mode="HTML", reply_markup=keyboard)
@router.callback_query(lambda c: c.data.startswith("collection:access:"))
async def show_access_management(callback: CallbackQuery):
"""Показать меню управления доступом (только для владельца)"""
collection_id = callback.data.split(":")[2]
telegram_id = str(callback.from_user.id)
await callback.answer("Загружаю список доступа...")
access_list = await get_collection_access_list(collection_id, telegram_id)
response = "Управление доступом\n\n"
response += "Пользователи с доступом:\n\n"
keyboard_buttons = []
if access_list:
for i, access in enumerate(access_list[:10], 1):
user = access.get("user", {})
user_telegram_id = user.get("telegram_id", "N/A")
role = user.get("role", "user")
response += f"{i}. {user_telegram_id} ({role})\n"
keyboard_buttons.append([
InlineKeyboardButton(
text=f" Удалить {user_telegram_id}",
callback_data=f"access:remove:{collection_id}:{user_telegram_id}"
)
])
else:
response += "Нет пользователей с доступом\n\n"
keyboard_buttons.extend([
[InlineKeyboardButton(text="Добавить доступ", callback_data=f"access:add:{collection_id}")],
[InlineKeyboardButton(text="Назад", callback_data=f"collection:{collection_id}")]
])
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_buttons)
await callback.message.answer(response, parse_mode="HTML", reply_markup=keyboard)
@router.callback_query(lambda c: c.data.startswith("collection:view_access:"))
async def show_access_list(callback: CallbackQuery):
"""Показать список пользователей с доступом (read-only для пользователей с доступом)"""
collection_id = callback.data.split(":")[2]
telegram_id = str(callback.from_user.id)
await callback.answer("Загружаю список доступа...")
access_list = await get_collection_access_list(collection_id, telegram_id)
response = "Пользователи с доступом\n\n"
if access_list:
for i, access in enumerate(access_list[:20], 1):
user = access.get("user", {})
user_telegram_id = user.get("telegram_id", "N/A")
role = user.get("role", "user")
response += f"{i}. {user_telegram_id} ({role})\n"
else:
response += "Нет пользователей с доступом\n"
keyboard = InlineKeyboardMarkup(inline_keyboard=[[
InlineKeyboardButton(text="Назад", callback_data=f"collection:{collection_id}")
]])
await callback.message.answer(response, parse_mode="HTML", reply_markup=keyboard)
@router.callback_query(lambda c: c.data.startswith("access:add:"))
async def add_access_prompt(callback: CallbackQuery, state: FSMContext):
"""Запросить пересылку сообщения для добавления доступа"""
collection_id = callback.data.split(":")[2]
telegram_id = str(callback.from_user.id)
await state.update_data(collection_id=collection_id)
await state.set_state(CollectionAccessStates.waiting_for_username)
await callback.message.answer(
"Добавить доступ\n\n"
"Перешлите любое сообщение от пользователя, которому нужно предоставить доступ.\n\n"
"Просто перешлите сообщение от нужного пользователя.",
parse_mode="HTML"
)
await callback.answer()
@router.message(StateFilter(CollectionAccessStates.waiting_for_username))
async def process_add_access(message: Message, state: FSMContext):
"""Обработать добавление доступа через пересылку сообщения"""
telegram_id = str(message.from_user.id)
data = await state.get_data()
collection_id = data.get("collection_id")
if not collection_id:
await message.answer("Ошибка: не указана коллекция")
await state.clear()
return
target_telegram_id = None
if message.forward_from:
target_telegram_id = str(message.forward_from.id)
elif message.forward_from_chat:
await message.answer(
"Ошибка\n\n"
"Пожалуйста, перешлите сообщение от пользователя, а не из группы или канала.",
parse_mode="HTML"
)
await state.clear()
return
elif message.forward_date:
await message.answer(
"Информация о пересылке скрыта\n\n"
"Пользователь скрыл информацию о пересылке в настройках приватности Telegram.\n\n"
"Попросите пользователя временно разрешить пересылку сообщений.",
parse_mode="HTML"
)
await state.clear()
return
else:
await message.answer(
"Ошибка\n\n"
"Пожалуйста, перешлите сообщение от пользователя, которому нужно предоставить доступ.\n\n"
"Просто перешлите любое сообщение от нужного пользователя.",
parse_mode="HTML"
)
await state.clear()
return
if not target_telegram_id:
await message.answer(
"Ошибка\n\n"
"Не удалось определить Telegram ID пользователя.",
parse_mode="HTML"
)
await state.clear()
return
print(f"DEBUG: Attempting to grant access: collection_id={collection_id}, target_telegram_id={target_telegram_id}, owner_telegram_id={telegram_id}")
result = await grant_collection_access(collection_id, target_telegram_id, telegram_id)
if result:
user_info = ""
if message.forward_from:
user_name = message.forward_from.first_name or ""
user_username = f"@{message.forward_from.username}" if message.forward_from.username else ""
user_info = f"{user_name} {user_username}".strip() or target_telegram_id
else:
user_info = target_telegram_id
await message.answer(
f"Доступ предоставлен\n\n"
f"Пользователю {target_telegram_id} предоставлен доступ к коллекции.\n\n"
f"Пользователь: {user_info}\n\n"
f"Примечание: Если пользователь еще не взаимодействовал с ботом, он был автоматически создан в системе.",
parse_mode="HTML"
)
else:
await message.answer(
"Ошибка\n\n"
"Не удалось предоставить доступ. Возможно:\n"
"• Доступ уже предоставлен\n"
"• Произошла ошибка на сервере\n"
"• Вы не являетесь владельцем коллекции\n\n"
"Проверьте логи сервера для получения подробной информации.",
parse_mode="HTML"
)
await state.clear()
@router.callback_query(lambda c: c.data.startswith("access:remove:"))
async def remove_access(callback: CallbackQuery):
"""Удалить доступ пользователя"""
parts = callback.data.split(":")
collection_id = parts[2]
target_telegram_id = parts[3]
owner_telegram_id = str(callback.from_user.id)
await callback.answer("Удаляю доступ...")
result = await revoke_collection_access(collection_id, target_telegram_id, owner_telegram_id)
if result:
await callback.message.answer(
f"Доступ отозван\n\n"
f"Доступ пользователя {target_telegram_id} отозван.",
parse_mode="HTML"
)
else:
await callback.message.answer(
"Ошибка\n\n"
"Не удалось отозвать доступ.",
parse_mode="HTML"
)
@router.callback_query(lambda c: c.data.startswith("collection:edit:"))
async def edit_collection_prompt(callback: CallbackQuery, state: FSMContext):
"""Запросить данные для редактирования коллекции"""
collection_id = callback.data.split(":")[2]
telegram_id = str(callback.from_user.id)
collection_info = await get_collection_info(collection_id, telegram_id)
if not collection_info:
await callback.message.answer(
"Ошибка\n\nНе удалось загрузить информацию о коллекции.",
parse_mode="HTML"
)
await callback.answer()
return
await state.update_data(collection_id=collection_id)
await state.set_state(CollectionEditStates.waiting_for_name)
await callback.message.answer(
"Редактирование коллекции\n\n"
"Отправьте новое название коллекции или /skip чтобы оставить текущее.\n\n"
f"Текущее название: {collection_info.get('name', 'Без названия')}",
parse_mode="HTML"
)
await callback.answer()
@router.message(StateFilter(CollectionEditStates.waiting_for_name))
async def process_edit_collection_name(message: Message, state: FSMContext):
"""Обработать новое название коллекции"""
telegram_id = str(message.from_user.id)
data = await state.get_data()
collection_id = data.get("collection_id")
if message.text and message.text.strip() == "/skip":
new_name = None
else:
new_name = message.text.strip() if message.text else None
await state.update_data(name=new_name)
await state.set_state(CollectionEditStates.waiting_for_description)
collection_info = await get_collection_info(collection_id, telegram_id)
current_description = collection_info.get("description", "") if collection_info else ""
await message.answer(
"Описание коллекции\n\n"
"Отправьте новое описание коллекции или /skip чтобы оставить текущее.\n\n"
f"Текущее описание: {current_description[:100] if current_description else 'Нет описания'}...",
parse_mode="HTML"
)
@router.message(StateFilter(CollectionEditStates.waiting_for_description))
async def process_edit_collection_description(message: Message, state: FSMContext):
"""Обработать новое описание коллекции"""
telegram_id = str(message.from_user.id)
data = await state.get_data()
collection_id = data.get("collection_id")
name = data.get("name")
if message.text and message.text.strip() == "/skip":
new_description = None
else:
new_description = message.text.strip() if message.text else None
try:
update_data = {}
if name:
update_data["name"] = name
if new_description:
update_data["description"] = new_description
async with create_http_session() as session:
async with session.put(
f"{settings.BACKEND_URL}/collections/{collection_id}",
json=update_data,
headers={"X-Telegram-ID": telegram_id}
) as response:
if response.status == 200:
await message.answer(
"Коллекция обновлена\n\n"
"Изменения сохранены.",
parse_mode="HTML"
)
else:
error_text = await response.text()
await message.answer(
f"Ошибка\n\n"
f"Не удалось обновить коллекцию: {error_text}",
parse_mode="HTML"
)
except Exception as e:
await message.answer(
f"Ошибка\n\n"
f"Произошла ошибка: {str(e)}",
parse_mode="HTML"
)
await state.clear()
@router.callback_query(lambda c: c.data == "collections:list")
async def back_to_collections(callback: CallbackQuery):
"""Вернуться к списку коллекций"""
telegram_id = str(callback.from_user.id)
collections = await get_user_collections(telegram_id)
if not collections:
await callback.message.answer(
"У вас пока нет коллекций\n\n"
"Обратитесь к администратору для создания коллекций и добавления документов.",
parse_mode="HTML"
)
return
response = "Ваши коллекции документов:\n\n"
keyboard_buttons = []
for i, collection in enumerate(collections[:10], 1):
name = collection.get("name", "Без названия")
description = collection.get("description", "")
collection_id = collection.get("collection_id")
response += f"{i}. {name}\n"
if description:
response += f" {description[:50]}...\n"
response += f" ID: {collection_id}\n\n"
keyboard_buttons.append([
InlineKeyboardButton(
text=f"{name}",
callback_data=f"collection:{collection_id}"
)
])
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_buttons)
response += "Нажмите на коллекцию, чтобы посмотреть документы"
await callback.message.answer(response, parse_mode="HTML", reply_markup=keyboard)