diff --git a/tg_bot/application/__init__.py b/tg_bot/application/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tg_bot/application/services/__init__.py b/tg_bot/application/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tg_bot/application/services/rag_service.py b/tg_bot/application/services/rag_service.py new file mode 100644 index 0000000..c5771ed --- /dev/null +++ b/tg_bot/application/services/rag_service.py @@ -0,0 +1,139 @@ +import aiohttp +from tg_bot.infrastructure.external.deepseek_client import DeepSeekClient +from tg_bot.config.settings import settings + +BACKEND_URL = "http://localhost:8001/api/v1" + + +class RAGService: + + def __init__(self): + self.deepseek_client = DeepSeekClient() + + async def search_documents_in_collections( + self, + user_telegram_id: str, + query: str, + limit_per_collection: int = 5 + ) -> list[dict]: + try: + async with aiohttp.ClientSession() as session: + async with session.get( + f"{BACKEND_URL}/users/telegram/{user_telegram_id}" + ) as user_response: + if user_response.status != 200: + return [] + + user_data = await user_response.json() + user_uuid = str(user_data.get("user_id")) + + if not user_uuid: + return [] + + async with session.get( + f"{BACKEND_URL}/collections/", + headers={"X-Telegram-ID": user_telegram_id} + ) as collections_response: + if collections_response.status != 200: + return [] + + collections = await collections_response.json() + + all_documents = [] + for collection in collections: + collection_id = collection.get("collection_id") + if not collection_id: + continue + + try: + async with aiohttp.ClientSession() as search_session: + async with search_session.get( + f"{BACKEND_URL}/documents/collection/{collection_id}", + params={"search": query, "limit": limit_per_collection}, + headers={"X-Telegram-ID": user_telegram_id} + ) as search_response: + if search_response.status == 200: + documents = await search_response.json() + for doc in documents: + doc["collection_name"] = collection.get("name", "Unknown") + all_documents.append(doc) + except Exception as e: + print(f"Error searching collection {collection_id}: {e}") + continue + + return all_documents[:20] + + except Exception as e: + print(f"Error searching documents: {e}") + return [] + + async def generate_answer_with_rag( + self, + question: str, + user_telegram_id: str + ) -> dict: + documents = await self.search_documents_in_collections( + user_telegram_id, + question + ) + + context_parts = [] + sources = [] + + for doc in documents[:5]: + title = doc.get("title", "Без названия") + content = doc.get("content", "")[:1000] + collection_name = doc.get("collection_name", "Unknown") + + context_parts.append(f"Документ: {title}\nКоллекция: {collection_name}\nСодержание: {content[:500]}...") + sources.append({ + "title": title, + "collection": collection_name, + "document_id": doc.get("document_id") + }) + + context = "\n\n".join(context_parts) if context_parts else "Релевантные документы не найдены." + + system_prompt = """Ты - помощник-юрист, который отвечает на вопросы на основе предоставленных документов. +Используй информацию из документов для формирования точного и полезного ответа. +Если в документах нет информации для ответа, честно скажи об этом.""" + + user_prompt = f"""Контекст из документов: +{context} + +Вопрос пользователя: {question} + +Ответь на вопрос, используя информацию из предоставленных документов. Если информации недостаточно, укажи это.""" + + try: + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ] + + response = await self.deepseek_client.chat_completion( + messages=messages, + temperature=0.7, + max_tokens=2000 + ) + + return { + "answer": response.get("content", "Failed to generate answer"), + "sources": sources, + "usage": response.get("usage", {}) + } + + except Exception as e: + print(f"Error generating answer: {e}") + if documents: + return { + "answer": f"Found {len(documents)} documents but failed to generate answer", + "sources": sources[:3], + "usage": {} + } + else: + return { + "answer": "No relevant documents found", + "sources": [], + "usage": {} + } diff --git a/tg_bot/config/settings.py b/tg_bot/config/settings.py index 9bacdbf..908bfd5 100644 --- a/tg_bot/config/settings.py +++ b/tg_bot/config/settings.py @@ -26,6 +26,9 @@ class Settings(BaseSettings): YOOKASSA_RETURN_URL: str = "https://t.me/vibelawyer_bot" YOOKASSA_WEBHOOK_SECRET: Optional[str] = None + DEEPSEEK_API_KEY: Optional[str] = None + DEEPSEEK_API_URL: str = "https://api.deepseek.com/v1/chat/completions" + ADMIN_IDS_STR: str = "" @property diff --git a/tg_bot/domain/__init__.py b/tg_bot/domain/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tg_bot/domain/services/__init__.py b/tg_bot/domain/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tg_bot/domain/services/user_service.py b/tg_bot/domain/services/user_service.py new file mode 100644 index 0000000..17d4e33 --- /dev/null +++ b/tg_bot/domain/services/user_service.py @@ -0,0 +1,29 @@ +from sqlalchemy.orm import Session +from datetime import datetime, timedelta +from tg_bot.infrastructure.database.models import UserModel + + +class UserService: + + def __init__(self, session: Session): + self.session = session + + async def activate_premium(self, telegram_id: int) -> bool: + try: + user = self.session.query(UserModel) \ + .filter(UserModel.telegram_id == str(telegram_id)) \ + .first() + if user: + user.is_premium = True + if user.premium_until and user.premium_until > datetime.now(): + user.premium_until = user.premium_until + timedelta(days=30) + else: + user.premium_until = datetime.now() + timedelta(days=30) + self.session.commit() + return True + else: + return False + except Exception as e: + print(f"Error activating premium: {e}") + self.session.rollback() + return False diff --git a/tg_bot/infrastructure/external/__init__.py b/tg_bot/infrastructure/external/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tg_bot/infrastructure/external/deepseek_client.py b/tg_bot/infrastructure/external/deepseek_client.py new file mode 100644 index 0000000..68bc3aa --- /dev/null +++ b/tg_bot/infrastructure/external/deepseek_client.py @@ -0,0 +1,172 @@ +import json +from typing import Optional, AsyncIterator +import httpx +from tg_bot.config.settings import settings + + +class DeepSeekAPIError(Exception): + pass + + +class DeepSeekClient: + + def __init__(self, api_key: str | None = None, api_url: str | None = None): + self.api_key = api_key or settings.DEEPSEEK_API_KEY + self.api_url = api_url or settings.DEEPSEEK_API_URL + self.timeout = 60.0 + + def _get_headers(self) -> dict[str, str]: + if not self.api_key: + raise DeepSeekAPIError("API key not set") + + return { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.api_key}" + } + + async def chat_completion( + self, + messages: list[dict[str, str]], + model: str = "deepseek-chat", + temperature: float = 0.7, + max_tokens: Optional[int] = None, + stream: bool = False + ) -> dict: + if not self.api_key: + return { + "content": "API key not configured", + "usage": { + "prompt_tokens": 0, + "completion_tokens": 0, + "total_tokens": 0 + } + } + + payload = { + "model": model, + "messages": messages, + "temperature": temperature, + "stream": stream + } + + if max_tokens is not None: + payload["max_tokens"] = max_tokens + + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.post( + self.api_url, + headers=self._get_headers(), + json=payload + ) + response.raise_for_status() + + data = response.json() + + if "choices" in data and len(data["choices"]) > 0: + content = data["choices"][0]["message"]["content"] + else: + raise DeepSeekAPIError("Invalid response format") + + usage = data.get("usage", {}) + + return { + "content": content, + "usage": { + "prompt_tokens": usage.get("prompt_tokens", 0), + "completion_tokens": usage.get("completion_tokens", 0), + "total_tokens": usage.get("total_tokens", 0) + } + } + + except httpx.HTTPStatusError as e: + error_msg = f"API error: {e.response.status_code}" + try: + error_data = e.response.json() + if "error" in error_data: + error_msg = error_data['error'].get('message', error_msg) + except: + pass + raise DeepSeekAPIError(error_msg) from e + except httpx.RequestError as e: + raise DeepSeekAPIError(f"Connection error: {str(e)}") from e + except Exception as e: + raise DeepSeekAPIError(str(e)) from e + + async def stream_chat_completion( + self, + messages: list[dict[str, str]], + model: str = "deepseek-chat", + temperature: float = 0.7, + max_tokens: Optional[int] = None + ) -> AsyncIterator[str]: + if not self.api_key: + yield "API key not configured" + return + + payload = { + "model": model, + "messages": messages, + "temperature": temperature, + "stream": True + } + + if max_tokens is not None: + payload["max_tokens"] = max_tokens + + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + async with client.stream( + "POST", + self.api_url, + headers=self._get_headers(), + json=payload + ) as response: + response.raise_for_status() + + async for line in response.aiter_lines(): + if not line.strip(): + continue + + if line.startswith("data: "): + line = line[6:] + + if line.strip() == "[DONE]": + break + + try: + data = json.loads(line) + + if "choices" in data and len(data["choices"]) > 0: + delta = data["choices"][0].get("delta", {}) + content = delta.get("content", "") + if content: + yield content + except json.JSONDecodeError: + continue + + except httpx.HTTPStatusError as e: + error_msg = f"API error: {e.response.status_code}" + try: + error_data = e.response.json() + if "error" in error_data: + error_msg = error_data['error'].get('message', error_msg) + except: + pass + raise DeepSeekAPIError(error_msg) from e + except httpx.RequestError as e: + raise DeepSeekAPIError(f"Connection error: {str(e)}") from e + except Exception as e: + raise DeepSeekAPIError(str(e)) from e + + async def health_check(self) -> bool: + if not self.api_key: + return False + + try: + test_messages = [{"role": "user", "content": "test"}] + await self.chat_completion(test_messages, max_tokens=1) + return True + except Exception: + return False + diff --git a/tg_bot/infrastructure/telegram/bot.py b/tg_bot/infrastructure/telegram/bot.py index 9f7a44b..55596c6 100644 --- a/tg_bot/infrastructure/telegram/bot.py +++ b/tg_bot/infrastructure/telegram/bot.py @@ -7,7 +7,8 @@ from tg_bot.config.settings import settings from tg_bot.infrastructure.telegram.handlers import ( start_handler, help_handler, - stats_handler + stats_handler, + question_handler ) logger = logging.getLogger(__name__) @@ -22,6 +23,7 @@ async def create_bot() -> tuple[Bot, Dispatcher]: dp.include_router(start_handler.router) dp.include_router(help_handler.router) dp.include_router(stats_handler.router) + dp.include_router(question_handler.router) return bot, dp diff --git a/tg_bot/infrastructure/telegram/handlers/question_handler.py b/tg_bot/infrastructure/telegram/handlers/question_handler.py new file mode 100644 index 0000000..b2b45f0 --- /dev/null +++ b/tg_bot/infrastructure/telegram/handlers/question_handler.py @@ -0,0 +1,306 @@ +from aiogram import Router, types +from aiogram.types import Message +from datetime import datetime +import aiohttp +from tg_bot.config.settings import settings +from tg_bot.infrastructure.database.database import SessionLocal +from tg_bot.infrastructure.database.models import UserModel +from tg_bot.application.services.rag_service import RAGService + +router = Router() +BACKEND_URL = "http://localhost:8001/api/v1" +rag_service = RAGService() + +@router.message() +async def handle_question(message: Message): + user_id = message.from_user.id + question_text = message.text.strip() + if question_text.startswith('/'): + return + + session = SessionLocal() + try: + user = session.query(UserModel).filter_by( + telegram_id=str(user_id) + ).first() + + if not user: + user = UserModel( + telegram_id=str(user_id), + username=message.from_user.username or "", + first_name=message.from_user.first_name or "", + last_name=message.from_user.last_name or "" + ) + session.add(user) + session.commit() + + await ensure_user_in_backend(str(user_id), message.from_user) + + if user.is_premium: + await process_premium_question(message, user, question_text, session) + + elif user.questions_used < settings.FREE_QUESTIONS_LIMIT: + await process_free_question(message, user, question_text, session) + + else: + await handle_limit_exceeded(message, user) + + except Exception as e: + print(f"Error processing question: {e}") + await message.answer( + "Произошла ошибка. Попробуйте позже.", + parse_mode="HTML" + ) + finally: + session.close() + + +async def ensure_user_in_backend(telegram_id: str, telegram_user): + try: + async with aiohttp.ClientSession() as session: + async with session.get( + f"{BACKEND_URL}/users/telegram/{telegram_id}" + ) as response: + if response.status == 200: + return + + async with session.post( + f"{BACKEND_URL}/users", + json={"telegram_id": telegram_id, "role": "user"} + ) as create_response: + if create_response.status in [200, 201]: + print(f"Пользователь {telegram_id} создан в backend") + except Exception as e: + print(f"Error creating user in backend: {e}") + + +async def process_premium_question(message: Message, user: UserModel, question_text: str, session): + user.questions_used += 1 + session.commit() + + await message.bot.send_chat_action(message.chat.id, "typing") + + try: + rag_result = await rag_service.generate_answer_with_rag( + question_text, + str(message.from_user.id) + ) + + answer = rag_result.get("answer", "Извините, не удалось сгенерировать ответ.") + sources = rag_result.get("sources", []) + + await save_conversation_to_backend( + str(message.from_user.id), + question_text, + answer, + sources + ) + + response = ( + f"Ваш вопрос:\n" + f"{question_text[:200]}\n\n" + f"Ответ:\n{answer}\n\n" + ) + + if sources: + response += f"Источники из коллекций:\n" + collections_used = {} + for source in sources[:5]: + collection_name = source.get('collection', 'Неизвестно') + if collection_name not in collections_used: + collections_used[collection_name] = [] + collections_used[collection_name].append(source.get('title', 'Без названия')) + + for i, (collection_name, titles) in enumerate(collections_used.items(), 1): + response += f"{i}. Коллекция: {collection_name}\n" + for title in titles[:2]: + response += f" • {title}\n" + response += "\nИспользуйте /mycollections для просмотра всех коллекций\n\n" + + response += ( + f"Статус: Premium (вопросов безлимитно)\n" + f"Всего вопросов: {user.questions_used}" + ) + + except Exception as e: + print(f"Error generating answer: {e}") + response = ( + f"Ваш вопрос:\n" + f"{question_text[:200]}\n\n" + f"Ошибка при генерации ответа. Попробуйте позже.\n\n" + f"Статус: Premium\n" + f"Всего вопросов: {user.questions_used}" + ) + + await message.answer(response, parse_mode="HTML") + + +async def process_free_question(message: Message, user: UserModel, question_text: str, session): + user.questions_used += 1 + remaining = settings.FREE_QUESTIONS_LIMIT - user.questions_used + session.commit() + + await message.bot.send_chat_action(message.chat.id, "typing") + + try: + rag_result = await rag_service.generate_answer_with_rag( + question_text, + str(message.from_user.id) + ) + + answer = rag_result.get("answer", "Извините, не удалось сгенерировать ответ.") + sources = rag_result.get("sources", []) + + await save_conversation_to_backend( + str(message.from_user.id), + question_text, + answer, + sources + ) + + response = ( + f"Ваш вопрос:\n" + f"{question_text[:200]}\n\n" + f"Ответ:\n{answer}\n\n" + ) + + if sources: + response += f"Источники из коллекций:\n" + collections_used = {} + for source in sources[:5]: + collection_name = source.get('collection', 'Неизвестно') + if collection_name not in collections_used: + collections_used[collection_name] = [] + collections_used[collection_name].append(source.get('title', 'Без названия')) + + for i, (collection_name, titles) in enumerate(collections_used.items(), 1): + response += f"{i}. Коллекция: {collection_name}\n" + for title in titles[:2]: + response += f" • {title}\n" + response += "\nИспользуйте /mycollections для просмотра всех коллекций\n\n" + + response += ( + f"Статус: Бесплатный доступ\n" + f"Использовано вопросов: {user.questions_used}/{settings.FREE_QUESTIONS_LIMIT}\n" + f"Осталось бесплатных: {remaining}\n\n" + ) + + if remaining <= 3 and remaining > 0: + response += f"Осталось мало вопросов! Для продолжения используйте /buy\n\n" + + response += f"Для безлимитного доступа: /buy" + + except Exception as e: + print(f"Error generating answer: {e}") + response = ( + f"Ваш вопрос:\n" + f"{question_text[:200]}\n\n" + f"Ошибка при генерации ответа. Попробуйте позже.\n\n" + f"Статус: Бесплатный доступ\n" + f"Использовано вопросов: {user.questions_used}/{settings.FREE_QUESTIONS_LIMIT}\n" + f"Осталось бесплатных: {remaining}\n\n" + f"Для безлимитного доступа: /buy" + ) + + await message.answer(response, parse_mode="HTML") + + +async def save_conversation_to_backend(telegram_id: str, question: str, answer: str, sources: list): + try: + async with aiohttp.ClientSession() as session: + async with session.get( + f"{BACKEND_URL}/users/telegram/{telegram_id}" + ) as user_response: + if user_response.status != 200: + return + user_data = await user_response.json() + user_uuid = user_data.get("user_id") + + async with session.get( + f"{BACKEND_URL}/collections/", + headers={"X-Telegram-ID": telegram_id} + ) as collections_response: + collections = [] + if collections_response.status == 200: + collections = await collections_response.json() + + collection_id = None + if collections: + collection_id = collections[0].get("collection_id") + else: + async with session.post( + f"{BACKEND_URL}/collections", + json={ + "name": "Основная коллекция", + "description": "Коллекция по умолчанию", + "is_public": False + }, + headers={"X-Telegram-ID": telegram_id} + ) as create_collection_response: + if create_collection_response.status in [200, 201]: + collection_data = await create_collection_response.json() + collection_id = collection_data.get("collection_id") + + if not collection_id: + return + + async with session.post( + f"{BACKEND_URL}/conversations", + json={"collection_id": str(collection_id)}, + headers={"X-Telegram-ID": telegram_id} + ) as conversation_response: + if conversation_response.status not in [200, 201]: + return + conversation_data = await conversation_response.json() + conversation_id = conversation_data.get("conversation_id") + + if not conversation_id: + return + + await session.post( + f"{BACKEND_URL}/messages", + json={ + "conversation_id": str(conversation_id), + "content": question, + "role": "user" + }, + headers={"X-Telegram-ID": telegram_id} + ) + + await session.post( + f"{BACKEND_URL}/messages", + json={ + "conversation_id": str(conversation_id), + "content": answer, + "role": "assistant", + "sources": {"documents": sources} + }, + headers={"X-Telegram-ID": telegram_id} + ) + + except Exception as e: + print(f"Error saving conversation: {e}") + + +async def handle_limit_exceeded(message: Message, user: UserModel): + response = ( + f"Лимит бесплатных вопросов исчерпан!\n\n" + + f"Ваша статистика:\n" + f"• Использовано вопросов: {user.questions_used}\n" + f"• Бесплатный лимит: {settings.FREE_QUESTIONS_LIMIT}\n\n" + + f"Что делать дальше?\n" + f"1. Купите подписку командой /buy\n" + f"2. Получите неограниченный доступ к вопросам\n" + f"3. Продолжайте использовать бот без ограничений\n\n" + + f"Подписка включает:\n" + f"• Неограниченное количество вопросов\n" + f"• Приоритетную обработку\n" + f"• Доступ ко всем функциям\n\n" + + f"Нажмите /buy чтобы продолжить" + ) + + await message.answer(response, parse_mode="HTML")