Почему ваш RAG не понимает email-переписку
Вы загрузили тысячу email-тредов в векторную базу. Задали вопрос: "Какое решение приняли по проекту Альфа?". И получили в ответ цитату из пятого письма в цепочке, где кто-то спрашивает: "А пиццу на обед закажем?".
Знакомо? Это не баг вашей LLM. Это фундаментальный провал стандартного RAG подхода при работе со структурированными диалогами. Email-тред — это не документ. Это граф. Дерево. Динамическая система сообщений, где контекст рождается в отношениях между письмами, а не в их содержимом.
Стандартный чанкинг по символам или предложениям разрушает структуру треда. Вы теряете самое ценное: последовательность диалога, причинно-следственные связи, ветвление обсуждений.
Как НЕ надо делать: три смертельных ошибки
Прежде чем покажу работающее решение, давайте разберем типичные косяки. Я видел их в десятках проектов.
1Чанкинг по количеству токенов
Берете письмо, делите на куски по 512 токенов, отправляете в эмбеддинг. Результат? Ответ на вопрос в одном чанке, сам вопрос — в другом. Поиск по векторам находит ответ, но без контекста вопроса он бессмысленен.
2Игнорирование метаданных
Message-ID, References, In-Reply-To — это не технический мусор. Это каркас, на котором держится вся структура треда. Отбросили их — получили мешок писем без связей.
3Наивная очистка текста
Удаляете все "Re: Re: Re:", подписи, цитаты предыдущих писем. Кажется логичным? А теперь у вас нет ответов на конкретные вопросы. Нет цепочек "вопрос-ответ".
Строим граф: Message-ID как фундамент
Все начинается с метаданных. Каждое письмо в треде содержит уникальный Message-ID и ссылки на предыдущие сообщения через References или In-Reply-To. Это готовый граф. Нужно только его построить.
import email
from email.utils import parsedate_to_datetime
from collections import defaultdict
class EmailThreadGraph:
def __init__(self):
self.nodes = {} # Message-ID -> email data
self.edges = defaultdict(list) # Message-ID -> list of child IDs
def add_email(self, raw_email):
msg = email.message_from_bytes(raw_email)
msg_id = msg['Message-ID'].strip('<>')
# Extract references
refs = []
if msg['References']:
refs = [r.strip(' <>') for r in msg['References'].split()]
elif msg['In-Reply-To']:
refs = [msg['In-Reply-To'].strip(' <>')]
# Store node
self.nodes[msg_id] = {
'subject': msg['Subject'],
'from': msg['From'],
'date': parsedate_to_datetime(msg['Date']),
'body': self._extract_body(msg),
'raw_headers': dict(msg.items())
}
# Add edges (from parent to current)
for ref in refs:
if ref in self.nodes:
self.edges[ref].append(msg_id)
return msg_id
def _extract_body(self, msg):
# Simplified body extraction
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == 'text/plain':
return part.get_payload(decode=True).decode('utf-8', errors='ignore')
else:
return msg.get_payload(decode=True).decode('utf-8', errors='ignore')
return ''
def get_thread_tree(self, root_msg_id):
"""Get all emails in a thread as a tree structure"""
visited = set()
tree = []
def dfs(msg_id, depth=0):
if msg_id in visited:
return
visited.add(msg_id)
email_data = self.nodes[msg_id].copy()
email_data['depth'] = depth
email_data['children'] = []
for child_id in self.edges.get(msg_id, []):
child_data = dfs(child_id, depth + 1)
if child_data:
email_data['children'].append(child_data)
return email_data
return dfs(root_msg_id)
Этот код строит дерево переписки. Не просто список писем в хронологическом порядке, а именно дерево с ветвлениями. Потому что в реальных тредах часто возникают параллельные обсуждения: кто-то отвечает на первое письмо, кто-то — на третье.
Умный чанкинг: сохраняем контекстные связи
Теперь, когда у нас есть граф, можно думать о чанкинге. Но не того типа, который режет текст на куски. Нам нужен контекстный чанкинг.
Основная идея: чанк — это не кусок текста. Чанк — это логическая единица диалога. Например:
- Вопрос и все ответы на него
- Предложение и его обсуждение
- Решение и аргументы за/против
Вот как это работает на практике:
class ContextualChunker:
def __init__(self, max_tokens=1000):
self.max_tokens = max_tokens
def chunk_thread(self, thread_tree):
"""Convert thread tree into contextual chunks"""
chunks = []
def process_node(node, current_chunk=None):
if current_chunk is None:
current_chunk = {
'messages': [],
'token_count': 0,
'subjects': set(),
'participants': set()
}
# Estimate tokens (simplified)
msg_tokens = len(node['body'].split()) * 1.3
# Check if we should start new chunk
if current_chunk['token_count'] + msg_tokens > self.max_tokens:
if current_chunk['messages']:
chunks.append(self._finalize_chunk(current_chunk))
current_chunk = {
'messages': [],
'token_count': 0,
'subjects': set(),
'participants': set()
}
# Add message to chunk
current_chunk['messages'].append({
'id': node.get('message_id'),
'from': node['from'],
'body': node['body'],
'depth': node['depth']
})
current_chunk['token_count'] += msg_tokens
current_chunk['subjects'].add(node.get('subject', ''))
current_chunk['participants'].add(node['from'])
# Process children (replies)
for child in node.get('children', []):
process_node(child, current_chunk)
return current_chunk
final_chunk = process_node(thread_tree)
if final_chunk['messages']:
chunks.append(self._finalize_chunk(final_chunk))
return chunks
def _finalize_chunk(self, chunk_data):
"""Create final chunk with metadata"""
# Combine messages with context markers
combined_text = []
for msg in chunk_data['messages']:
indent = " " * msg['depth']
combined_text.append(f"{indent}[{msg['from']}]: {msg['body']}")
return {
'text': '\n'.join(combined_text),
'metadata': {
'subjects': list(chunk_data['subjects']),
'participants': list(chunk_data['participants']),
'message_count': len(chunk_data['messages']),
'depth_range': (min(m['depth'] for m in chunk_data['messages']),
max(m['depth'] for m in chunk_data['messages']))
}
}
Ключевое отличие от обычного чанкинга: мы группируем сообщения по их положению в дереве. Ответы остаются вместе с вопросами. Ветки обсуждения не разрываются на границах чанков.
Извлечение решений: ищем не слова, а структуры
Самая ценная информация в email-тредах — это принятые решения. Но они редко формулируются явно. Обычно это выглядит так:
- Алексей: "Предлагаю использовать PostgreSQL"
- Мария: "А почему не MongoDB?"
- Иван: "У нас уже есть экспертиза по Postgres"
- Алексей: "Тогда давайте PostgreSQL, версию 16"
- Мария: "Согласна"
Решение здесь — "Использовать PostgreSQL версии 16". Но в тексте этой фразы нет. Решение рождается из диалога.
Для извлечения таких решений нужен двухуровневый подход:
1Структурный анализ
Ищем паттерны в дереве переписки:
def extract_decision_patterns(thread_tree):
patterns = []
def analyze_node(node):
# Pattern 1: Proposal -> Discussion -> Agreement
if len(node.get('children', [])) >= 2:
# Check if first child is question/challenge
# and last child is agreement
children_texts = [c['body'].lower() for c in node['children']]
has_question = any(q_word in text
for text in children_texts
for q_word in ['почему', 'зачем', 'а если', '?'])
has_agreement = any(agree_word in children_texts[-1]
for agree_word in ['соглас', 'ок', 'давайте', 'принято'])
if has_question and has_agreement:
patterns.append({
'type': 'proposal_decision',
'proposal': node['body'],
'decision_point': children_texts[-1]
})
# Pattern 2: Question -> Multiple answers -> Summary
# ... additional pattern detection
for child in node.get('children', []):
analyze_node(child)
analyze_node(thread_tree)
return patterns
2Семантический анализ с LLM
После структурного анализа отправляем потенциальные решения в LLM для валидации и формулировки:
import openai
class DecisionExtractor:
def __init__(self, llm_client):
self.llm = llm_client
def extract_decisions(self, contextual_chunk):
prompt = f"""
Analyze this email thread segment and extract all decisions made.
Format each decision as:
DECISION: [clear statement of what was decided]
EVIDENCE: [quotes that support this decision]
CONFIDENCE: [high/medium/low]
Thread segment:
{contextual_chunk['text']}
"""
response = self.llm.chat.completions.create(
model="gpt-4o-2026", # Используем самую новую модель на 19.02.2026
messages=[{"role": "user", "content": prompt}],
temperature=0.1
)
return self._parse_llm_response(response.choices[0].message.content)
def _parse_llm_response(self, text):
# Parse the structured response from LLM
decisions = []
current = {}
for line in text.split('\n'):
if line.startswith('DECISION:'):
if current:
decisions.append(current)
current = {'decision': line.replace('DECISION:', '').strip()}
elif line.startswith('EVIDENCE:'):
current['evidence'] = line.replace('EVIDENCE:', '').strip()
elif line.startswith('CONFIDENCE:'):
current['confidence'] = line.replace('CONFIDENCE:', '').strip().lower()
if current:
decisions.append(current)
return decisions
Важный нюанс: используйте самую новую модель OpenAI GPT-4o-2026 или аналогичную. Модели 2026 года значительно лучше понимают контекст диалогов и извлекают неявные решения.
Интеграция в RAG-пайплайн
Теперь соберем все вместе в работающую систему:
- Парсинг и построение графа: Извлекаем Message-ID, References, строим дерево переписки
- Контекстный чанкинг: Группируем сообщения по логическим блокам, сохраняя связи вопрос-ответ
- Извлечение решений: Анализируем каждый чанк на наличие паттернов принятия решений
- Индексация с метаданными: Векторизуем чанки вместе с метаданными (участники, темы, глубины)
- Поиск с учетом структуры: При поиске учитываем не только семантику, но и структурные признаки
Вот как выглядит конечный пайплайн:
class EmailRAGPipeline:
def __init__(self, vector_store, llm_client):
self.graph_builder = EmailThreadGraph()
self.chunker = ContextualChunker(max_tokens=800)
self.decision_extractor = DecisionExtractor(llm_client)
self.vector_store = vector_store
def index_thread(self, raw_emails):
"""Process and index an email thread"""
# 1. Build graph
for email_raw in raw_emails:
self.graph_builder.add_email(email_raw)
# 2. Get root messages (threads without parents)
all_ids = set(self.graph_builder.nodes.keys())
child_ids = set()
for children in self.graph_builder.edges.values():
child_ids.update(children)
root_ids = all_ids - child_ids
# 3. Process each thread
for root_id in root_ids:
thread_tree = self.graph_builder.get_thread_tree(root_id)
if not thread_tree:
continue
# 4. Contextual chunking
chunks = self.chunker.chunk_thread(thread_tree)
# 5. Extract decisions (optional, can be done on query)
decisions = []
for chunk in chunks:
chunk_decisions = self.decision_extractor.extract_decisions(chunk)
decisions.extend(chunk_decisions)
# 6. Prepare for indexing
for chunk in chunks:
# Enhanced metadata
chunk['metadata']['decisions'] = decisions
chunk['metadata']['thread_root'] = root_id
chunk['metadata']['has_decisions'] = len(decisions) > 0
# Index in vector store
self.vector_store.add_document(
text=chunk['text'],
metadata=chunk['metadata'],
embedding_model='text-embedding-3-large' # Актуальная модель на 2026
)
def query(self, question, filter_by_participant=None):
"""Query the email archive"""
# Prepare query with context
query_text = question
if filter_by_participant:
query_text += f" (involving {filter_by_participant})"
# Search in vector store
results = self.vector_store.search(
query=query_text,
filter_conditions={'has_decisions': True} if 'решение' in question.lower() else {},
top_k=5
)
# Rerank considering thread structure
reranked = self._rerank_by_thread_coherence(results)
return reranked
def _rerank_by_thread_coherence(self, results):
"""Rerank results based on thread completeness"""
# Prefer chunks that contain complete discussion threads
# over fragmented pieces
scored_results = []
for result in results:
score = result['similarity_score']
# Boost score for chunks with decisions
if result['metadata'].get('has_decisions'):
score *= 1.3
# Boost for chunks with multiple participants
participants = len(result['metadata'].get('participants', []))
if participants > 1:
score *= 1.1 + (participants * 0.05)
# Penalize shallow depth (likely incomplete discussions)
depth_min, depth_max = result['metadata'].get('depth_range', (0, 0))
if depth_max - depth_min < 2:
score *= 0.8
scored_results.append({**result, 'rerank_score': score})
return sorted(scored_results, key=lambda x: x['rerank_score'], reverse=True)
Ошибки, которые все равно совершат
Даже с этим гайдом вы увидите типичные промахи:
| Ошибка | Последствие | Как исправить |
|---|---|---|
| Игнорирование цитат в телах писем | Потеря контекста, дублирование | Удаляйте цитаты только если они точно повторяют предыдущее сообщение в чанке |
| Слишком большие чанки | Шум в поиске, высокие cost LLM | Начинайте с 600-800 токенов, увеличивайте только если теряется контекст |
| Отсутствие нормализации тем | "Re: Re: Re: Вопрос" ≠ "Вопрос" | Удаляйте префиксы "Re:", "Fwd:" но сохраняйте историю изменений темы |
| Неучет временных меток | Путаница в последовательности | Всегда используйте Date header, а не порядок в mailbox |
Что делать, когда тредов миллионы
Описанный подход работает для тысяч тредов. Но что если у вас корпоративный архив за 10 лет?
Тогда нужна гибридная стратегия:
- Первый уровень: Быстрый индекс по метаданным (участники, даты, темы) с использованием техник фильтрации Decompose
- Второй уровень: Детальный графовый анализ только для релевантных тредов
- Кэширование решений: Извлеченные решения храните отдельно, как материализованные представления
Для масштабирования такой системы изучите практический гайд по масштабированию RAG.
Почему это работает там, где обычный RAG падает
Email-треды — это структурированные данные в disguise. Они выглядят как текст, но ведут себя как граф. Обычный RAG ищет семантические similarity между кусками текста. Наш подход ищет semantic similarity в контексте structural similarity.
Когда вы спрашиваете "Какое решение приняли?", система:
- Ищет чанки, помеченные как содержащие решения
- Проверяет, что в чанке есть паттерн "предложение-обсуждение-согласие"
- Извлекает сформулированное решение (уже подготовленное при индексации)
- Возвращает решение с цитатами-доказательствами
Это не поиск по тексту. Это поиск по смысловым структурам.
Что дальше? GraphRAG и агенты
Описанный подход — это foundation. На 2026 год уже активно развиваются более сложные техники:
- GraphRAG: Полноценное представление тредов как knowledge graphs с entity linking. Об этом подробно в обзоре свежих исследований RAG.
- Agentic RAG: Агенты, которые могут "вести" диалог с архивом, задавая уточняющие вопросы. Полезно для сложных investigative запросов.
- Временные графы: Учет того, как решения менялись со временем. Что решили в январе, пересмотрели в марте.
Но начинать нужно с основ: граф + контекстный чанкинг + извлечение решений. Без этого фундамента более сложные техники будут давать красивый, но бесполезный output.
Главный урок: email — это не текст. Это диалог. И обрабатывать его нужно соответствующим образом. Иначе ваш RAG будет находить ответы на вопросы, которых никто не задавал.