Когда 2 миллиона документов перестают быть шуткой
В теории RAG звучит просто: берёшь тексты, векторизуешь, кладёшь в базу, ищешь похожее. Пока у тебя пара тысяч документов — всё работает. Потом появляется проект с 50 тысячами страниц. Ещё терпимо. А потом приходит заказчик с архивом в 2.3 миллиона PDF-страниц (реальный кейс с Epstein Files, но это неважно).
И вот тут начинается ад.
Твой красивый пайплайн на LangChain с одним GPU и локальным FAISS умирает на третьем часу работы. Векторизация идёт со скоростью 100 страниц в час. Диск забивается терабайтами промежуточных данных. Поиск работает 15 секунд. А заказчик хочет ответ за 2.
Знакомо? Давайте разберём, как строить RAG, который не сломается на реальных объёмах.
Важно: речь не про учебные примеры на 100 документах. Речь про продакшен, где ошибка стоит денег, а перезапуск пайплайна занимает дни.
Архитектура, которая не упадёт под нагрузкой
Первая ошибка — пытаться запихнуть всё в одну коробку. Один сервер, одна база, один скрипт. На 2 миллионах страниц это гарантированный провал.
1 Разделяй и властвуй: три независимых этапа
Правильная архитектура для больших объёмов:
| Этап | Задача | Инструменты | Почему отдельно |
|---|---|---|---|
| Извлечение и очистка | PDF → текст, нормализация | Apache Tika, Unstructured.io, custom parsers | Требует CPU, много I/O, можно распараллелить на сотни ядер |
| Чанкинг и векторизация | Текст → эмбеддинги | SentenceTransformers, OpenAI/Bedrock API, NVIDIA NIM | Требует GPU, дорого, нужна очередь и retry-логика |
| Индексация и поиск | Эмбеддинги → поисковый индекс | Qdrant, Weaviate, Elasticsearch + FAISS | Работает в памяти, требует оптимизации под запросы |
Ключевая мысль: эти этапы должны жить отдельно. Не пытайся сделать end-to-end пайплайн, который всё делает сразу. Поломка на любом этапе не должна требовать перезапуска всего.
Извлечение: где теряется 80% времени
PDF — это проклятие человечества. Особенно сканированные PDF с таблицами, колонками и кривыми шрифтами.
2 Как НЕ надо делать извлечение текста
# ПЛОХО: последовательная обработка
for pdf_path in tqdm(pdf_files):
text = extract_text(pdf_path) # 2 секунды на файл
cleaned = clean_text(text) # ещё 0.5 секунды
save_to_json(cleaned) # I/O блокирует всё
# Итого: ~50 дней на 2 млн страниц
3 Как делать правильно: массовый параллелизм
# ХОРОШО: распределённая обработка
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import PyPDF2 # или pdfplumber для сложных случаев
def process_pdf_batch(batch_files, output_dir):
"""Обрабатывает пачку PDF, пишет в отдельные файлы"""
results = []
for pdf_path in batch_files:
try:
with open(pdf_path, 'rb') as f:
pdf = PyPDF2.PdfReader(f)
text = ''
for page in pdf.pages:
text += page.extract_text() + '\n\n'
# Быстрая чистка без тяжёлых regex
text = text.replace('\r', ' ').replace('\n\n\n', '\n\n')
# Сохраняем сразу, не копим в памяти
output_path = f"{output_dir}/{pdf_path.stem}.txt"
with open(output_path, 'w', encoding='utf-8') as out_f:
out_f.write(text)
except Exception as e:
logger.error(f"Failed {pdf_path}: {e}")
continue
return len(batch_files)
# Разбиваем на батчи по 1000 файлов
batch_size = 1000
batches = [pdf_files[i:i+batch_size] for i in range(0, len(pdf_files), batch_size)]
# Запускаем на всех ядрах
with ProcessPoolExecutor(max_workers=mp.cpu_count() - 2) as executor:
futures = []
for batch in batches:
future = executor.submit(process_pdf_batch, batch, output_dir)
futures.append(future)
# Мониторим прогресс
for future in tqdm(as_completed(futures), total=len(futures)):
processed = future.result()
logger.info(f"Processed batch: {processed} files")
Эта схема обработает 2 миллиона PDF за 2-3 дня на 32-ядерном сервере вместо 50. Ключевые моменты:
- Каждый процесс пишет в свой файл — нет конфликтов
- Батчи по 1000 файлов — оптимально для баланса между overhead и памятью
- Ошибка в одном файле не ломает весь батч
- Можно останавливать и продолжать с последнего батча
Чанкинг: почему размер имеет значение
Самая частая ошибка — резать текст на куски по 500 токенов без учёта структуры. Получается, что предложение обрывается на середине, таблица делится между чанками, а заголовок оказывается в одном чанке, а содержание — в другом.
На 2 миллионах страниц плохой чанкинг гарантирует низкое качество поиска. И исправить это потом будет стоить дорого — перевекторизовать всё заново.
4 Умный чанкинг с учётом семантики
from langchain.text_splitter import RecursiveCharacterTextSplitter
from semantic_text_splitter import TextSplitter # альтернатива
import tiktoken # для точного подсчёта токенов
def smart_chunking(text, max_tokens=512, overlap=50):
"""Разбивает текст с учётом границ предложений и абзацев"""
# Сначала по абзацам (двойной перенос строки)
paragraphs = text.split('\n\n')
chunks = []
current_chunk = ''
encoder = tiktoken.get_encoding("cl100k_base") # для GPT-4
for paragraph in paragraphs:
paragraph = paragraph.strip()
if not paragraph:
continue
# Проверяем, не является ли paragraph заголовком
if is_heading(paragraph):
# Заголовок всегда идёт с следующим абзацем
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = paragraph + '\n\n'
continue
# Считаем токены
paragraph_tokens = len(encoder.encode(paragraph))
current_tokens = len(encoder.encode(current_chunk))
if current_tokens + paragraph_tokens <= max_tokens:
current_chunk += paragraph + '\n\n'
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = paragraph + '\n\n'
if current_chunk:
chunks.append(current_chunk.strip())
# Добавляем оверлап между чанками для контекста
if overlap > 0 and len(chunks) > 1:
overlapped_chunks = []
for i in range(len(chunks)):
if i == 0:
overlapped_chunks.append(chunks[i])
else:
# Берём последние overlap токенов из предыдущего чанка
prev_tokens = encoder.encode(chunks[i-1])
overlap_text = encoder.decode(prev_tokens[-overlap:])
overlapped_chunks.append(overlap_text + '\n\n' + chunks[i])
chunks = overlapped_chunks
return chunks
def is_heading(text):
"""Эвристика для определения заголовков"""
# Короткий текст (менее 50 символов)
# Заканчивается двоеточием
# Все слова с заглавной буквы (кроме предлогов)
# Содержит цифры с точкой (1.2, 3.4.5)
if len(text) < 50 and (text.endswith(':') or
text.isupper() or
any(char.isdigit() for char in text)):
return True
return False
Этот подход даёт на 20-30% более релевантные чанки, чем тупое разбиение по символам. Особенно важно для технической документации, где контекст критичен.
Векторизация: GPU не резиновые
2 миллиона чанков по 512 токенов — это примерно 1 миллиард токенов. Современные модели типа text-embedding-3-large (самая новая на февраль 2026) работают со скоростью 10-50 тысяч токенов в секунду на одном A100. Звучит быстро, пока не посчитаешь:
- 1 млрд токенов / 50к токенов/сек = 20 000 секунд
- Это 5.5 часов чистого времени инференса
- На практике с overhead, загрузкой моделей, ошибками — 8-12 часов
И это на одном дорогом GPU. Если его нет, считай дни.
5 Надёжная векторизация с очередями и retry
import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential
import numpy as np
from typing import List
import logging
logger = logging.getLogger(__name__)
class BatchEmbedder:
def __init__(self, model_name: str = "text-embedding-3-large",
batch_size: int = 100, max_concurrent: int = 10):
self.model_name = model_name
self.batch_size = batch_size # размер батча для API
self.max_concurrent = max_concurrent # параллельных запросов
self.semaphore = asyncio.Semaphore(max_concurrent)
@retry(stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=60))
async def embed_batch(self, session: aiohttp.ClientSession,
texts: List[str]) -> List[List[float]]:
"""Векторизует батч текстов с retry логикой"""
async with self.semaphore:
payload = {
"model": self.model_name,
"input": texts,
"encoding_format": "float"
}
try:
async with session.post(
"https://api.openai.com/v1/embeddings",
json=payload,
headers={"Authorization": f"Bearer {API_KEY}"},
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 429:
# Rate limit — ждём и ретраим
await asyncio.sleep(60)
raise Exception("Rate limit")
if response.status != 200:
error_text = await response.text()
raise Exception(f"API error: {error_text}")
data = await response.json()
return [item["embedding"] for item in data["data"]]
except Exception as e:
logger.warning(f"Batch failed: {e}")
raise # для retry
async def process_all(self, texts: List[str]) -> np.ndarray:
"""Обрабатывает все тексты, возвращает матрицу эмбеддингов"""
# Разбиваем на батчи
batches = [texts[i:i+self.batch_size]
for i in range(0, len(texts), self.batch_size)]
all_embeddings = []
async with aiohttp.ClientSession() as session:
tasks = []
for batch in batches:
task = self.embed_batch(session, batch)
tasks.append(task)
# Собираем результаты по мере готовности
for i, task in enumerate(asyncio.as_completed(tasks)):
try:
embeddings = await task
all_embeddings.extend(embeddings)
# Прогресс каждые 10%
if i % max(1, len(batches) // 10) == 0:
logger.info(f"Processed {i}/{len(batches)} batches")
except Exception as e:
logger.error(f"Failed batch {i} after retries: {e}")
# Можно сохранить failed batch для повторной обработки
return np.array(all_embeddings)
# Использование:
async def main():
embedder = BatchEmbedder(batch_size=100, max_concurrent=5)
embeddings = await embedder.process_all(all_chunks)
# embeddings.shape = (num_chunks, embedding_dim)
Эта система выдержит падение API, rate limits и сетевые проблемы. Главное — сохраняй прогресс на диск после каждого батча. Если всё упадёт на 90%, не придётся начинать сначала.
Индексация: выбор между быстрым и точным
Теперь у тебя 2 миллиона векторов по 3072 измерения (у text-embedding-3-large). Это примерно 24 ГБ в float32. Плюс метаданные. Плюс индексы для быстрого поиска.
Варианты:
| Решение | Плюсы | Минусы | Для 2 млн векторов |
|---|---|---|---|
| FAISS (Flat) | Точность 100%, простота | Медленно (O(n)), много памяти | Не подходит, поиск за секунды |
| FAISS (IVF) | Быстро, умеренная точность | Нужно обучать кластеры | Оптимально, 10-50ms на запрос |
| Qdrant/Weaviate | Готовое решение, фильтры, масштабирование | Дополнительная инфраструктура | Лучше для продакшена |
| HNSW (через annoy) | Очень быстро, хорошая точность | Много памяти, долгое построение | Хорошо, если память есть |
Мой выбор для таких объёмов — FAISS IVF с гибридным поиском. Почему? Потому что pure векторный поиск на 2 миллионах документов часто даёт релевантные, но не точные результаты. Добавь BM25 для текстового поиска — и точность подскочит на 30-40%.
6 Оптимизированный FAISS индекс с фильтрацией
import faiss
import numpy as np
import pickle
from pathlib import Path
class OptimizedFAISSIndex:
def __init__(self, dimension=3072, nlist=4096):
"""
dimension: размерность эмбеддингов
nlist: количество кластеров для IVF (больше = точнее, но медленнее)
Для 2 млн векторов: 4096-8192 кластеров оптимально
"""
self.dimension = dimension
self.nlist = nlist
# IVF индекс с квантованием (экономит память)
self.quantizer = faiss.IndexFlatIP(dimension)
self.index = faiss.IndexIVFFlat(self.quantizer, dimension, nlist, faiss.METRIC_INNER_PRODUCT)
# Для метаданных (id документа, позиция чанка и т.д.)
self.metadata = []
def train_index(self, sample_vectors: np.ndarray):
"""Обучение на подвыборке (1-5% данных)"""
# FAISS требует нормализованные векторы для косинусного сходства
faiss.normalize_L2(sample_vectors)
# Обучение кластеров
self.index.train(sample_vectors)
print(f"Index trained on {len(sample_vectors)} vectors")
def add_vectors(self, vectors: np.ndarray, metadata_list: list):
"""Добавление векторов с метаданными"""
if len(vectors) != len(metadata_list):
raise ValueError("Vectors and metadata count mismatch")
# Нормализация
faiss.normalize_L2(vectors)
# Добавление в индекс
self.index.add(vectors)
self.metadata.extend(metadata_list)
print(f"Added {len(vectors)} vectors, total: {self.index.ntotal}")
def search(self, query_vector: np.ndarray, k: int = 10,
doc_filter: list = None) -> tuple:
"""
Поиск с возможной фильтрацией по документам
query_vector: эмбеддинг запроса
k: сколько результатов вернуть
doc_filter: список разрешённых doc_id (опционально)
"""
# Нормализация запроса
query_vector = query_vector.reshape(1, -1)
faiss.normalize_L2(query_vector)
# Если есть фильтр — используем поиск с маской
if doc_filter is not None:
# Преобразуем фильтр в битовую маску FAISS
# (упрощённо, в реальности сложнее)
return self._search_with_filter(query_vector, k, doc_filter)
# Обычный поиск
distances, indices = self.index.search(query_vector, k)
# Извлекаем метаданные
results = []
for dist, idx in zip(distances[0], indices[0]):
if idx != -1: # -1 означает отсутствие результата
results.append({
'metadata': self.metadata[idx],
'score': float(dist),
'index': int(idx)
})
return results
def save(self, path: Path):
"""Сохранение индекса и метаданных"""
# Сохраняем индекс FAISS
faiss.write_index(self.index, str(path / "index.faiss"))
# Сохраняем метаданные отдельно
with open(path / "metadata.pkl", 'wb') as f:
pickle.dump(self.metadata, f)
print(f"Index saved to {path}")
def load(self, path: Path):
"""Загрузка индекса"""
self.index = faiss.read_index(str(path / "index.faiss"))
with open(path / "metadata.pkl", 'rb') as f:
self.metadata = pickle.load(f)
print(f"Index loaded from {path}, vectors: {self.index.ntotal}")
# Пример использования:
index = OptimizedFAISSIndex(dimension=3072, nlist=4096)
# Обучаем на 50k случайных векторов (2.5% от 2 млн)
train_sample = embeddings[np.random.choice(len(embeddings), 50000, replace=False)]
index.train_index(train_sample)
# Добавляем батчами по 100k (чтобы не переполнять память)
for i in range(0, len(embeddings), 100000):
batch = embeddings[i:i+100000]
meta_batch = metadata_list[i:i+100000]
index.add_vectors(batch, meta_batch)
# Сохраняем
index.save(Path("/data/faiss_index"))
Типичные ошибки, которые сломают твой пайплайн
Эти ошибки я видел в десятках проектов. Учись на чужих, а не на своих.
Ошибка 1: Хранение всего в памяти
2 миллиона чанков по 500 токенов — это примерно 50 ГБ текста. Плюс эмбеддинги — ещё 24 ГБ. Плюс индексы. Итого 80+ ГБ RAM. Решение: обрабатывай батчами, сбрасывай на диск после каждого этапа.
Ошибка 2: Отсутствие idempotency
Если пайплайн упал на 90%, ты должен продолжить с этого места, а не начинать сначала. Добавляй checkpoint'ы после каждого батча. Сохраняй прогресс в простой файл.
Ошибка 3: Игнорирование дубликатов
В 2 миллионах документов обязательно будут дубликаты или почти дубликаты. Они засоряют индекс и ухудшают поиск. Добавь дедупликацию по simhash или MinHash на этапе очистки.
Ошибка 4: Один размер чанков для всех документов
Технические спецификации и художественные тексты требуют разного подхода. Для первых — мелкие чанки с точной информацией. Для вторых — крупные, чтобы сохранить контекст. Определяй тип документа и адаптируй чанкинг.
Ошибка 5: Поиск только по векторам
На больших объёмах pure векторный поиск начинает «производить шум». Добавляй гибридный подход с текстовым поиском (BM25/Elasticsearch). Релевантность вырастет значительно.
Что дальше? RAG — это только начало
Когда ты построил работающий RAG на 2 миллионах документов, появляются новые вопросы:
- Как обновлять индекс, когда добавляются новые документы? (Подсказка: не перестраивать полностью)
- Как оценивать качество поиска? Precision/recall на таком объёме считаются неделями
- Как оптимизировать стоимость инференса LLM для генерации ответов?
- Как добавить мультимодальность (поиск по изображениям в PDF)?
Ответы на эти вопросы — в дорожной карте RAG на 2026 год. Там разобраны продвинутые техники вроде агентов, графов знаний и оптимизации под разные типы запросов.
Главный урок: масштабирование RAG — это не про добавление больше GPU. Это про архитектурные решения, которые позволяют системе расти без полной перестройки. Разделяй этапы, добавляй контрольные точки, проектируй с учётом сбоев.
И да, 2 миллиона документов — это ещё не предел. Встречал системы на 50+ миллионах. Но это уже совсем другая история.