Забудьте про ручное обновление индекса
Представьте ситуацию: вы настроили RAG-систему для поиска по документации проекта. Через неделю разработчики обновили 50 файлов. Ваш индекс теперь как старый телефонный справочник - красиво, но бесполезно. Переиндексировать все вручную? Нет, спасибо.
Именно поэтому обычные RAG-системы раздражают. Они требуют постоянного внимания, как капризный кот. А мы сделаем систему, которая следит за изменениями сама. Без облаков, без API, без ежемесячных счетов.
Если вы ищете базовый RAG - прочтите RAG за 15 минут. Эта статья для тех, кому надоело перезапускать индексацию.
Что получится в итоге
Система, которая:
- Мониторит папку с документами в реальном времени
- Автоматически обновляет векторный индекс при любых изменениях
- Отвечает на вопросы на естественном языке
- Работает полностью локально (даже эмбеддинги)
- Не требует GPU или мощного железа
Архитектура: просто, но не примитивно
Мы не будем использовать LangChain - он слишком жирный для нашей задачи. Вместо этого соберем систему из легких компонентов:
| Компонент | Что делает | Альтернатива |
|---|---|---|
| Qdrant | Векторная база данных | Chroma, Weaviate |
| Sentence Transformers | Генерация эмбеддингов | OpenAI API (платно) |
| Watchdog | Отслеживание файлов | pyinotify (только Linux) |
| FastMCP | MCP-сервер для интеграции | Свой REST API |
1Устанавливаем зависимости
Сначала подготовим окружение. Не устанавливайте все в глобальное окружение - потом не разберетесь.
# Создаем виртуальное окружение
python -m venv rag_env
source rag_env/bin/activate # для Windows: rag_env\Scripts\activate
# Основные зависимости
pip install qdrant-client sentence-transformers watchdog fastapi uvicorn
# Для работы с документами
pip install pypdf python-docx markdown
# MCP сервер (опционально, но рекомендую)
pip install mcp2Запускаем Qdrant локально
Qdrant можно запустить через Docker или как бинарник. Docker проще, но требует больше памяти.
# Способ 1: Docker (рекомендую)
docker run -p 6333:6333 -p 6334:6334 \
-v $(pwd)/qdrant_storage:/qdrant/storage:z \
qdrant/qdrant
# Способ 2: Бинарник (если не любите Docker)
# Скачиваем с GitHub: https://github.com/qdrant/qdrant/releases
# Запускаем: ./qdrantПроверяем, что сервер работает: откройте http://localhost:6333/dashboard в браузере. Должна появиться админка Qdrant.
3Создаем ядро системы
Теперь самое интересное - пишем код. Начнем с обработчика документов.
# document_processor.py
import os
from pathlib import Path
from typing import List, Dict, Any
import hashlib
from datetime import datetime
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import pypdf
from docx import Document
import markdown
class DocumentProcessor:
def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
"""Инициализация процессора документов."""
self.model = SentenceTransformer(model_name)
self.client = QdrantClient(host="localhost", port=6333)
self.collection_name = "documents"
# Создаем коллекцию, если ее нет
try:
self.client.get_collection(self.collection_name)
except Exception:
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=self.model.get_sentence_embedding_dimension(),
distance=Distance.COSINE
)
)
def _calculate_file_hash(self, file_path: str) -> str:
"""Рассчитываем хэш файла для отслеживания изменений."""
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
def _extract_text(self, file_path: str) -> str:
"""Извлекаем текст из разных форматов файлов."""
ext = os.path.splitext(file_path)[1].lower()
if ext == ".pdf":
text = ""
with open(file_path, "rb") as f:
pdf_reader = pypdf.PdfReader(f)
for page in pdf_reader.pages:
text += page.extract_text() + "\n"
return text
elif ext == ".docx":
doc = Document(file_path)
return "\n".join([para.text for para in doc.paragraphs])
elif ext == ".md":
with open(file_path, "r", encoding="utf-8") as f:
md_text = f.read()
return markdown.markdown(md_text)
else: # Предполагаем текстовый файл
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
def _chunk_text(self, text: str, chunk_size: int = 500) -> List[str]:
"""Разбиваем текст на чанки."""
words = text.split()
chunks = []
current_chunk = []
current_length = 0
for word in words:
current_chunk.append(word)
current_length += len(word) + 1
if current_length >= chunk_size:
chunks.append(" ".join(current_chunk))
current_chunk = []
current_length = 0
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
def process_file(self, file_path: str) -> Dict[str, Any]:
"""Обрабатываем один файл и добавляем в Qdrant."""
file_hash = self._calculate_file_hash(file_path)
text = self._extract_text(file_path)
chunks = self._chunk_text(text)
if not chunks:
return {"status": "empty", "file": file_path}
# Генерируем эмбеддинги для всех чанков
embeddings = self.model.encode(chunks)
points = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
point_id = hashlib.md5(f"{file_path}:{i}:{file_hash}".encode()).hexdigest()
points.append(
PointStruct(
id=point_id,
vector=embedding.tolist(),
payload={
"text": chunk,
"file_path": file_path,
"chunk_index": i,
"file_hash": file_hash,
"timestamp": datetime.now().isoformat()
}
)
)
# Удаляем старые версии этого файла (если они есть)
self.client.delete(
collection_name=self.collection_name,
points_selector=self.client.models.FilterSelector(
filter=self.client.models.Filter(
must=[
self.client.models.FieldCondition(
key="file_path",
match=self.client.models.MatchValue(value=file_path)
)
]
)
)
)
# Добавляем новые точки
self.client.upsert(
collection_name=self.collection_name,
points=points
)
return {
"status": "success",
"file": file_path,
"chunks": len(chunks),
"hash": file_hash
}
def search(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
"""Ищем по запросу."""
query_embedding = self.model.encode(query).tolist()
results = self.client.search(
collection_name=self.collection_name,
query_vector=query_embedding,
limit=limit
)
return [
{
"score": hit.score,
"text": hit.payload["text"],
"file_path": hit.payload["file_path"],
"chunk_index": hit.payload["chunk_index"]
}
for hit in results
]
def delete_file(self, file_path: str) -> bool:
"""Удаляем файл из индекса."""
try:
self.client.delete(
collection_name=self.collection_name,
points_selector=self.client.models.FilterSelector(
filter=self.client.models.Filter(
must=[
self.client.models.FieldCondition(
key="file_path",
match=self.client.models.MatchValue(value=file_path)
)
]
)
)
)
return True
except Exception as e:
print(f"Ошибка при удалении {file_path}: {e}")
return FalseОшибка, которую все делают: хранят только текст, но не хранят метаданные. Обязательно сохраняйте file_hash и timestamp - они нужны для отслеживания изменений.
4Добавляем мониторинг файлов
Теперь самое важное - автоматическое отслеживание изменений. Используем Watchdog.
# file_monitor.py
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
from typing import Set
import threading
from document_processor import DocumentProcessor
class FileChangeHandler(FileSystemEventHandler):
def __init__(self, processor: DocumentProcessor, watch_dir: str):
self.processor = processor
self.watch_dir = Path(watch_dir).resolve()
self.processing_queue = []
self.processing_lock = threading.Lock()
self.processed_files: Set[str] = set()
# Запускаем фоновый поток для обработки
self.worker_thread = threading.Thread(target=self._process_queue, daemon=True)
self.worker_thread.start()
def _should_process(self, file_path: str) -> bool:
"""Проверяем, нужно ли обрабатывать файл."""
path = Path(file_path)
# Игнорируем скрытые файлы и временные файлы
if path.name.startswith(".") or path.name.startswith("~"):
return False
# Поддерживаемые расширения
supported_extensions = {".txt", ".md", ".pdf", ".docx", ".rst", ".py", ".js", ".html"}
if path.suffix.lower() not in supported_extensions:
return False
return True
def on_modified(self, event):
"""Обрабатываем изменение файла."""
if event.is_directory:
return
if self._should_process(event.src_path):
with self.processing_lock:
if event.src_path not in self.processing_queue:
self.processing_queue.append(event.src_path)
def on_created(self, event):
"""Обрабатываем создание файла."""
self.on_modified(event)
def on_deleted(self, event):
"""Обрабатываем удаление файла."""
if event.is_directory:
return
if self._should_process(event.src_path):
print(f"Удаляем из индекса: {event.src_path}")
self.processor.delete_file(event.src_path)
def _process_queue(self):
"""Фоновый обработчик очереди файлов."""
while True:
time.sleep(2) # Ждем 2 секунды между обработками
with self.processing_lock:
if not self.processing_queue:
continue
file_path = self.processing_queue.pop(0)
try:
result = self.processor.process_file(file_path)
print(f"Обработан: {file_path} ({result['chunks']} чанков)")
except Exception as e:
print(f"Ошибка обработки {file_path}: {e}")
def start_monitoring(watch_dir: str = "./documents"):
"""Запускаем мониторинг директории."""
processor = DocumentProcessor()
event_handler = FileChangeHandler(processor, watch_dir)
observer = Observer()
observer.schedule(event_handler, watch_dir, recursive=True)
observer.start()
print(f"Мониторим директорию: {watch_dir}")
print("Нажмите Ctrl+C для остановки")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()Ключевой момент здесь - задержка в 2 секунды перед обработкой. Зачем? Чтобы не обрабатывать файл, пока он еще сохраняется. Представьте, что вы редактируете документ в VS Code - он сохраняет каждые 0.5 секунды. Без задержки система будет пытаться обработать файл 10 раз в секунду.
5Добавляем API для поиска
Теперь сделаем простой REST API, чтобы можно было искать через HTTP.
# api_server.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import uvicorn
from document_processor import DocumentProcessor
app = FastAPI(title="Local RAG Search API")
processor = DocumentProcessor()
class SearchRequest(BaseModel):
query: str
limit: int = 5
class SearchResult(BaseModel):
score: float
text: str
file_path: str
chunk_index: int
class SearchResponse(BaseModel):
results: List[SearchResult]
@app.post("/search", response_model=SearchResponse)
def search_documents(request: SearchRequest):
"""Поиск по документам."""
try:
results = processor.search(request.query, request.limit)
return SearchResponse(results=results)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/process")
def process_file(file_path: str):
"""Принудительная обработка файла."""
try:
result = processor.process_file(file_path)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/file/{file_path:path}")
def delete_file(file_path: str):
"""Удаление файла из индекса."""
success = processor.delete_file(file_path)
return {"success": success}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)6Интеграция с MCP (опционально, но круто)
MCP (Model Context Protocol) позволяет подключать ваш поисковик к Claude Desktop, Cursor и другим AI-ассистентам. Вот минимальный сервер:
# mcp_server.py
from mcp import Client, Server
import asyncio
from document_processor import DocumentProcessor
processor = DocumentProcessor()
server = Server("local-rag-search")
@server.list_tools()
async def list_tools():
return [
{
"name": "search_documents",
"description": "Поиск по локальным документам",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Поисковый запрос"
},
"limit": {
"type": "integer",
"description": "Количество результатов",
"default": 3
}
},
"required": ["query"]
}
}
]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "search_documents":
query = arguments["query"]
limit = arguments.get("limit", 3)
results = processor.search(query, limit)
if not results:
return "Ничего не найдено"
response = "Найденные документы:\n\n"
for i, result in enumerate(results, 1):
response += f"{i}. {result['text'][:200]}...\n"
response += f" Файл: {result['file_path']}\n"
response += f" Сходство: {result['score']:.3f}\n\n"
return response
return f"Инструмент {name} не найден"
async def main():
async with server.run() as server_handle:
print("MCP сервер запущен")
print("Используйте с Claude Desktop или Cursor")
await server_handle.wait_closed()
if __name__ == "__main__":
asyncio.run(main())Запускаем все вместе
Создаем главный скрипт, который запускает все компоненты:
# run_all.py
import subprocess
import sys
import time
from threading import Thread
from file_monitor import start_monitoring
def run_api():
"""Запускаем API сервер."""
subprocess.run([sys.executable, "api_server.py"])
def run_mcp():
"""Запускаем MCP сервер."""
subprocess.run([sys.executable, "mcp_server.py"])
if __name__ == "__main__":
print("Запускаем локальный RAG-поисковик...")
print("1. Qdrant должен быть запущен на localhost:6333")
print("2. API сервер будет на http://localhost:8000")
print("3. Мониторинг директории './documents'")
print("4. Для MCP: используйте отдельный терминал")
print("\nНажмите Ctrl+C для остановки всех компонентов\n")
# Запускаем API в отдельном потоке
api_thread = Thread(target=run_api, daemon=True)
api_thread.start()
# Даем API время запуститься
time.sleep(3)
# Запускаем мониторинг (блокирующий вызов)
try:
start_monitoring()
except KeyboardInterrupt:
print("\nОстанавливаем систему...")
sys.exit(0)Ошибки, которые всех достали
| Ошибка | Почему возникает | Как исправить |
|---|---|---|
| Qdrant падает при добавлении большого файла | Не хватает памяти для batch-операций | Ограничьте размер чанков до 300 слов, обрабатывайте частями |
| Система обрабатывает один файл много раз | Watchdog ловит несколько событий при сохранении | Увеличьте задержку в _process_queue() до 5 секунд |
| Поиск возвращает нерелевантные результаты | Чанки слишком большие или слишком маленькие | Экспериментируйте с chunk_size от 200 до 800 слов |
| Модель эмбеддингов грузится по 30 секунд | Используется тяжелая модель | Вместо all-mpnet-base-v2 используйте all-MiniLM-L6-v2 |
Что делать дальше
Базовая система готова, но это только начало. Вот что можно улучшить:
- Добавить кэширование - не вычислять эмбеддинги для одинаковых запросов
- Поддержку большего количества форматов - Excel, PowerPoint, изображения с OCR
- Умное чанкование - разбивать по семантическим границам, а не просто по словам
- Гибридный поиск - комбинировать семантический поиск с полнотекстовым (используйте EmbeddingAdapters для экономии)
Если вам нужно что-то более мощное - посмотрите полное руководство по Agentic RAG. Там есть агенты, цепочки размышлений и сложные запросы.
Самое главное - эта система теперь ваша. Можете менять, ломать, улучшать. Она не зависит от OpenAI, не требует кредитной карты и работает даже без интернета. Попробуйте добавить туда свои документы и посмотрите, как она сама за ними следит. Первые пару дней кажется, что это магия. Потом привыкаешь. И уже не понимаешь, как жил без этого раньше.
P.S. Если система начнет слишком много есть (памяти или CPU) - не паникуйте. Просто ограничьте количество одновременно обрабатываемых файлов. И помните: даже самый умный поисковик не заменит кофе и 8 часов сна.