RAG-пайплайн для 2+ млн страниц: архитектура, оптимизация, код 2026 | AiManual
AiManual Logo Ai / Manual.
11 Фев 2026 Гайд

RAG на 2 миллионах страниц: как не сломать всё и не разориться

Пошаговое руководство по построению RAG-системы на 2+ миллионах документов. Архитектура, оптимизация производительности, код и типичные ошибки.

Когда 2 миллиона документов перестают быть шуткой

В теории RAG звучит просто: берёшь тексты, векторизуешь, кладёшь в базу, ищешь похожее. Пока у тебя пара тысяч документов — всё работает. Потом появляется проект с 50 тысячами страниц. Ещё терпимо. А потом приходит заказчик с архивом в 2.3 миллиона PDF-страниц (реальный кейс с Epstein Files, но это неважно).

И вот тут начинается ад.

Твой красивый пайплайн на LangChain с одним GPU и локальным FAISS умирает на третьем часу работы. Векторизация идёт со скоростью 100 страниц в час. Диск забивается терабайтами промежуточных данных. Поиск работает 15 секунд. А заказчик хочет ответ за 2.

Знакомо? Давайте разберём, как строить RAG, который не сломается на реальных объёмах.

Важно: речь не про учебные примеры на 100 документах. Речь про продакшен, где ошибка стоит денег, а перезапуск пайплайна занимает дни.

Архитектура, которая не упадёт под нагрузкой

Первая ошибка — пытаться запихнуть всё в одну коробку. Один сервер, одна база, один скрипт. На 2 миллионах страниц это гарантированный провал.

1 Разделяй и властвуй: три независимых этапа

Правильная архитектура для больших объёмов:

Этап Задача Инструменты Почему отдельно
Извлечение и очистка PDF → текст, нормализация Apache Tika, Unstructured.io, custom parsers Требует CPU, много I/O, можно распараллелить на сотни ядер
Чанкинг и векторизация Текст → эмбеддинги SentenceTransformers, OpenAI/Bedrock API, NVIDIA NIM Требует GPU, дорого, нужна очередь и retry-логика
Индексация и поиск Эмбеддинги → поисковый индекс Qdrant, Weaviate, Elasticsearch + FAISS Работает в памяти, требует оптимизации под запросы

Ключевая мысль: эти этапы должны жить отдельно. Не пытайся сделать end-to-end пайплайн, который всё делает сразу. Поломка на любом этапе не должна требовать перезапуска всего.

Извлечение: где теряется 80% времени

PDF — это проклятие человечества. Особенно сканированные PDF с таблицами, колонками и кривыми шрифтами.

💡
На 2 миллионах страниц даже разница в 0.1 секунды на страницу даёт 55 часов дополнительной работы. Каждая оптимизация здесь окупается в разы.

2 Как НЕ надо делать извлечение текста

# ПЛОХО: последовательная обработка
for pdf_path in tqdm(pdf_files):
    text = extract_text(pdf_path)  # 2 секунды на файл
    cleaned = clean_text(text)     # ещё 0.5 секунды
    save_to_json(cleaned)          # I/O блокирует всё
# Итого: ~50 дней на 2 млн страниц

3 Как делать правильно: массовый параллелизм

# ХОРОШО: распределённая обработка
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import PyPDF2  # или pdfplumber для сложных случаев

def process_pdf_batch(batch_files, output_dir):
    """Обрабатывает пачку PDF, пишет в отдельные файлы"""
    results = []
    for pdf_path in batch_files:
        try:
            with open(pdf_path, 'rb') as f:
                pdf = PyPDF2.PdfReader(f)
                text = ''
                for page in pdf.pages:
                    text += page.extract_text() + '\n\n'
                
                # Быстрая чистка без тяжёлых regex
                text = text.replace('\r', ' ').replace('\n\n\n', '\n\n')
                
                # Сохраняем сразу, не копим в памяти
                output_path = f"{output_dir}/{pdf_path.stem}.txt"
                with open(output_path, 'w', encoding='utf-8') as out_f:
                    out_f.write(text)
                    
        except Exception as e:
            logger.error(f"Failed {pdf_path}: {e}")
            continue
    return len(batch_files)

# Разбиваем на батчи по 1000 файлов
batch_size = 1000
batches = [pdf_files[i:i+batch_size] for i in range(0, len(pdf_files), batch_size)]

# Запускаем на всех ядрах
with ProcessPoolExecutor(max_workers=mp.cpu_count() - 2) as executor:
    futures = []
    for batch in batches:
        future = executor.submit(process_pdf_batch, batch, output_dir)
        futures.append(future)
    
    # Мониторим прогресс
    for future in tqdm(as_completed(futures), total=len(futures)):
        processed = future.result()
        logger.info(f"Processed batch: {processed} files")

Эта схема обработает 2 миллиона PDF за 2-3 дня на 32-ядерном сервере вместо 50. Ключевые моменты:

  • Каждый процесс пишет в свой файл — нет конфликтов
  • Батчи по 1000 файлов — оптимально для баланса между overhead и памятью
  • Ошибка в одном файле не ломает весь батч
  • Можно останавливать и продолжать с последнего батча

Чанкинг: почему размер имеет значение

Самая частая ошибка — резать текст на куски по 500 токенов без учёта структуры. Получается, что предложение обрывается на середине, таблица делится между чанками, а заголовок оказывается в одном чанке, а содержание — в другом.

На 2 миллионах страниц плохой чанкинг гарантирует низкое качество поиска. И исправить это потом будет стоить дорого — перевекторизовать всё заново.

4 Умный чанкинг с учётом семантики

from langchain.text_splitter import RecursiveCharacterTextSplitter
from semantic_text_splitter import TextSplitter  # альтернатива
import tiktoken  # для точного подсчёта токенов

def smart_chunking(text, max_tokens=512, overlap=50):
    """Разбивает текст с учётом границ предложений и абзацев"""
    
    # Сначала по абзацам (двойной перенос строки)
    paragraphs = text.split('\n\n')
    chunks = []
    current_chunk = ''
    
    encoder = tiktoken.get_encoding("cl100k_base")  # для GPT-4
    
    for paragraph in paragraphs:
        paragraph = paragraph.strip()
        if not paragraph:
            continue
            
        # Проверяем, не является ли paragraph заголовком
        if is_heading(paragraph):
            # Заголовок всегда идёт с следующим абзацем
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = paragraph + '\n\n'
            continue
            
        # Считаем токены
        paragraph_tokens = len(encoder.encode(paragraph))
        current_tokens = len(encoder.encode(current_chunk))
        
        if current_tokens + paragraph_tokens <= max_tokens:
            current_chunk += paragraph + '\n\n'
        else:
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = paragraph + '\n\n'
            
    if current_chunk:
        chunks.append(current_chunk.strip())
        
    # Добавляем оверлап между чанками для контекста
    if overlap > 0 and len(chunks) > 1:
        overlapped_chunks = []
        for i in range(len(chunks)):
            if i == 0:
                overlapped_chunks.append(chunks[i])
            else:
                # Берём последние overlap токенов из предыдущего чанка
                prev_tokens = encoder.encode(chunks[i-1])
                overlap_text = encoder.decode(prev_tokens[-overlap:])
                overlapped_chunks.append(overlap_text + '\n\n' + chunks[i])
        chunks = overlapped_chunks
        
    return chunks

def is_heading(text):
    """Эвристика для определения заголовков"""
    # Короткий текст (менее 50 символов)
    # Заканчивается двоеточием
    # Все слова с заглавной буквы (кроме предлогов)
    # Содержит цифры с точкой (1.2, 3.4.5)
    if len(text) < 50 and (text.endswith(':') or 
                           text.isupper() or 
                           any(char.isdigit() for char in text)):
        return True
    return False

Этот подход даёт на 20-30% более релевантные чанки, чем тупое разбиение по символам. Особенно важно для технической документации, где контекст критичен.

Векторизация: GPU не резиновые

2 миллиона чанков по 512 токенов — это примерно 1 миллиард токенов. Современные модели типа text-embedding-3-large (самая новая на февраль 2026) работают со скоростью 10-50 тысяч токенов в секунду на одном A100. Звучит быстро, пока не посчитаешь:

  • 1 млрд токенов / 50к токенов/сек = 20 000 секунд
  • Это 5.5 часов чистого времени инференса
  • На практике с overhead, загрузкой моделей, ошибками — 8-12 часов

И это на одном дорогом GPU. Если его нет, считай дни.

💡
Совет: для таких объёмов используй несколько GPU параллельно. Или сервисы вроде OpenAI/Bedrock — они дешевле, чем покупать свои A100, если векторизация разовая. Но считай стоимость: 1 млн токенов у OpenAI Embeddings стоит $0.13, итого $130 за весь датасет.

5 Надёжная векторизация с очередями и retry

import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential
import numpy as np
from typing import List
import logging

logger = logging.getLogger(__name__)

class BatchEmbedder:
    def __init__(self, model_name: str = "text-embedding-3-large", 
                 batch_size: int = 100, max_concurrent: int = 10):
        self.model_name = model_name
        self.batch_size = batch_size  # размер батча для API
        self.max_concurrent = max_concurrent  # параллельных запросов
        self.semaphore = asyncio.Semaphore(max_concurrent)
        
    @retry(stop=stop_after_attempt(3), 
           wait=wait_exponential(multiplier=1, min=4, max=60))
    async def embed_batch(self, session: aiohttp.ClientSession, 
                         texts: List[str]) -> List[List[float]]:
        """Векторизует батч текстов с retry логикой"""
        
        async with self.semaphore:
            payload = {
                "model": self.model_name,
                "input": texts,
                "encoding_format": "float"
            }
            
            try:
                async with session.post(
                    "https://api.openai.com/v1/embeddings",
                    json=payload,
                    headers={"Authorization": f"Bearer {API_KEY}"},
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    
                    if response.status == 429:
                        # Rate limit — ждём и ретраим
                        await asyncio.sleep(60)
                        raise Exception("Rate limit")
                        
                    if response.status != 200:
                        error_text = await response.text()
                        raise Exception(f"API error: {error_text}")
                        
                    data = await response.json()
                    return [item["embedding"] for item in data["data"]]
                    
            except Exception as e:
                logger.warning(f"Batch failed: {e}")
                raise  # для retry
    
    async def process_all(self, texts: List[str]) -> np.ndarray:
        """Обрабатывает все тексты, возвращает матрицу эмбеддингов"""
        
        # Разбиваем на батчи
        batches = [texts[i:i+self.batch_size] 
                  for i in range(0, len(texts), self.batch_size)]
        
        all_embeddings = []
        
        async with aiohttp.ClientSession() as session:
            tasks = []
            for batch in batches:
                task = self.embed_batch(session, batch)
                tasks.append(task)
            
            # Собираем результаты по мере готовности
            for i, task in enumerate(asyncio.as_completed(tasks)):
                try:
                    embeddings = await task
                    all_embeddings.extend(embeddings)
                    
                    # Прогресс каждые 10%
                    if i % max(1, len(batches) // 10) == 0:
                        logger.info(f"Processed {i}/{len(batches)} batches")
                        
                except Exception as e:
                    logger.error(f"Failed batch {i} after retries: {e}")
                    # Можно сохранить failed batch для повторной обработки
                    
        return np.array(all_embeddings)

# Использование:
async def main():
    embedder = BatchEmbedder(batch_size=100, max_concurrent=5)
    embeddings = await embedder.process_all(all_chunks)
    # embeddings.shape = (num_chunks, embedding_dim)

Эта система выдержит падение API, rate limits и сетевые проблемы. Главное — сохраняй прогресс на диск после каждого батча. Если всё упадёт на 90%, не придётся начинать сначала.

Индексация: выбор между быстрым и точным

Теперь у тебя 2 миллиона векторов по 3072 измерения (у text-embedding-3-large). Это примерно 24 ГБ в float32. Плюс метаданные. Плюс индексы для быстрого поиска.

Варианты:

Решение Плюсы Минусы Для 2 млн векторов
FAISS (Flat) Точность 100%, простота Медленно (O(n)), много памяти Не подходит, поиск за секунды
FAISS (IVF) Быстро, умеренная точность Нужно обучать кластеры Оптимально, 10-50ms на запрос
Qdrant/Weaviate Готовое решение, фильтры, масштабирование Дополнительная инфраструктура Лучше для продакшена
HNSW (через annoy) Очень быстро, хорошая точность Много памяти, долгое построение Хорошо, если память есть

Мой выбор для таких объёмов — FAISS IVF с гибридным поиском. Почему? Потому что pure векторный поиск на 2 миллионах документов часто даёт релевантные, но не точные результаты. Добавь BM25 для текстового поиска — и точность подскочит на 30-40%.

6 Оптимизированный FAISS индекс с фильтрацией

import faiss
import numpy as np
import pickle
from pathlib import Path

class OptimizedFAISSIndex:
    def __init__(self, dimension=3072, nlist=4096):
        """
        dimension: размерность эмбеддингов
        nlist: количество кластеров для IVF (больше = точнее, но медленнее)
        Для 2 млн векторов: 4096-8192 кластеров оптимально
        """
        self.dimension = dimension
        self.nlist = nlist
        
        # IVF индекс с квантованием (экономит память)
        self.quantizer = faiss.IndexFlatIP(dimension)
        self.index = faiss.IndexIVFFlat(self.quantizer, dimension, nlist, faiss.METRIC_INNER_PRODUCT)
        
        # Для метаданных (id документа, позиция чанка и т.д.)
        self.metadata = []
        
    def train_index(self, sample_vectors: np.ndarray):
        """Обучение на подвыборке (1-5% данных)"""
        # FAISS требует нормализованные векторы для косинусного сходства
        faiss.normalize_L2(sample_vectors)
        
        # Обучение кластеров
        self.index.train(sample_vectors)
        print(f"Index trained on {len(sample_vectors)} vectors")
        
    def add_vectors(self, vectors: np.ndarray, metadata_list: list):
        """Добавление векторов с метаданными"""
        if len(vectors) != len(metadata_list):
            raise ValueError("Vectors and metadata count mismatch")
            
        # Нормализация
        faiss.normalize_L2(vectors)
        
        # Добавление в индекс
        self.index.add(vectors)
        self.metadata.extend(metadata_list)
        
        print(f"Added {len(vectors)} vectors, total: {self.index.ntotal}")
        
    def search(self, query_vector: np.ndarray, k: int = 10, 
               doc_filter: list = None) -> tuple:
        """
        Поиск с возможной фильтрацией по документам
        query_vector: эмбеддинг запроса
        k: сколько результатов вернуть
        doc_filter: список разрешённых doc_id (опционально)
        """
        # Нормализация запроса
        query_vector = query_vector.reshape(1, -1)
        faiss.normalize_L2(query_vector)
        
        # Если есть фильтр — используем поиск с маской
        if doc_filter is not None:
            # Преобразуем фильтр в битовую маску FAISS
            # (упрощённо, в реальности сложнее)
            return self._search_with_filter(query_vector, k, doc_filter)
        
        # Обычный поиск
        distances, indices = self.index.search(query_vector, k)
        
        # Извлекаем метаданные
        results = []
        for dist, idx in zip(distances[0], indices[0]):
            if idx != -1:  # -1 означает отсутствие результата
                results.append({
                    'metadata': self.metadata[idx],
                    'score': float(dist),
                    'index': int(idx)
                })
                
        return results
    
    def save(self, path: Path):
        """Сохранение индекса и метаданных"""
        # Сохраняем индекс FAISS
        faiss.write_index(self.index, str(path / "index.faiss"))
        
        # Сохраняем метаданные отдельно
        with open(path / "metadata.pkl", 'wb') as f:
            pickle.dump(self.metadata, f)
        
        print(f"Index saved to {path}")
        
    def load(self, path: Path):
        """Загрузка индекса"""
        self.index = faiss.read_index(str(path / "index.faiss"))
        
        with open(path / "metadata.pkl", 'rb') as f:
            self.metadata = pickle.load(f)
        
        print(f"Index loaded from {path}, vectors: {self.index.ntotal}")

# Пример использования:
index = OptimizedFAISSIndex(dimension=3072, nlist=4096)

# Обучаем на 50k случайных векторов (2.5% от 2 млн)
train_sample = embeddings[np.random.choice(len(embeddings), 50000, replace=False)]
index.train_index(train_sample)

# Добавляем батчами по 100k (чтобы не переполнять память)
for i in range(0, len(embeddings), 100000):
    batch = embeddings[i:i+100000]
    meta_batch = metadata_list[i:i+100000]
    index.add_vectors(batch, meta_batch)
    
# Сохраняем
index.save(Path("/data/faiss_index"))

Типичные ошибки, которые сломают твой пайплайн

Эти ошибки я видел в десятках проектов. Учись на чужих, а не на своих.

Ошибка 1: Хранение всего в памяти

2 миллиона чанков по 500 токенов — это примерно 50 ГБ текста. Плюс эмбеддинги — ещё 24 ГБ. Плюс индексы. Итого 80+ ГБ RAM. Решение: обрабатывай батчами, сбрасывай на диск после каждого этапа.

Ошибка 2: Отсутствие idempotency

Если пайплайн упал на 90%, ты должен продолжить с этого места, а не начинать сначала. Добавляй checkpoint'ы после каждого батча. Сохраняй прогресс в простой файл.

Ошибка 3: Игнорирование дубликатов

В 2 миллионах документов обязательно будут дубликаты или почти дубликаты. Они засоряют индекс и ухудшают поиск. Добавь дедупликацию по simhash или MinHash на этапе очистки.

Ошибка 4: Один размер чанков для всех документов

Технические спецификации и художественные тексты требуют разного подхода. Для первых — мелкие чанки с точной информацией. Для вторых — крупные, чтобы сохранить контекст. Определяй тип документа и адаптируй чанкинг.

Ошибка 5: Поиск только по векторам

На больших объёмах pure векторный поиск начинает «производить шум». Добавляй гибридный подход с текстовым поиском (BM25/Elasticsearch). Релевантность вырастет значительно.

Что дальше? RAG — это только начало

Когда ты построил работающий RAG на 2 миллионах документов, появляются новые вопросы:

  • Как обновлять индекс, когда добавляются новые документы? (Подсказка: не перестраивать полностью)
  • Как оценивать качество поиска? Precision/recall на таком объёме считаются неделями
  • Как оптимизировать стоимость инференса LLM для генерации ответов?
  • Как добавить мультимодальность (поиск по изображениям в PDF)?

Ответы на эти вопросы — в дорожной карте RAG на 2026 год. Там разобраны продвинутые техники вроде агентов, графов знаний и оптимизации под разные типы запросов.

Главный урок: масштабирование RAG — это не про добавление больше GPU. Это про архитектурные решения, которые позволяют системе расти без полной перестройки. Разделяй этапы, добавляй контрольные точки, проектируй с учётом сбоев.

И да, 2 миллиона документов — это ещё не предел. Встречал системы на 50+ миллионах. Но это уже совсем другая история.