from aiogram import Router, types
from aiogram.types import Message
from datetime import datetime
import aiohttp
from tg_bot.config.settings import settings
from tg_bot.infrastructure.database.database import SessionLocal
from tg_bot.infrastructure.database.models import UserModel
from tg_bot.application.services.rag_service import RAGService
router = Router()
BACKEND_URL = "http://localhost:8001/api/v1"
rag_service = RAGService()
@router.message()
async def handle_question(message: Message):
user_id = message.from_user.id
question_text = message.text.strip()
if question_text.startswith('/'):
return
session = SessionLocal()
try:
user = session.query(UserModel).filter_by(
telegram_id=str(user_id)
).first()
if not user:
user = UserModel(
telegram_id=str(user_id),
username=message.from_user.username or "",
first_name=message.from_user.first_name or "",
last_name=message.from_user.last_name or ""
)
session.add(user)
session.commit()
await ensure_user_in_backend(str(user_id), message.from_user)
if user.is_premium:
await process_premium_question(message, user, question_text, session)
elif user.questions_used < settings.FREE_QUESTIONS_LIMIT:
await process_free_question(message, user, question_text, session)
else:
await handle_limit_exceeded(message, user)
except Exception as e:
print(f"Error processing question: {e}")
await message.answer(
"Произошла ошибка. Попробуйте позже.",
parse_mode="HTML"
)
finally:
session.close()
async def ensure_user_in_backend(telegram_id: str, telegram_user):
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{BACKEND_URL}/users/telegram/{telegram_id}"
) as response:
if response.status == 200:
return
async with session.post(
f"{BACKEND_URL}/users",
json={"telegram_id": telegram_id, "role": "user"}
) as create_response:
if create_response.status in [200, 201]:
print(f"Пользователь {telegram_id} создан в backend")
except Exception as e:
print(f"Error creating user in backend: {e}")
async def process_premium_question(message: Message, user: UserModel, question_text: str, session):
user.questions_used += 1
session.commit()
await message.bot.send_chat_action(message.chat.id, "typing")
try:
rag_result = await rag_service.generate_answer_with_rag(
question_text,
str(message.from_user.id)
)
answer = rag_result.get("answer", "Извините, не удалось сгенерировать ответ.")
sources = rag_result.get("sources", [])
await save_conversation_to_backend(
str(message.from_user.id),
question_text,
answer,
sources
)
response = (
f"Ваш вопрос:\n"
f"{question_text[:200]}\n\n"
f"Ответ:\n{answer}\n\n"
)
if sources:
response += f"Источники из коллекций:\n"
collections_used = {}
for source in sources[:5]:
collection_name = source.get('collection', 'Неизвестно')
if collection_name not in collections_used:
collections_used[collection_name] = []
collections_used[collection_name].append(source.get('title', 'Без названия'))
for i, (collection_name, titles) in enumerate(collections_used.items(), 1):
response += f"{i}. Коллекция: {collection_name}\n"
for title in titles[:2]:
response += f" • {title}\n"
response += "\nИспользуйте /mycollections для просмотра всех коллекций\n\n"
response += (
f"Статус: Premium (вопросов безлимитно)\n"
f"Всего вопросов: {user.questions_used}"
)
except Exception as e:
print(f"Error generating answer: {e}")
response = (
f"Ваш вопрос:\n"
f"{question_text[:200]}\n\n"
f"Ошибка при генерации ответа. Попробуйте позже.\n\n"
f"Статус: Premium\n"
f"Всего вопросов: {user.questions_used}"
)
await message.answer(response, parse_mode="HTML")
async def process_free_question(message: Message, user: UserModel, question_text: str, session):
user.questions_used += 1
remaining = settings.FREE_QUESTIONS_LIMIT - user.questions_used
session.commit()
await message.bot.send_chat_action(message.chat.id, "typing")
try:
rag_result = await rag_service.generate_answer_with_rag(
question_text,
str(message.from_user.id)
)
answer = rag_result.get("answer", "Извините, не удалось сгенерировать ответ.")
sources = rag_result.get("sources", [])
await save_conversation_to_backend(
str(message.from_user.id),
question_text,
answer,
sources
)
response = (
f"Ваш вопрос:\n"
f"{question_text[:200]}\n\n"
f"Ответ:\n{answer}\n\n"
)
if sources:
response += f"Источники из коллекций:\n"
collections_used = {}
for source in sources[:5]:
collection_name = source.get('collection', 'Неизвестно')
if collection_name not in collections_used:
collections_used[collection_name] = []
collections_used[collection_name].append(source.get('title', 'Без названия'))
for i, (collection_name, titles) in enumerate(collections_used.items(), 1):
response += f"{i}. Коллекция: {collection_name}\n"
for title in titles[:2]:
response += f" • {title}\n"
response += "\nИспользуйте /mycollections для просмотра всех коллекций\n\n"
response += (
f"Статус: Бесплатный доступ\n"
f"Использовано вопросов: {user.questions_used}/{settings.FREE_QUESTIONS_LIMIT}\n"
f"Осталось бесплатных: {remaining}\n\n"
)
if remaining <= 3 and remaining > 0:
response += f"Осталось мало вопросов! Для продолжения используйте /buy\n\n"
response += f"Для безлимитного доступа: /buy"
except Exception as e:
print(f"Error generating answer: {e}")
response = (
f"Ваш вопрос:\n"
f"{question_text[:200]}\n\n"
f"Ошибка при генерации ответа. Попробуйте позже.\n\n"
f"Статус: Бесплатный доступ\n"
f"Использовано вопросов: {user.questions_used}/{settings.FREE_QUESTIONS_LIMIT}\n"
f"Осталось бесплатных: {remaining}\n\n"
f"Для безлимитного доступа: /buy"
)
await message.answer(response, parse_mode="HTML")
async def save_conversation_to_backend(telegram_id: str, question: str, answer: str, sources: list):
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{BACKEND_URL}/users/telegram/{telegram_id}"
) as user_response:
if user_response.status != 200:
return
user_data = await user_response.json()
user_uuid = user_data.get("user_id")
async with session.get(
f"{BACKEND_URL}/collections/",
headers={"X-Telegram-ID": telegram_id}
) as collections_response:
collections = []
if collections_response.status == 200:
collections = await collections_response.json()
collection_id = None
if collections:
collection_id = collections[0].get("collection_id")
else:
async with session.post(
f"{BACKEND_URL}/collections",
json={
"name": "Основная коллекция",
"description": "Коллекция по умолчанию",
"is_public": False
},
headers={"X-Telegram-ID": telegram_id}
) as create_collection_response:
if create_collection_response.status in [200, 201]:
collection_data = await create_collection_response.json()
collection_id = collection_data.get("collection_id")
if not collection_id:
return
async with session.post(
f"{BACKEND_URL}/conversations",
json={"collection_id": str(collection_id)},
headers={"X-Telegram-ID": telegram_id}
) as conversation_response:
if conversation_response.status not in [200, 201]:
return
conversation_data = await conversation_response.json()
conversation_id = conversation_data.get("conversation_id")
if not conversation_id:
return
await session.post(
f"{BACKEND_URL}/messages",
json={
"conversation_id": str(conversation_id),
"content": question,
"role": "user"
},
headers={"X-Telegram-ID": telegram_id}
)
await session.post(
f"{BACKEND_URL}/messages",
json={
"conversation_id": str(conversation_id),
"content": answer,
"role": "assistant",
"sources": {"documents": sources}
},
headers={"X-Telegram-ID": telegram_id}
)
except Exception as e:
print(f"Error saving conversation: {e}")
async def handle_limit_exceeded(message: Message, user: UserModel):
response = (
f"Лимит бесплатных вопросов исчерпан!\n\n"
f"Ваша статистика:\n"
f"• Использовано вопросов: {user.questions_used}\n"
f"• Бесплатный лимит: {settings.FREE_QUESTIONS_LIMIT}\n\n"
f"Что делать дальше?\n"
f"1. Купите подписку командой /buy\n"
f"2. Получите неограниченный доступ к вопросам\n"
f"3. Продолжайте использовать бот без ограничений\n\n"
f"Подписка включает:\n"
f"• Неограниченное количество вопросов\n"
f"• Приоритетную обработку\n"
f"• Доступ ко всем функциям\n\n"
f"Нажмите /buy чтобы продолжить"
)
await message.answer(response, parse_mode="HTML")