Твой AI-агент валится не потому что модель дура. А потому что ты забыл поставить circuit breaker. В этой статье — пять структурных ошибок, которые я видел в каждом втором продакшене. Ни одна не про качество промптов.
🔁 Ошибка #1: Retry-ад — бесконечные повторы без backoff
Симптом: внешний API отвечает 503, твой агент ретраит каждые 100 мс, через 5 минут кластер падает.
Масштаб: в 2025 году одна финтех-компания потратила $47 000 за 23 минуты из-за retry loop в AI-агенте, который долбил OpenAI на каждом шаге транзакции. Экспоненциальная задержка? Нет, не слышали.
1Как НЕ надо
while True: # вечность
try:
response = call_api()
break
except Exception:
time.sleep(0.1) # zero mercy
2Правильно
import random
def retry_with_backoff(func, max_retries=5, base=2.0):
for attempt in range(max_retries):
try:
return func()
except Exception:
sleep_time = (base ** attempt) + random.uniform(0, 1)
time.sleep(sleep_time)
raise
Добавь джиттер (случайный шум), иначе эскалация при синхронном ретрае на N агентах — они начнут долбить одновременно. Ещё лучше — использовать circuit breaker от греха подальше.
💡 Читай также: Почему ломаются LLM-агенты: инженерный разбор сбоев многоагентных систем — там я разбираю, как retry loops ломают оркестрацию.
⚡ Ошибка #2: Нет circuit breaker — полуоткрытое состояние тебя спасёт
Твой RAG-векторсторе упал. Агент продолжает слать запросы, тратит контекст, время и деньги. Ответы — бессмысленные.
Паттерн: circuit breaker с тремя состояниями: Closed (всё ок), Open (быстро падаем), Half-Open (пробуем снова).
class CircuitBreaker:
def __init__(self, threshold=5, recovery_time=30):
self.failures = 0
self.threshold = threshold
self.state = 'closed'
self.last_failure = None
self.recovery_time = recovery_time
def call(self, func):
if self.state == 'open':
if (time.time() - self.last_failure) > self.recovery_time:
self.state = 'half-open'
else:
raise CircuitBreakerOpenError()
try:
result = func()
if self.state == 'half-open':
self.state = 'closed'
self.failures = 0
return result
except Exception:
self.failures += 1
self.last_failure = time.time()
if self.failures >= self.threshold:
self.state = 'open'
raise
🚦 Ошибка #3: Все агенты в одной очереди — rate limiting без изоляции
У тебя 50 агентов. Они все долбят один эндпоинт. Rate limiter срабатывает — 429 Too Many Requests. Агенты ретраят (снова, как в п.1). Каскад.
Решение: отдельные пулы соединений per agent или per class агентов. Token bucket на каждый пул.
from collections import deque
class TokenBucket:
def __init__(self, rate, burst):
self.tokens = burst
self.rate = rate
self.last_refill = time.time()
def consume(self, tokens=1):
now = time.time()
self.tokens = min(self.tokens + (now - self.last_refill) * self.rate, burst)
self.last_refill = now
if self.tokens < tokens:
return False
self.tokens -= tokens
return True
Раздавай бакеты каждому агенту. Если один агент начинает молотить — остальные не страдают. Классическая изоляция fault domain.
⏳ Ошибка #4: Deadline — твой лучший друг, но ты его не зовёшь
Агент ждёт ответа от LLM. Бесконечно. Блокируется весь пайплайн. Потому что в OpenAI API произошла задержка, а timeout не выставлен.
Правило: каждый вызов LLM или внешнего сервиса должен иметь deadline. Лучше завершиться с ошибкой и перейти к следующему шагу, чем висеть вечность.
import asyncio
async def call_llm_with_timeout(prompt, timeout=10.0):
try:
return await asyncio.wait_for(
openai.Completion.create(prompt),
timeout=timeout
)
except asyncio.TimeoutError:
log.error('LLM timeout, fallback to cache')
return get_cached_response(prompt)
⚠️ Не советую ставить один таймаут на весь агент. Каждый вызов — свой deadline. Иначе один медленный шаг убивает весь workflow.
Graceful degradation — пусть агент идёт дальше с неполным результатом. Как это сделать — я писал в статье про planner/executor.
🧱 Ошибка #5: Синхронная архитектура — один упал, стоять всем
Многоагентная система. Sub-agent A упал с ошибкой. Agent B ждёт его ответ. Agent C тоже. Все повисли. Очередь растёт.
Проблема: синхронное ожидание в цепочке агентов. Решение — асинхронная передача сообщений через брокер (RabbitMQ, Kafka, Redis Streams) или хотя бы asyncio с таймаутами на каждый шаг.
async def orchestrate():
task_a = asyncio.create_task(run_agent('A'))
task_b = asyncio.create_task(run_agent('B'))
done, pending = await asyncio.wait(
[task_a, task_b],
timeout=30,
return_when=ALL_COMPLETED
)
for task in pending:
task.cancel()
log.warning('Agent timeout, proceed with partial results')
Асинхронность не панацея — без изоляции ошибок (см. circuit breaker) она лишь откладывает крах. Но это база.
💡 Дополнительно: в статье Фреймворки для оркестрации AI-агентов я сравниваю, как разные инструменты решают эту проблему — спойлер: почти никак.
Итог не-итога: твой агент не умирает от тупого промпта. Он умирает от retry loops, отсутствия breaker’а и синхронного кода. Почини это — модель вывезет.