Локальный RAG-поисковик с автообновлением на Python и Qdrant - гайд | AiManual
AiManual Logo Ai / Manual.
02 Янв 2026 Гайд

Заурядный RAG уже не работает: собираем поисковик, который обновляется сам

Пошаговое руководство по созданию локального семантического поиска с автоматическим отслеживанием изменений файлов. Python, Qdrant, MCP сервер.

Забудьте про ручное обновление индекса

Представьте ситуацию: вы настроили RAG-систему для поиска по документации проекта. Через неделю разработчики обновили 50 файлов. Ваш индекс теперь как старый телефонный справочник - красиво, но бесполезно. Переиндексировать все вручную? Нет, спасибо.

Именно поэтому обычные RAG-системы раздражают. Они требуют постоянного внимания, как капризный кот. А мы сделаем систему, которая следит за изменениями сама. Без облаков, без API, без ежемесячных счетов.

Если вы ищете базовый RAG - прочтите RAG за 15 минут. Эта статья для тех, кому надоело перезапускать индексацию.

Что получится в итоге

Система, которая:

  • Мониторит папку с документами в реальном времени
  • Автоматически обновляет векторный индекс при любых изменениях
  • Отвечает на вопросы на естественном языке
  • Работает полностью локально (даже эмбеддинги)
  • Не требует GPU или мощного железа

Архитектура: просто, но не примитивно

Мы не будем использовать LangChain - он слишком жирный для нашей задачи. Вместо этого соберем систему из легких компонентов:

КомпонентЧто делаетАльтернатива
QdrantВекторная база данныхChroma, Weaviate
Sentence TransformersГенерация эмбеддинговOpenAI API (платно)
WatchdogОтслеживание файловpyinotify (только Linux)
FastMCPMCP-сервер для интеграцииСвой REST API

1Устанавливаем зависимости

Сначала подготовим окружение. Не устанавливайте все в глобальное окружение - потом не разберетесь.

# Создаем виртуальное окружение
python -m venv rag_env
source rag_env/bin/activate  # для Windows: rag_env\Scripts\activate

# Основные зависимости
pip install qdrant-client sentence-transformers watchdog fastapi uvicorn

# Для работы с документами
pip install pypdf python-docx markdown

# MCP сервер (опционально, но рекомендую)
pip install mcp
💡
Если у вас слабое железо - используйте модель эмбеддингов поменьше. Вместо 'all-MiniLM-L6-v2' (80МБ) можно взять 'paraphrase-albert-small-v2' (44МБ). Подробнее про оптимизацию - в статье Как сделать локальный RAG для 60 ГБ писем на слабом железе.

2Запускаем Qdrant локально

Qdrant можно запустить через Docker или как бинарник. Docker проще, но требует больше памяти.

# Способ 1: Docker (рекомендую)
docker run -p 6333:6333 -p 6334:6334 \
    -v $(pwd)/qdrant_storage:/qdrant/storage:z \
    qdrant/qdrant

# Способ 2: Бинарник (если не любите Docker)
# Скачиваем с GitHub: https://github.com/qdrant/qdrant/releases
# Запускаем: ./qdrant

Проверяем, что сервер работает: откройте http://localhost:6333/dashboard в браузере. Должна появиться админка Qdrant.

3Создаем ядро системы

Теперь самое интересное - пишем код. Начнем с обработчика документов.

# document_processor.py
import os
from pathlib import Path
from typing import List, Dict, Any
import hashlib
from datetime import datetime

from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

import pypdf
from docx import Document
import markdown


class DocumentProcessor:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Инициализация процессора документов."""
        self.model = SentenceTransformer(model_name)
        self.client = QdrantClient(host="localhost", port=6333)
        self.collection_name = "documents"
        
        # Создаем коллекцию, если ее нет
        try:
            self.client.get_collection(self.collection_name)
        except Exception:
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.model.get_sentence_embedding_dimension(),
                    distance=Distance.COSINE
                )
            )
    
    def _calculate_file_hash(self, file_path: str) -> str:
        """Рассчитываем хэш файла для отслеживания изменений."""
        with open(file_path, "rb") as f:
            return hashlib.md5(f.read()).hexdigest()
    
    def _extract_text(self, file_path: str) -> str:
        """Извлекаем текст из разных форматов файлов."""
        ext = os.path.splitext(file_path)[1].lower()
        
        if ext == ".pdf":
            text = ""
            with open(file_path, "rb") as f:
                pdf_reader = pypdf.PdfReader(f)
                for page in pdf_reader.pages:
                    text += page.extract_text() + "\n"
            return text
        
        elif ext == ".docx":
            doc = Document(file_path)
            return "\n".join([para.text for para in doc.paragraphs])
        
        elif ext == ".md":
            with open(file_path, "r", encoding="utf-8") as f:
                md_text = f.read()
                return markdown.markdown(md_text)
        
        else:  # Предполагаем текстовый файл
            with open(file_path, "r", encoding="utf-8") as f:
                return f.read()
    
    def _chunk_text(self, text: str, chunk_size: int = 500) -> List[str]:
        """Разбиваем текст на чанки."""
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0
        
        for word in words:
            current_chunk.append(word)
            current_length += len(word) + 1
            
            if current_length >= chunk_size:
                chunks.append(" ".join(current_chunk))
                current_chunk = []
                current_length = 0
        
        if current_chunk:
            chunks.append(" ".join(current_chunk))
        
        return chunks
    
    def process_file(self, file_path: str) -> Dict[str, Any]:
        """Обрабатываем один файл и добавляем в Qdrant."""
        file_hash = self._calculate_file_hash(file_path)
        text = self._extract_text(file_path)
        chunks = self._chunk_text(text)
        
        if not chunks:
            return {"status": "empty", "file": file_path}
        
        # Генерируем эмбеддинги для всех чанков
        embeddings = self.model.encode(chunks)
        
        points = []
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            point_id = hashlib.md5(f"{file_path}:{i}:{file_hash}".encode()).hexdigest()
            
            points.append(
                PointStruct(
                    id=point_id,
                    vector=embedding.tolist(),
                    payload={
                        "text": chunk,
                        "file_path": file_path,
                        "chunk_index": i,
                        "file_hash": file_hash,
                        "timestamp": datetime.now().isoformat()
                    }
                )
            )
        
        # Удаляем старые версии этого файла (если они есть)
        self.client.delete(
            collection_name=self.collection_name,
            points_selector=self.client.models.FilterSelector(
                filter=self.client.models.Filter(
                    must=[
                        self.client.models.FieldCondition(
                            key="file_path",
                            match=self.client.models.MatchValue(value=file_path)
                        )
                    ]
                )
            )
        )
        
        # Добавляем новые точки
        self.client.upsert(
            collection_name=self.collection_name,
            points=points
        )
        
        return {
            "status": "success",
            "file": file_path,
            "chunks": len(chunks),
            "hash": file_hash
        }
    
    def search(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Ищем по запросу."""
        query_embedding = self.model.encode(query).tolist()
        
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding,
            limit=limit
        )
        
        return [
            {
                "score": hit.score,
                "text": hit.payload["text"],
                "file_path": hit.payload["file_path"],
                "chunk_index": hit.payload["chunk_index"]
            }
            for hit in results
        ]
    
    def delete_file(self, file_path: str) -> bool:
        """Удаляем файл из индекса."""
        try:
            self.client.delete(
                collection_name=self.collection_name,
                points_selector=self.client.models.FilterSelector(
                    filter=self.client.models.Filter(
                        must=[
                            self.client.models.FieldCondition(
                                key="file_path",
                                match=self.client.models.MatchValue(value=file_path)
                            )
                        ]
                    )
                )
            )
            return True
        except Exception as e:
            print(f"Ошибка при удалении {file_path}: {e}")
            return False

Ошибка, которую все делают: хранят только текст, но не хранят метаданные. Обязательно сохраняйте file_hash и timestamp - они нужны для отслеживания изменений.

4Добавляем мониторинг файлов

Теперь самое важное - автоматическое отслеживание изменений. Используем Watchdog.

# file_monitor.py
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
from typing import Set
import threading

from document_processor import DocumentProcessor


class FileChangeHandler(FileSystemEventHandler):
    def __init__(self, processor: DocumentProcessor, watch_dir: str):
        self.processor = processor
        self.watch_dir = Path(watch_dir).resolve()
        self.processing_queue = []
        self.processing_lock = threading.Lock()
        self.processed_files: Set[str] = set()
        
        # Запускаем фоновый поток для обработки
        self.worker_thread = threading.Thread(target=self._process_queue, daemon=True)
        self.worker_thread.start()
    
    def _should_process(self, file_path: str) -> bool:
        """Проверяем, нужно ли обрабатывать файл."""
        path = Path(file_path)
        
        # Игнорируем скрытые файлы и временные файлы
        if path.name.startswith(".") or path.name.startswith("~"):
            return False
        
        # Поддерживаемые расширения
        supported_extensions = {".txt", ".md", ".pdf", ".docx", ".rst", ".py", ".js", ".html"}
        if path.suffix.lower() not in supported_extensions:
            return False
        
        return True
    
    def on_modified(self, event):
        """Обрабатываем изменение файла."""
        if event.is_directory:
            return
        
        if self._should_process(event.src_path):
            with self.processing_lock:
                if event.src_path not in self.processing_queue:
                    self.processing_queue.append(event.src_path)
    
    def on_created(self, event):
        """Обрабатываем создание файла."""
        self.on_modified(event)
    
    def on_deleted(self, event):
        """Обрабатываем удаление файла."""
        if event.is_directory:
            return
        
        if self._should_process(event.src_path):
            print(f"Удаляем из индекса: {event.src_path}")
            self.processor.delete_file(event.src_path)
    
    def _process_queue(self):
        """Фоновый обработчик очереди файлов."""
        while True:
            time.sleep(2)  # Ждем 2 секунды между обработками
            
            with self.processing_lock:
                if not self.processing_queue:
                    continue
                
                file_path = self.processing_queue.pop(0)
            
            try:
                result = self.processor.process_file(file_path)
                print(f"Обработан: {file_path} ({result['chunks']} чанков)")
            except Exception as e:
                print(f"Ошибка обработки {file_path}: {e}")


def start_monitoring(watch_dir: str = "./documents"):
    """Запускаем мониторинг директории."""
    processor = DocumentProcessor()
    event_handler = FileChangeHandler(processor, watch_dir)
    observer = Observer()
    observer.schedule(event_handler, watch_dir, recursive=True)
    observer.start()
    
    print(f"Мониторим директорию: {watch_dir}")
    print("Нажмите Ctrl+C для остановки")
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    
    observer.join()

Ключевой момент здесь - задержка в 2 секунды перед обработкой. Зачем? Чтобы не обрабатывать файл, пока он еще сохраняется. Представьте, что вы редактируете документ в VS Code - он сохраняет каждые 0.5 секунды. Без задержки система будет пытаться обработать файл 10 раз в секунду.

5Добавляем API для поиска

Теперь сделаем простой REST API, чтобы можно было искать через HTTP.

# api_server.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import uvicorn

from document_processor import DocumentProcessor

app = FastAPI(title="Local RAG Search API")
processor = DocumentProcessor()


class SearchRequest(BaseModel):
    query: str
    limit: int = 5


class SearchResult(BaseModel):
    score: float
    text: str
    file_path: str
    chunk_index: int


class SearchResponse(BaseModel):
    results: List[SearchResult]


@app.post("/search", response_model=SearchResponse)
def search_documents(request: SearchRequest):
    """Поиск по документам."""
    try:
        results = processor.search(request.query, request.limit)
        return SearchResponse(results=results)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/process")
def process_file(file_path: str):
    """Принудительная обработка файла."""
    try:
        result = processor.process_file(file_path)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/file/{file_path:path}")
def delete_file(file_path: str):
    """Удаление файла из индекса."""
    success = processor.delete_file(file_path)
    return {"success": success}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

6Интеграция с MCP (опционально, но круто)

MCP (Model Context Protocol) позволяет подключать ваш поисковик к Claude Desktop, Cursor и другим AI-ассистентам. Вот минимальный сервер:

# mcp_server.py
from mcp import Client, Server
import asyncio

from document_processor import DocumentProcessor

processor = DocumentProcessor()

server = Server("local-rag-search")


@server.list_tools()
async def list_tools():
    return [
        {
            "name": "search_documents",
            "description": "Поиск по локальным документам",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Поисковый запрос"
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Количество результатов",
                        "default": 3
                    }
                },
                "required": ["query"]
            }
        }
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "search_documents":
        query = arguments["query"]
        limit = arguments.get("limit", 3)
        
        results = processor.search(query, limit)
        
        if not results:
            return "Ничего не найдено"
        
        response = "Найденные документы:\n\n"
        for i, result in enumerate(results, 1):
            response += f"{i}. {result['text'][:200]}...\n"
            response += f"   Файл: {result['file_path']}\n"
            response += f"   Сходство: {result['score']:.3f}\n\n"
        
        return response
    
    return f"Инструмент {name} не найден"


async def main():
    async with server.run() as server_handle:
        print("MCP сервер запущен")
        print("Используйте с Claude Desktop или Cursor")
        await server_handle.wait_closed()


if __name__ == "__main__":
    asyncio.run(main())
💡
MCP превращает ваш поисковик в инструмент для AI-ассистентов. Claude сможет искать по вашим документам прямо из чата. Подробнее про MCP читайте в статье про Ragex.

Запускаем все вместе

Создаем главный скрипт, который запускает все компоненты:

# run_all.py
import subprocess
import sys
import time
from threading import Thread

from file_monitor import start_monitoring


def run_api():
    """Запускаем API сервер."""
    subprocess.run([sys.executable, "api_server.py"])


def run_mcp():
    """Запускаем MCP сервер."""
    subprocess.run([sys.executable, "mcp_server.py"])


if __name__ == "__main__":
    print("Запускаем локальный RAG-поисковик...")
    print("1. Qdrant должен быть запущен на localhost:6333")
    print("2. API сервер будет на http://localhost:8000")
    print("3. Мониторинг директории './documents'")
    print("4. Для MCP: используйте отдельный терминал")
    print("\nНажмите Ctrl+C для остановки всех компонентов\n")
    
    # Запускаем API в отдельном потоке
    api_thread = Thread(target=run_api, daemon=True)
    api_thread.start()
    
    # Даем API время запуститься
    time.sleep(3)
    
    # Запускаем мониторинг (блокирующий вызов)
    try:
        start_monitoring()
    except KeyboardInterrupt:
        print("\nОстанавливаем систему...")
        sys.exit(0)

Ошибки, которые всех достали

ОшибкаПочему возникаетКак исправить
Qdrant падает при добавлении большого файлаНе хватает памяти для batch-операцийОграничьте размер чанков до 300 слов, обрабатывайте частями
Система обрабатывает один файл много разWatchdog ловит несколько событий при сохраненииУвеличьте задержку в _process_queue() до 5 секунд
Поиск возвращает нерелевантные результатыЧанки слишком большие или слишком маленькиеЭкспериментируйте с chunk_size от 200 до 800 слов
Модель эмбеддингов грузится по 30 секундИспользуется тяжелая модельВместо all-mpnet-base-v2 используйте all-MiniLM-L6-v2

Что делать дальше

Базовая система готова, но это только начало. Вот что можно улучшить:

  • Добавить кэширование - не вычислять эмбеддинги для одинаковых запросов
  • Поддержку большего количества форматов - Excel, PowerPoint, изображения с OCR
  • Умное чанкование - разбивать по семантическим границам, а не просто по словам
  • Гибридный поиск - комбинировать семантический поиск с полнотекстовым (используйте EmbeddingAdapters для экономии)

Если вам нужно что-то более мощное - посмотрите полное руководство по Agentic RAG. Там есть агенты, цепочки размышлений и сложные запросы.

Самое главное - эта система теперь ваша. Можете менять, ломать, улучшать. Она не зависит от OpenAI, не требует кредитной карты и работает даже без интернета. Попробуйте добавить туда свои документы и посмотрите, как она сама за ними следит. Первые пару дней кажется, что это магия. Потом привыкаешь. И уже не понимаешь, как жил без этого раньше.

P.S. Если система начнет слишком много есть (памяти или CPU) - не паникуйте. Просто ограничьте количество одновременно обрабатываемых файлов. И помните: даже самый умный поисковик не заменит кофе и 8 часов сна.