280 lines
11 KiB
Python
280 lines
11 KiB
Python
from aiogram import Router, types
|
||
from aiogram.types import Message
|
||
from datetime import datetime
|
||
import aiohttp
|
||
from tg_bot.config.settings import settings
|
||
from tg_bot.domain.services.user_service import UserService, User
|
||
from tg_bot.application.services.rag_service import RAGService
|
||
|
||
router = Router()
|
||
rag_service = RAGService()
|
||
user_service = UserService()
|
||
|
||
@router.message()
|
||
async def handle_question(message: Message):
|
||
user_id = message.from_user.id
|
||
question_text = message.text.strip()
|
||
if question_text.startswith('/'):
|
||
return
|
||
|
||
try:
|
||
user = await user_service.get_user_by_telegram_id(user_id)
|
||
|
||
if not user:
|
||
user = await user_service.get_or_create_user(
|
||
user_id,
|
||
message.from_user.username or "",
|
||
message.from_user.first_name or "",
|
||
message.from_user.last_name or ""
|
||
)
|
||
|
||
if user.is_premium:
|
||
await process_premium_question(message, user, question_text)
|
||
|
||
elif user.questions_used < settings.FREE_QUESTIONS_LIMIT:
|
||
await process_free_question(message, user, question_text)
|
||
|
||
else:
|
||
await handle_limit_exceeded(message, user)
|
||
|
||
except Exception as e:
|
||
print(f"Error processing question: {e}")
|
||
await message.answer(
|
||
"Произошла ошибка. Попробуйте позже.",
|
||
parse_mode="HTML"
|
||
)
|
||
|
||
|
||
async def process_premium_question(message: Message, user: User, question_text: str):
|
||
await user_service.update_user_questions(int(user.telegram_id))
|
||
user = await user_service.get_user_by_telegram_id(int(user.telegram_id))
|
||
|
||
await message.bot.send_chat_action(message.chat.id, "typing")
|
||
|
||
try:
|
||
rag_result = await rag_service.generate_answer_with_rag(
|
||
question_text,
|
||
str(message.from_user.id)
|
||
)
|
||
|
||
answer = rag_result.get("answer", "Извините, не удалось сгенерировать ответ.")
|
||
sources = rag_result.get("sources", [])
|
||
|
||
await save_conversation_to_backend(
|
||
str(message.from_user.id),
|
||
question_text,
|
||
answer,
|
||
sources
|
||
)
|
||
|
||
response = (
|
||
f"<b>Ваш вопрос:</b>\n"
|
||
f"<i>{question_text[:200]}</i>\n\n"
|
||
f"<b>Ответ:</b>\n{answer}\n\n"
|
||
)
|
||
|
||
if sources:
|
||
response += f"<b>Источники из коллекций:</b>\n"
|
||
collections_used = {}
|
||
for source in sources[:5]:
|
||
collection_name = source.get('collection', 'Неизвестно')
|
||
if collection_name not in collections_used:
|
||
collections_used[collection_name] = []
|
||
collections_used[collection_name].append(source.get('title', 'Без названия'))
|
||
|
||
for i, (collection_name, titles) in enumerate(collections_used.items(), 1):
|
||
response += f"{i}. <b>Коллекция:</b> {collection_name}\n"
|
||
for title in titles[:2]:
|
||
response += f" • {title}\n"
|
||
response += "\n<i>Используйте /mycollections для просмотра всех коллекций</i>\n\n"
|
||
|
||
response += (
|
||
f"<b>Статус:</b> Premium (вопросов безлимитно)\n"
|
||
f"<b>Всего вопросов:</b> {user.questions_used}"
|
||
)
|
||
|
||
except Exception as e:
|
||
print(f"Error generating answer: {e}")
|
||
response = (
|
||
f"<b>Ваш вопрос:</b>\n"
|
||
f"<i>{question_text[:200]}</i>\n\n"
|
||
f"Ошибка при генерации ответа. Попробуйте позже.\n\n"
|
||
f"<b>Статус:</b> Premium\n"
|
||
f"<b>Всего вопросов:</b> {user.questions_used}"
|
||
)
|
||
|
||
await message.answer(response, parse_mode="HTML")
|
||
|
||
|
||
async def process_free_question(message: Message, user: User, question_text: str):
|
||
await user_service.update_user_questions(int(user.telegram_id))
|
||
user = await user_service.get_user_by_telegram_id(int(user.telegram_id))
|
||
remaining = settings.FREE_QUESTIONS_LIMIT - user.questions_used
|
||
|
||
await message.bot.send_chat_action(message.chat.id, "typing")
|
||
|
||
try:
|
||
rag_result = await rag_service.generate_answer_with_rag(
|
||
question_text,
|
||
str(message.from_user.id)
|
||
)
|
||
|
||
answer = rag_result.get("answer", "Извините, не удалось сгенерировать ответ.")
|
||
sources = rag_result.get("sources", [])
|
||
|
||
await save_conversation_to_backend(
|
||
str(message.from_user.id),
|
||
question_text,
|
||
answer,
|
||
sources
|
||
)
|
||
|
||
response = (
|
||
f"<b>Ваш вопрос:</b>\n"
|
||
f"<i>{question_text[:200]}</i>\n\n"
|
||
f"<b>Ответ:</b>\n{answer}\n\n"
|
||
)
|
||
|
||
if sources:
|
||
response += f"<b>Источники из коллекций:</b>\n"
|
||
collections_used = {}
|
||
for source in sources[:5]:
|
||
collection_name = source.get('collection', 'Неизвестно')
|
||
if collection_name not in collections_used:
|
||
collections_used[collection_name] = []
|
||
collections_used[collection_name].append(source.get('title', 'Без названия'))
|
||
|
||
for i, (collection_name, titles) in enumerate(collections_used.items(), 1):
|
||
response += f"{i}. <b>Коллекция:</b> {collection_name}\n"
|
||
for title in titles[:2]:
|
||
response += f" • {title}\n"
|
||
response += "\n<i>Используйте /mycollections для просмотра всех коллекций</i>\n\n"
|
||
|
||
response += (
|
||
f"<b>Статус:</b> Бесплатный доступ\n"
|
||
f"<b>Использовано вопросов:</b> {user.questions_used}/{settings.FREE_QUESTIONS_LIMIT}\n"
|
||
f"<b>Осталось бесплатных:</b> {remaining}\n\n"
|
||
)
|
||
|
||
if remaining <= 3 and remaining > 0:
|
||
response += f"<i>Осталось мало вопросов! Для продолжения используйте /buy</i>\n\n"
|
||
|
||
response += f"<i>Для безлимитного доступа: /buy</i>"
|
||
|
||
except Exception as e:
|
||
print(f"Error generating answer: {e}")
|
||
response = (
|
||
f"<b>Ваш вопрос:</b>\n"
|
||
f"<i>{question_text[:200]}</i>\n\n"
|
||
f"Ошибка при генерации ответа. Попробуйте позже.\n\n"
|
||
f"<b>Статус:</b> Бесплатный доступ\n"
|
||
f"<b>Использовано вопросов:</b> {user.questions_used}/{settings.FREE_QUESTIONS_LIMIT}\n"
|
||
f"<b>Осталось бесплатных:</b> {remaining}\n\n"
|
||
f"<i>Для безлимитного доступа: /buy</i>"
|
||
)
|
||
|
||
await message.answer(response, parse_mode="HTML")
|
||
|
||
|
||
async def save_conversation_to_backend(telegram_id: str, question: str, answer: str, sources: list):
|
||
try:
|
||
from tg_bot.config.settings import settings
|
||
backend_url = settings.BACKEND_URL
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(
|
||
f"{backend_url}/users/telegram/{telegram_id}"
|
||
) as user_response:
|
||
if user_response.status != 200:
|
||
return
|
||
user_data = await user_response.json()
|
||
user_uuid = user_data.get("user_id")
|
||
|
||
async with session.get(
|
||
f"{backend_url}/collections/",
|
||
headers={"X-Telegram-ID": telegram_id}
|
||
) as collections_response:
|
||
collections = []
|
||
if collections_response.status == 200:
|
||
collections = await collections_response.json()
|
||
|
||
collection_id = None
|
||
if collections:
|
||
collection_id = collections[0].get("collection_id")
|
||
else:
|
||
async with session.post(
|
||
f"{backend_url}/collections",
|
||
json={
|
||
"name": "Основная коллекция",
|
||
"description": "Коллекция по умолчанию",
|
||
"is_public": False
|
||
},
|
||
headers={"X-Telegram-ID": telegram_id}
|
||
) as create_collection_response:
|
||
if create_collection_response.status in [200, 201]:
|
||
collection_data = await create_collection_response.json()
|
||
collection_id = collection_data.get("collection_id")
|
||
|
||
if not collection_id:
|
||
return
|
||
|
||
async with session.post(
|
||
f"{backend_url}/conversations",
|
||
json={"collection_id": str(collection_id)},
|
||
headers={"X-Telegram-ID": telegram_id}
|
||
) as conversation_response:
|
||
if conversation_response.status not in [200, 201]:
|
||
return
|
||
conversation_data = await conversation_response.json()
|
||
conversation_id = conversation_data.get("conversation_id")
|
||
|
||
if not conversation_id:
|
||
return
|
||
|
||
await session.post(
|
||
f"{backend_url}/messages",
|
||
json={
|
||
"conversation_id": str(conversation_id),
|
||
"content": question,
|
||
"role": "user"
|
||
},
|
||
headers={"X-Telegram-ID": telegram_id}
|
||
)
|
||
|
||
await session.post(
|
||
f"{backend_url}/messages",
|
||
json={
|
||
"conversation_id": str(conversation_id),
|
||
"content": answer,
|
||
"role": "assistant",
|
||
"sources": {"documents": sources}
|
||
},
|
||
headers={"X-Telegram-ID": telegram_id}
|
||
)
|
||
|
||
except Exception as e:
|
||
print(f"Error saving conversation: {e}")
|
||
|
||
|
||
async def handle_limit_exceeded(message: Message, user: User):
|
||
response = (
|
||
f"<b>Лимит бесплатных вопросов исчерпан!</b>\n\n"
|
||
|
||
f"<b>Ваша статистика:</b>\n"
|
||
f"• Использовано вопросов: {user.questions_used}\n"
|
||
f"• Бесплатный лимит: {settings.FREE_QUESTIONS_LIMIT}\n\n"
|
||
|
||
f"<b>Что делать дальше?</b>\n"
|
||
f"1. Купите подписку командой /buy\n"
|
||
f"2. Получите неограниченный доступ к вопросам\n"
|
||
f"3. Продолжайте использовать бот без ограничений\n\n"
|
||
|
||
f"<b>Подписка включает:</b>\n"
|
||
f"• Неограниченное количество вопросов\n"
|
||
f"• Приоритетную обработку\n"
|
||
f"• Доступ ко всем функциям\n\n"
|
||
|
||
f"<b>Нажмите /buy чтобы продолжить</b>"
|
||
)
|
||
|
||
await message.answer(response, parse_mode="HTML")
|