from aiogram import Router, types from aiogram.types import Message from datetime import datetime import aiohttp from tg_bot.config.settings import settings from tg_bot.infrastructure.database.database import SessionLocal from tg_bot.infrastructure.database.models import UserModel from tg_bot.application.services.rag_service import RAGService router = Router() BACKEND_URL = "http://localhost:8001/api/v1" rag_service = RAGService() @router.message() async def handle_question(message: Message): user_id = message.from_user.id question_text = message.text.strip() if question_text.startswith('/'): return session = SessionLocal() try: user = session.query(UserModel).filter_by( telegram_id=str(user_id) ).first() if not user: user = UserModel( telegram_id=str(user_id), username=message.from_user.username or "", first_name=message.from_user.first_name or "", last_name=message.from_user.last_name or "" ) session.add(user) session.commit() await ensure_user_in_backend(str(user_id), message.from_user) if user.is_premium: await process_premium_question(message, user, question_text, session) elif user.questions_used < settings.FREE_QUESTIONS_LIMIT: await process_free_question(message, user, question_text, session) else: await handle_limit_exceeded(message, user) except Exception as e: print(f"Error processing question: {e}") await message.answer( "Произошла ошибка. Попробуйте позже.", parse_mode="HTML" ) finally: session.close() async def ensure_user_in_backend(telegram_id: str, telegram_user): try: async with aiohttp.ClientSession() as session: async with session.get( f"{BACKEND_URL}/users/telegram/{telegram_id}" ) as response: if response.status == 200: return async with session.post( f"{BACKEND_URL}/users", json={"telegram_id": telegram_id, "role": "user"} ) as create_response: if create_response.status in [200, 201]: print(f"Пользователь {telegram_id} создан в backend") except Exception as e: print(f"Error creating user in backend: {e}") async def process_premium_question(message: Message, user: UserModel, question_text: str, session): user.questions_used += 1 session.commit() await message.bot.send_chat_action(message.chat.id, "typing") try: rag_result = await rag_service.generate_answer_with_rag( question_text, str(message.from_user.id) ) answer = rag_result.get("answer", "Извините, не удалось сгенерировать ответ.") sources = rag_result.get("sources", []) await save_conversation_to_backend( str(message.from_user.id), question_text, answer, sources ) response = ( f"Ваш вопрос:\n" f"{question_text[:200]}\n\n" f"Ответ:\n{answer}\n\n" ) if sources: response += f"Источники из коллекций:\n" collections_used = {} for source in sources[:5]: collection_name = source.get('collection', 'Неизвестно') if collection_name not in collections_used: collections_used[collection_name] = [] collections_used[collection_name].append(source.get('title', 'Без названия')) for i, (collection_name, titles) in enumerate(collections_used.items(), 1): response += f"{i}. Коллекция: {collection_name}\n" for title in titles[:2]: response += f" • {title}\n" response += "\nИспользуйте /mycollections для просмотра всех коллекций\n\n" response += ( f"Статус: Premium (вопросов безлимитно)\n" f"Всего вопросов: {user.questions_used}" ) except Exception as e: print(f"Error generating answer: {e}") response = ( f"Ваш вопрос:\n" f"{question_text[:200]}\n\n" f"Ошибка при генерации ответа. Попробуйте позже.\n\n" f"Статус: Premium\n" f"Всего вопросов: {user.questions_used}" ) await message.answer(response, parse_mode="HTML") async def process_free_question(message: Message, user: UserModel, question_text: str, session): user.questions_used += 1 remaining = settings.FREE_QUESTIONS_LIMIT - user.questions_used session.commit() await message.bot.send_chat_action(message.chat.id, "typing") try: rag_result = await rag_service.generate_answer_with_rag( question_text, str(message.from_user.id) ) answer = rag_result.get("answer", "Извините, не удалось сгенерировать ответ.") sources = rag_result.get("sources", []) await save_conversation_to_backend( str(message.from_user.id), question_text, answer, sources ) response = ( f"Ваш вопрос:\n" f"{question_text[:200]}\n\n" f"Ответ:\n{answer}\n\n" ) if sources: response += f"Источники из коллекций:\n" collections_used = {} for source in sources[:5]: collection_name = source.get('collection', 'Неизвестно') if collection_name not in collections_used: collections_used[collection_name] = [] collections_used[collection_name].append(source.get('title', 'Без названия')) for i, (collection_name, titles) in enumerate(collections_used.items(), 1): response += f"{i}. Коллекция: {collection_name}\n" for title in titles[:2]: response += f" • {title}\n" response += "\nИспользуйте /mycollections для просмотра всех коллекций\n\n" response += ( f"Статус: Бесплатный доступ\n" f"Использовано вопросов: {user.questions_used}/{settings.FREE_QUESTIONS_LIMIT}\n" f"Осталось бесплатных: {remaining}\n\n" ) if remaining <= 3 and remaining > 0: response += f"Осталось мало вопросов! Для продолжения используйте /buy\n\n" response += f"Для безлимитного доступа: /buy" except Exception as e: print(f"Error generating answer: {e}") response = ( f"Ваш вопрос:\n" f"{question_text[:200]}\n\n" f"Ошибка при генерации ответа. Попробуйте позже.\n\n" f"Статус: Бесплатный доступ\n" f"Использовано вопросов: {user.questions_used}/{settings.FREE_QUESTIONS_LIMIT}\n" f"Осталось бесплатных: {remaining}\n\n" f"Для безлимитного доступа: /buy" ) await message.answer(response, parse_mode="HTML") async def save_conversation_to_backend(telegram_id: str, question: str, answer: str, sources: list): try: async with aiohttp.ClientSession() as session: async with session.get( f"{BACKEND_URL}/users/telegram/{telegram_id}" ) as user_response: if user_response.status != 200: return user_data = await user_response.json() user_uuid = user_data.get("user_id") async with session.get( f"{BACKEND_URL}/collections/", headers={"X-Telegram-ID": telegram_id} ) as collections_response: collections = [] if collections_response.status == 200: collections = await collections_response.json() collection_id = None if collections: collection_id = collections[0].get("collection_id") else: async with session.post( f"{BACKEND_URL}/collections", json={ "name": "Основная коллекция", "description": "Коллекция по умолчанию", "is_public": False }, headers={"X-Telegram-ID": telegram_id} ) as create_collection_response: if create_collection_response.status in [200, 201]: collection_data = await create_collection_response.json() collection_id = collection_data.get("collection_id") if not collection_id: return async with session.post( f"{BACKEND_URL}/conversations", json={"collection_id": str(collection_id)}, headers={"X-Telegram-ID": telegram_id} ) as conversation_response: if conversation_response.status not in [200, 201]: return conversation_data = await conversation_response.json() conversation_id = conversation_data.get("conversation_id") if not conversation_id: return await session.post( f"{BACKEND_URL}/messages", json={ "conversation_id": str(conversation_id), "content": question, "role": "user" }, headers={"X-Telegram-ID": telegram_id} ) await session.post( f"{BACKEND_URL}/messages", json={ "conversation_id": str(conversation_id), "content": answer, "role": "assistant", "sources": {"documents": sources} }, headers={"X-Telegram-ID": telegram_id} ) except Exception as e: print(f"Error saving conversation: {e}") async def handle_limit_exceeded(message: Message, user: UserModel): response = ( f"Лимит бесплатных вопросов исчерпан!\n\n" f"Ваша статистика:\n" f"• Использовано вопросов: {user.questions_used}\n" f"• Бесплатный лимит: {settings.FREE_QUESTIONS_LIMIT}\n\n" f"Что делать дальше?\n" f"1. Купите подписку командой /buy\n" f"2. Получите неограниченный доступ к вопросам\n" f"3. Продолжайте использовать бот без ограничений\n\n" f"Подписка включает:\n" f"• Неограниченное количество вопросов\n" f"• Приоритетную обработку\n" f"• Доступ ко всем функциям\n\n" f"Нажмите /buy чтобы продолжить" ) await message.answer(response, parse_mode="HTML")