2025-12-22 21:17:17 +03:00

301 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from aiogram import Router, types
from aiogram.types import Message
from datetime import datetime
import aiohttp
from tg_bot.config.settings import settings
from tg_bot.infrastructure.database.database import AsyncSessionLocal
from tg_bot.infrastructure.database.models import UserModel
from tg_bot.domain.services.user_service import UserService
from tg_bot.application.services.rag_service import RAGService
router = Router()
BACKEND_URL = "http://localhost:8001/api/v1"
rag_service = RAGService()
@router.message()
async def handle_question(message: Message):
user_id = message.from_user.id
question_text = message.text.strip()
if question_text.startswith('/'):
return
async with AsyncSessionLocal() as session:
try:
user_service = UserService(session)
user = await user_service.get_user_by_telegram_id(user_id)
if not user:
user = await user_service.get_or_create_user(
user_id,
message.from_user.username or "",
message.from_user.first_name or "",
message.from_user.last_name or ""
)
await ensure_user_in_backend(str(user_id), message.from_user)
if user.is_premium:
await process_premium_question(message, user, question_text, user_service)
elif user.questions_used < settings.FREE_QUESTIONS_LIMIT:
await process_free_question(message, user, question_text, user_service)
else:
await handle_limit_exceeded(message, user)
except Exception as e:
print(f"Error processing question: {e}")
await message.answer(
"Произошла ошибка. Попробуйте позже.",
parse_mode="HTML"
)
async def ensure_user_in_backend(telegram_id: str, telegram_user):
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{BACKEND_URL}/users/telegram/{telegram_id}"
) as response:
if response.status == 200:
return
async with session.post(
f"{BACKEND_URL}/users",
json={"telegram_id": telegram_id, "role": "user"}
) as create_response:
if create_response.status in [200, 201]:
print(f"Пользователь {telegram_id} создан в backend")
except Exception as e:
print(f"Error creating user in backend: {e}")
async def process_premium_question(message: Message, user: UserModel, question_text: str, user_service: UserService):
await user_service.update_user_questions(user.telegram_id)
await message.bot.send_chat_action(message.chat.id, "typing")
try:
rag_result = await rag_service.generate_answer_with_rag(
question_text,
str(message.from_user.id)
)
answer = rag_result.get("answer", "Извините, не удалось сгенерировать ответ.")
sources = rag_result.get("sources", [])
await save_conversation_to_backend(
str(message.from_user.id),
question_text,
answer,
sources
)
response = (
f"<b>Ваш вопрос:</b>\n"
f"<i>{question_text[:200]}</i>\n\n"
f"<b>Ответ:</b>\n{answer}\n\n"
)
if sources:
response += f"<b>Источники из коллекций:</b>\n"
collections_used = {}
for source in sources[:5]:
collection_name = source.get('collection', 'Неизвестно')
if collection_name not in collections_used:
collections_used[collection_name] = []
collections_used[collection_name].append(source.get('title', 'Без названия'))
for i, (collection_name, titles) in enumerate(collections_used.items(), 1):
response += f"{i}. <b>Коллекция:</b> {collection_name}\n"
for title in titles[:2]:
response += f"{title}\n"
response += "\n<i>Используйте /mycollections для просмотра всех коллекций</i>\n\n"
response += (
f"<b>Статус:</b> Premium (вопросов безлимитно)\n"
f"<b>Всего вопросов:</b> {user.questions_used}"
)
except Exception as e:
print(f"Error generating answer: {e}")
response = (
f"<b>Ваш вопрос:</b>\n"
f"<i>{question_text[:200]}</i>\n\n"
f"Ошибка при генерации ответа. Попробуйте позже.\n\n"
f"<b>Статус:</b> Premium\n"
f"<b>Всего вопросов:</b> {user.questions_used}"
)
await message.answer(response, parse_mode="HTML")
async def process_free_question(message: Message, user: UserModel, question_text: str, user_service: UserService):
await user_service.update_user_questions(user.telegram_id)
user = await user_service.get_user_by_telegram_id(user.telegram_id)
remaining = settings.FREE_QUESTIONS_LIMIT - user.questions_used
await message.bot.send_chat_action(message.chat.id, "typing")
try:
rag_result = await rag_service.generate_answer_with_rag(
question_text,
str(message.from_user.id)
)
answer = rag_result.get("answer", "Извините, не удалось сгенерировать ответ.")
sources = rag_result.get("sources", [])
await save_conversation_to_backend(
str(message.from_user.id),
question_text,
answer,
sources
)
response = (
f"<b>Ваш вопрос:</b>\n"
f"<i>{question_text[:200]}</i>\n\n"
f"<b>Ответ:</b>\n{answer}\n\n"
)
if sources:
response += f"<b>Источники из коллекций:</b>\n"
collections_used = {}
for source in sources[:5]:
collection_name = source.get('collection', 'Неизвестно')
if collection_name not in collections_used:
collections_used[collection_name] = []
collections_used[collection_name].append(source.get('title', 'Без названия'))
for i, (collection_name, titles) in enumerate(collections_used.items(), 1):
response += f"{i}. <b>Коллекция:</b> {collection_name}\n"
for title in titles[:2]:
response += f"{title}\n"
response += "\n<i>Используйте /mycollections для просмотра всех коллекций</i>\n\n"
response += (
f"<b>Статус:</b> Бесплатный доступ\n"
f"<b>Использовано вопросов:</b> {user.questions_used}/{settings.FREE_QUESTIONS_LIMIT}\n"
f"<b>Осталось бесплатных:</b> {remaining}\n\n"
)
if remaining <= 3 and remaining > 0:
response += f"<i>Осталось мало вопросов! Для продолжения используйте /buy</i>\n\n"
response += f"<i>Для безлимитного доступа: /buy</i>"
except Exception as e:
print(f"Error generating answer: {e}")
response = (
f"<b>Ваш вопрос:</b>\n"
f"<i>{question_text[:200]}</i>\n\n"
f"Ошибка при генерации ответа. Попробуйте позже.\n\n"
f"<b>Статус:</b> Бесплатный доступ\n"
f"<b>Использовано вопросов:</b> {user.questions_used}/{settings.FREE_QUESTIONS_LIMIT}\n"
f"<b>Осталось бесплатных:</b> {remaining}\n\n"
f"<i>Для безлимитного доступа: /buy</i>"
)
await message.answer(response, parse_mode="HTML")
async def save_conversation_to_backend(telegram_id: str, question: str, answer: str, sources: list):
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{BACKEND_URL}/users/telegram/{telegram_id}"
) as user_response:
if user_response.status != 200:
return
user_data = await user_response.json()
user_uuid = user_data.get("user_id")
async with session.get(
f"{BACKEND_URL}/collections/",
headers={"X-Telegram-ID": telegram_id}
) as collections_response:
collections = []
if collections_response.status == 200:
collections = await collections_response.json()
collection_id = None
if collections:
collection_id = collections[0].get("collection_id")
else:
async with session.post(
f"{BACKEND_URL}/collections",
json={
"name": "Основная коллекция",
"description": "Коллекция по умолчанию",
"is_public": False
},
headers={"X-Telegram-ID": telegram_id}
) as create_collection_response:
if create_collection_response.status in [200, 201]:
collection_data = await create_collection_response.json()
collection_id = collection_data.get("collection_id")
if not collection_id:
return
async with session.post(
f"{BACKEND_URL}/conversations",
json={"collection_id": str(collection_id)},
headers={"X-Telegram-ID": telegram_id}
) as conversation_response:
if conversation_response.status not in [200, 201]:
return
conversation_data = await conversation_response.json()
conversation_id = conversation_data.get("conversation_id")
if not conversation_id:
return
await session.post(
f"{BACKEND_URL}/messages",
json={
"conversation_id": str(conversation_id),
"content": question,
"role": "user"
},
headers={"X-Telegram-ID": telegram_id}
)
await session.post(
f"{BACKEND_URL}/messages",
json={
"conversation_id": str(conversation_id),
"content": answer,
"role": "assistant",
"sources": {"documents": sources}
},
headers={"X-Telegram-ID": telegram_id}
)
except Exception as e:
print(f"Error saving conversation: {e}")
async def handle_limit_exceeded(message: Message, user: UserModel):
response = (
f"<b>Лимит бесплатных вопросов исчерпан!</b>\n\n"
f"<b>Ваша статистика:</b>\n"
f"• Использовано вопросов: {user.questions_used}\n"
f"• Бесплатный лимит: {settings.FREE_QUESTIONS_LIMIT}\n\n"
f"<b>Что делать дальше?</b>\n"
f"1. Купите подписку командой /buy\n"
f"2. Получите неограниченный доступ к вопросам\n"
f"3. Продолжайте использовать бот без ограничений\n\n"
f"<b>Подписка включает:</b>\n"
f"• Неограниченное количество вопросов\n"
f"• Приоритетную обработку\n"
f"• Доступ ко всем функциям\n\n"
f"<b>Нажмите /buy чтобы продолжить</b>"
)
await message.answer(response, parse_mode="HTML")