Почему MCP Tool Registry — это не просто ещё один реестр
Знакомо? Собрали RAG-систему, подключили пару инструментов, всё работает локально. Потом добавили ещё пять. Потом десять. И вот уже у вас в конфигах 30 строк подключения к разным серверам, каждый со своими версиями, таймаутами и требованиями к аутентификации.
Проблема не в количестве инструментов. Проблема в том, как их оркестрировать в реальном времени, когда пользователь спрашивает что-то вроде "Найди мне информацию в документации, проверь актуальность через API и сгенерируй отчёт".
MCP Tool Registry — это централизованный реестр, который автоматически управляет подключением, версионированием и оркестрацией MCP-серверов. Он превращает хаос из 30 конфигов в единую точку управления.
Архитектура, которая не сломается на третьем инструменте
Стандартный подход — подключать MCP-серверы напрямую к агенту. Работает, пока у вас их два-три. Потом начинаются проблемы:
- Таймауты накладываются друг на друга
- Один сбойный сервер ломает всю цепочку
- Нет централизованного логирования
- Обновление версий — адская задача
MCP Tool Registry решает это через трёхуровневую архитектуру:
| Уровень | Задача | Технологии |
|---|---|---|
| Реестр | Хранение метаданных серверов | PostgreSQL + Redis кэш |
| Оркестратор | Управление жизненным циклом | FastAPI + Celery workers |
| Шлюз | Единая точка входа для агентов | WebSocket + HTTP прокси |
Звучит сложно? На практике это означает, что ваш агент обращается к одному endpoint'у, а Tool Registry сам решает, какие серверы запустить, как их скомбинировать и как обработать результат.
Собираем реестр с нуля: код, который работает
Начнём с базы данных. Не используйте SQLite для продакшена — она заблокируется при первой же серьёзной нагрузке.
1 Схема базы данных для Tool Registry
CREATE TABLE mcp_servers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
description TEXT,
version VARCHAR(50) NOT NULL,
-- Docker образ или путь к локальному исполняемому файлу
image_or_path TEXT NOT NULL,
-- Порт, на котором сервер слушает внутри контейнера
internal_port INTEGER NOT NULL,
-- Команда запуска (если нужно переопределить)
startup_command TEXT,
-- Требования к окружению
environment JSONB DEFAULT '{}',
-- Метаданные инструментов, которые предоставляет сервер
tools_metadata JSONB NOT NULL,
-- Health check endpoint
health_check_path VARCHAR(255) DEFAULT '/health',
-- Таймауты в миллисекундах
startup_timeout INTEGER DEFAULT 30000,
request_timeout INTEGER DEFAULT 10000,
-- Статистика использования
total_requests BIGINT DEFAULT 0,
avg_response_time FLOAT DEFAULT 0,
-- Активен ли сервер в реестре
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE server_instances (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
server_id UUID REFERENCES mcp_servers(id) ON DELETE CASCADE,
-- Динамический порт, выделенный для этого экземпляра
assigned_port INTEGER NOT NULL,
-- ID контейнера Docker или процесса
container_id VARCHAR(255),
-- Статус: starting, running, stopping, stopped, error
status VARCHAR(50) NOT NULL,
-- PID процесса
pid INTEGER,
-- Использование ресурсов
cpu_usage FLOAT DEFAULT 0,
memory_usage_mb INTEGER DEFAULT 0,
-- Когда был запущен
started_at TIMESTAMP DEFAULT NOW(),
-- Последний health check
last_health_check TIMESTAMP,
-- Причина остановки (если есть)
stop_reason TEXT
);
2 Оркестратор на FastAPI: запуск и управление серверами
Вот как выглядит основной endpoint для запуска инструментов:
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Dict, Any
import docker
import asyncio
from datetime import datetime
import logging
app = FastAPI(title="MCP Tool Registry")
client = docker.from_env()
logger = logging.getLogger(__name__)
class ToolRequest(BaseModel):
tool_name: str
parameters: Dict[str, Any]
context: Dict[str, Any] = {}
timeout_ms: int = 10000
class ToolExecutionResult(BaseModel):
success: bool
result: Any
error: str = None
execution_time_ms: int
server_instance_id: str
# Кэш запущенных экземпляров
running_instances = {}
@app.post("/execute-tool", response_model=ToolExecutionResult)
async def execute_tool(request: ToolRequest, background_tasks: BackgroundTasks):
"""
Основной endpoint для выполнения инструмента через MCP Tool Registry.
Автоматически находит подходящий сервер, запускает его если нужно,
выполняет запрос и возвращает результат.
"""
start_time = datetime.now()
# 1. Найти сервер, который предоставляет нужный инструмент
server = await find_server_for_tool(request.tool_name)
if not server:
raise HTTPException(status_code=404,
detail=f"No server found for tool '{request.tool_name}'")
# 2. Получить или запустить экземпляр сервера
instance = await get_or_start_server_instance(server)
# 3. Выполнить запрос к серверу
try:
result = await call_mcp_server(
instance=instance,
tool_name=request.tool_name,
parameters=request.parameters,
timeout_ms=request.timeout_ms
)
execution_time = (datetime.now() - start_time).total_seconds() * 1000
# 4. Запланировать остановку сервера если он не используется
background_tasks.add_task(maybe_stop_instance, instance.id)
return ToolExecutionResult(
success=True,
result=result,
execution_time_ms=int(execution_time),
server_instance_id=instance.id
)
except asyncio.TimeoutError:
# Сервер завис — помечаем как проблемный
await mark_instance_as_problematic(instance.id)
raise HTTPException(status_code=504, detail="Tool execution timeout")
except Exception as e:
logger.error(f"Tool execution failed: {e}")
await mark_instance_as_problematic(instance.id)
raise HTTPException(status_code=500, detail=f"Tool execution failed: {str(e)}")
async def get_or_start_server_instance(server):
"""
Получить доступный экземпляр сервера или запустить новый.
Использует пул соединений для эффективного reuse.
"""
# Проверяем есть ли доступные экземпляры в пуле
available_instances = [
inst for inst in running_instances.values()
if inst.server_id == server.id and inst.status == "running"
]
if available_instances:
# Берём наименее нагруженный
instance = min(available_instances, key=lambda x: x.current_requests)
instance.current_requests += 1
return instance
# Запускаем новый экземпляр
instance = await start_server_instance(server)
running_instances[instance.id] = instance
return instance
Интеграция с существующими RAG-системами
Допустим, у вас уже есть RAG-система, похожая на ту, что описана в руководстве по локальному Agentic RAG. Как встроить Tool Registry?
Вместо того, чтобы хардкодить вызовы инструментов, вы делаете так:
# Старый подход (хрупкий)
try:
result = await search_documents(query)
except ConnectionError:
# Сервер упал, что делать?
pass
# Новый подход через Tool Registry
async def enhanced_rag_pipeline(query, context):
"""
Расширенный RAG конвейер, использующий Tool Registry
для динамического подключения инструментов.
"""
# 1. Поиск в векторной БД (локальный инструмент)
search_results = await tool_registry.execute_tool(
tool_name="vector_search",
parameters={"query": query, "top_k": 5}
)
# 2. Проверка актуальности через внешний API
# (MCP-сервер для проверки дат в документах)
freshness_check = await tool_registry.execute_tool(
tool_name="check_freshness",
parameters={"documents": search_results}
)
# 3. Если есть устаревшие документы, ищем обновления
if freshness_check.needs_update:
updated_docs = await tool_registry.execute_tool(
tool_name="fetch_latest_docs",
parameters={"document_ids": freshness_check.outdated_ids}
)
search_results.update(updated_docs)
# 4. Генерация ответа с контекстом
response = await tool_registry.execute_tool(
tool_name="generate_with_context",
parameters={
"query": query,
"context": search_results,
"style": "technical"
}
)
return response
Важно: не делайте синхронные вызовы к Tool Registry из асинхронного кода. Используйте asyncio.gather() для параллельного выполнения независимых инструментов, но будьте осторожны с ограничениями ресурсов.
Автоматическое обнаружение и регистрация инструментов
Самое мощное в Tool Registry — автоматизация. Когда вы добавляете новый MCP-сервер (например, сервер ВкусВилла для заказа продуктов), он должен автоматически регистрироваться в реестре.
Реализуем авто-дискавери:
import asyncio
import yaml
from pathlib import Path
from watchfiles import awatch
class AutoDiscoveryService:
def __init__(self, registry_client, config_dir: Path):
self.registry = registry_client
self.config_dir = config_dir
self.watched_files = {'mcp-servers.yaml', 'mcp-config.yaml'}
async def start_discovery(self):
"""
Запускает мониторинг конфигурационных файлов
и автоматически регистрирует/обновляет серверы.
"""
async for changes in awatch(self.config_dir):
for change_type, file_path in changes:
if Path(file_path).name in self.watched_files:
await self.process_config_change(file_path)
async def process_config_change(self, config_path: Path):
"""
Обрабатывает изменения в конфигурации MCP серверов.
"""
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
for server_config in config.get('servers', []):
# Проверяем, существует ли уже сервер
existing = await self.registry.get_server(server_config['name'])
if existing:
# Обновляем если версия изменилась
if existing.version != server_config['version']:
await self.registry.update_server(
name=server_config['name'],
config=server_config
)
logger.info(f"Updated server {server_config['name']} to version {server_config['version']}")
else:
# Регистрируем новый сервер
await self.registry.register_server(server_config)
logger.info(f"Registered new server: {server_config['name']}")
# Автоматически создаём навыки из документации
# используя подход из Skill Seekers
if server_config.get('create_skills_from_docs', False):
await self.create_skills_from_documentation(server_config)
Безопасность: что не пишут в документации
Когда вы запускаете произвольные MCP-серверы через Tool Registry, возникают вопросы безопасности. Особенно если эти серверы обрабатывают пользовательские данные или имеют доступ к API.
Основные угрозы (подробнее в статье про безопасность MCP):
- Изоляция: Каждый сервер должен работать в отдельном контейнере или namespace
- Лимиты ресурсов: Ограничение CPU, памяти, дискового пространства
- Сетевой доступ: Белый список разрешённых доменов для внешних вызовов
- Аутентификация между компонентами: mTLS для внутренней коммуникации
Вот как реализовать базовую изоляцию через Docker:
async def start_server_with_isolation(server_config):
"""
Запускает MCP сервер с изоляцией через Docker.
"""
client = docker.from_env()
# Создаём уникальное имя сети для изоляции
network_name = f"mcp-isolated-{server_config['name']}-{uuid.uuid4().hex[:8]}"
network = client.networks.create(network_name, driver="bridge")
# Ограничиваем ресурсы
container = client.containers.run(
image=server_config['image'],
name=f"mcp-{server_config['name']}-{uuid.uuid4().hex[:8]}",
network=network_name,
# Жёсткие лимиты
mem_limit="512m", # Не больше 512MB RAM
cpuset_cpus="0-1", # Только первые 2 ядра
# Read-only root filesystem
read_only=True,
# Только необходимые capabilities
cap_drop=["ALL"],
capabilities=["NET_BIND_SERVICE"],
# Без привилегий
privileged=False,
# User namespace
user="1000:1000",
# Environment variables
environment=server_config.get('env', {}),
# Health check
healthcheck={
"test": ["CMD", "curl", "-f", "http://localhost:${PORT}/health"],
"interval": 30000000000, # 30 секунд
"timeout": 5000000000, # 5 секунд
"retries": 3
},
detach=True
)
return {
"container": container,
"network": network,
"assigned_port": assigned_port
}
Мониторинг и метрики: что смотреть в продакшене
Tool Registry без мониторинга — слепой. Вот ключевые метрики, которые нужно отслеживать:
| Метрика | Порог | Действие при превышении |
|---|---|---|
| Время запуска сервера | > 30 секунд | Пометить как проблемный, использовать fallback |
| Процент ошибок на инструмент | > 5% | Временно отключить, уведомить админа |
| Использование памяти на экземпляр | > 80% лимита | Перезапустить с увеличенным лимитом |
| Количество активных экземпляров | > 50 | Включить агрессивный reuse или scaling |
Пример конфигурации Prometheus для сбора метрик:
# prometheus.yml
scrape_configs:
- job_name: 'mcp_tool_registry'
static_configs:
- targets: ['tool-registry:8000']
metrics_path: '/metrics'
- job_name: 'mcp_servers'
# Динамическое обнаружение серверов через Service Discovery
file_sd_configs:
- files:
- '/etc/prometheus/mcp-servers.json'
relabel_configs:
- source_labels: [__address__]
target_label: __param_target
- source_labels: [__param_target]
target_label: instance
- target_label: __address__
replacement: 'blackbox-exporter:9115' # Health check прокси
# Алерт при высокой частоте ошибок
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
alert_rules:
- alert: MCPToolHighErrorRate
expr: rate(mcp_tool_errors_total[5m]) > 0.05
for: 2m
labels:
severity: warning
annotations:
summary: "High error rate for MCP tools"
description: "Error rate {{ $value }} exceeds 5% threshold"
Оптимизация производительности: кеширование и пулинг
Запускать новый контейнер Docker для каждого запроса — неприемлемо. Нужен пул предзапущенных экземпляров.
Но есть нюанс: разные инструменты требуют разного времени инициализации. Сервер для Text-to-SQL может загружать модель 2-3 гигабайта и тратить на это 30 секунд. Сервер для простого поиска по документации запускается за 100 мс.
Решение — адаптивный пул:
class AdaptivePoolManager:
def __init__(self):
self.pools = {} # server_id -> ServerPool
self.warmup_times = {} # server_id -> среднее время запуска
async def get_instance(self, server_id, tool_name):
"""
Получает экземпляр из пула, создавая новый если нужно.
"""
pool = self.pools.get(server_id)
if not pool:
pool = ServerPool(server_id)
self.pools[server_id] = pool
# Проверяем, есть ли свободные экземпляры
instance = pool.get_idle_instance()
if instance:
return instance
# Если нет свободных, проверяем нужно ли создавать новый
avg_execution_time = self.get_avg_execution_time(server_id, tool_name)
avg_warmup_time = self.warmup_times.get(server_id, 5000) # 5 секунд по умолчанию
# Эвристика: создаём новый экземпляр если
# время ожидания > времени запуска
if pool.avg_wait_time > avg_warmup_time:
# Запускаем в фоне новый экземпляр
asyncio.create_task(pool.warmup_new_instance())
# Ждём освобождения существующего экземпляра
instance = await pool.wait_for_instance(timeout=avg_execution_time * 2)
if not instance:
# Таймаут — запускаем синхронно
instance = await pool.create_instance_sync()
return instance
Интеграция с AI-агентами: реальный кейс
Допустим, вы строите агента для анализа документации. Без Tool Registry код выглядел бы так:
# Сложно поддерживать, легко сломать
class DocumentationAgent:
def __init__(self):
self.vector_search = VectorSearchClient("localhost:8001")
self.pdf_parser = PDFParserClient("localhost:8002")
self.code_analyzer = CodeAnalyzerClient("localhost:8003")
self.summarizer = SummarizerClient("localhost:8004")
# ... и ещё 10 клиентов
С Tool Registry:
class DocumentationAgent:
def __init__(self, tool_registry_url):
self.registry = MCPToolRegistryClient(tool_registry_url)
async def analyze_documentation(self, doc_path):
"""
Умный анализ документации с автоматическим выбором инструментов.
"""
# 1. Определяем тип документа
doc_type = await self.registry.execute_tool(
tool_name="detect_document_type",
parameters={"path": doc_path}
)
# 2. Парсим в зависимости от типа
if doc_type == "pdf":
parser_tool = "parse_pdf"
elif doc_type == "markdown":
parser_tool = "parse_markdown"
elif doc_type == "code":
parser_tool = "parse_source_code"
else:
parser_tool = "parse_generic_text"
parsed = await self.registry.execute_tool(
tool_name=parser_tool,
parameters={"path": doc_path}
)
# 3. Извлекаем ключевые концепции
concepts = await self.registry.execute_tool(
tool_name="extract_concepts",
parameters={"text": parsed.content}
)
# 4. Если это API документация, генерируем примеры кода
if "api" in concepts.tags:
examples = await self.registry.execute_tool(
tool_name="generate_code_examples",
parameters={
"endpoints": parsed.endpoints,
"languages": ["python", "javascript", "curl"]
}
)
parsed.examples = examples
return parsed
Преимущество? Когда появляется новый тип документа (например, аудио-документация для мультимодального RAG), вы просто регистрируете новый MCP-сервер для обработки аудио. Агент автоматически начнёт его использовать.
Развертывание в Kubernetes: production-ready манифест
Для продакшена используйте Kubernetes. Вот базовый Deployment:
# tool-registry-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: mcp-tool-registry
namespace: ai-platform
spec:
replicas: 3
selector:
matchLabels:
app: tool-registry
template:
metadata:
labels:
app: tool-registry
spec:
serviceAccountName: tool-registry-sa
containers:
- name: registry-core
image: your-registry/mcp-tool-registry:latest
ports:
- containerPort: 8000
name: http
- containerPort: 8001
name: metrics
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-secrets
key: connection-string
- name: REDIS_URL
value: "redis://redis-master:6379"
# Лимиты ресурсов
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "2"
# Health checks
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
# Sidecar для мониторинга Docker
- name: docker-monitor
image: docker:dind
securityContext:
privileged: true
volumeMounts:
- name: docker-socket
mountPath: /var/run/docker.sock
volumes:
- name: docker-socket
hostPath:
path: /var/run/docker.sock
---
# Service для discovery
apiVersion: v1
kind: Service
metadata:
name: tool-registry
namespace: ai-platform
spec:
selector:
app: tool-registry
ports:
- port: 8000
targetPort: 8000
name: http
- port: 8001
targetPort: 8001
name: metrics
type: ClusterIP
---
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: tool-registry-hpa
namespace: ai-platform
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: mcp-tool-registry
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
Чего не хватает в текущих реализациях
Большинство open-source Tool Registry реализаций упускают важные аспекты:
- Версионирование схем инструментов: Когда меняется API инструмента, старые агенты должны продолжать работать
- A/B тестирование инструментов: Запускать две версии одного инструмента и сравнивать результаты
- Автоматическое создание документации: Генерация OpenAPI спецификаций для зарегистрированных инструментов
- Интеграция с мониторингом бизнес-метрик: Не только технические метрики, но и качество ответов
Для A/B тестирования добавьте в схему базы:
ALTER TABLE mcp_servers ADD COLUMN ab_test_variant VARCHAR(50);
ALTER TABLE mcp_servers ADD COLUMN ab_test_traffic_percentage INTEGER DEFAULT 0;
-- При запросе инструмента:
-- 1. Найти все серверы, предоставляющие этот инструмент
-- 2. Если есть A/B варианты, распределить трафик согласно percentage
-- 3. Логировать, какой вариант использовался для каждого запроса
-- 4. Сравнивать метрики качества между вариантами
Следующий шаг: от Tool Registry к AI Operating System
Tool Registry — это не конечная точка. Это фундамент для AI Operating System, где:
- Инструменты автоматически комбинируются в workflow'ы
- Система обучается на истории использования, предлагая оптимальные цепочки инструментов
- Ресурсы динамически выделяются под текущую нагрузку
- Безопасность встроена на уровне ядра системы
Попробуйте начать с простого: возьмите свою RAG-систему, вынесите все внешние вызовы в отдельные MCP-серверы и подключите через Tool Registry. Первые два сервера будут болезненными. Пятый покажется логичным. Десятый — вы уже не сможете представить, как жили без этого.
Главный совет: не пытайтесь построить идеальную систему с первого раза. Сделайте минимально рабочую версию, которая решает вашу самую больную проблему прямо сейчас. У нас это обычно было управление версиями инструментов — когда продакшен-агент ломался из-за того, что в staging обновили API поиска, но забыли обновить продакшен.
Tool Registry решил это за неделю. Остальное — постепенные улучшения.