DataFlow: Воспроизводимые пайплайны данных для LLM в 2025 | AiManual
AiManual Logo Ai / Manual.
02 Янв 2026 Инструмент

DataFlow: как строить воспроизводимые пайплайны подготовки данных для LLM

Полное руководство по DataFlow — инструменту для создания воспроизводимых пайплайнов подготовки данных для обучения больших языковых моделей.

DataFlow — это не проще, это по-другому

Забудьте про скрипты с тысячей строк кода, где вы вручную перебираете файлы, парсите JSON и падаете в обморок от ошибок кодировки. DataFlow в PyTorch — это другой подход к обработке данных для LLM. Не просто библиотека, а философия.

Представьте: вы описываете что должно произойти с данными, а не как это сделать. Как в декларативном программировании, только для терабайтов текста.

💡
Если вы читали нашу статью про семантические пайплайны для LLM, то DataFlow — это техническая реализация тех же идей. Там мы говорили «что», здесь показываем «как».

В чем боль?

Вы тренируете модель на 100 GPU. Данные текут из десяти источников. Через неделю выясняется, что в одном из JSON-файлов изменилась структура. Или кто-то случайно заменил токенизатор. Или добавил новую фильтрацию. Результат? Модель обучается на других данных, воспроизводимость летит в тартарары.

Классический подход: пишем скрипт подготовки данных. Запускаем. Сохраняем результат. Тренируем модель. Все работает, пока не нужно повторить.

# Как НЕ надо делать
import json
import os
from transformers import AutoTokenizer

def prepare_data():
    data = []
    for filename in os.listdir("data/"):
        with open(f"data/{filename}", "r") as f:
            content = f.read()
            # А если файл битый?
            # А если кодировка не UTF-8?
            # А если файл весит 10GB?
            data.append(content)
    
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    tokenized = []
    for text in data:
        tokens = tokenizer(text, truncation=True, max_length=512)
        tokenized.append(tokens)
    
    return tokenized

Проблемы видны невооруженным глазом. Нет обработки ошибок. Нет ленивой загрузки. Нет воспроизводимости. Нет возможности отладки отдельных этапов.

DataFlow: граф вместо скрипта

DataFlow предлагает думать о пайплайне как о графе операций. Каждый узел — преобразование. Каждое ребро — поток данных.

# Как надо делать с DataFlow
from torchdata.datapipes.iter import IterDataPipe
from torchdata.dataloader2 import DataLoader2
import torchdata

def build_dataflow():
    # 1. Чтение файлов
    dp = torchdata.datapipes.iter.FileLister("data/")
    dp = dp.open_files(mode="r")
    
    # 2. Парсинг JSON
    dp = dp.parse_json_files()
    
    # 3. Извлечение текстового поля
    dp = dp.map(lambda x: x["text"])
    
    # 4. Фильтрация по длине
    dp = dp.filter(lambda x: len(x) > 100)
    
    # 5. Токенизация (ленивая!)
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    dp = dp.map(lambda x: tokenizer(x, truncation=True, max_length=512))
    
    return dp

Ключевое слово — ленивая. Данные не обрабатываются заранее. Они текут через граф только когда модель их запрашивает во время тренировки. Для больших датасетов это не удобство, а необходимость.

Традиционный подходDataFlow подход
Загружаем все данные в памятьОбрабатываем потоком
Фиксированный пайплайнКомпозируемый граф
Сложно отлаживатьКаждый этап изолирован
Нет воспроизводимостиДекларативное описание

Операторы, которые меняют правила игры

DataFlow не ограничивается базовыми map и filter. Вот что действительно круто:

  • Shuffler — перемешивание без загрузки всего датасета в память
  • Batcher — группировка в батчи с разными стратегиями
  • Cycler — бесконечный цикл по данным
  • Zipper — объединение нескольких источников
  • Demux — разделение потока по условиям

Но самый мощный оператор — Iterate. Он позволяет создавать итеративные пайплайны, где данные проходят через несколько циклов обработки. Идеально для LLM-driven очистки данных.

# Итеративная очистка с помощью LLM
def llm_cleaner(text):
    # Используем небольшую модель для очистки
    # Например, удаляем мусор, исправляем опечатки
    cleaned = some_llm_api(f"Clean this text: {text}")
    return cleaned

def build_iterative_flow():
    dp = FileLister("raw_data/")
    dp = dp.open_files().readlines()
    
    # Первый проход: базовая очистка
    dp = dp.map(lambda x: x.strip())
    dp = dp.filter(lambda x: len(x) > 50)
    
    # Второй проход: LLM-очистка
    dp = dp.map(llm_cleaner)
    
    # Третий проход: проверка качества
    dp = dp.filter(check_quality)
    
    return dp

Это тот самый семантический пайплайн из нашей предыдущей статьи, но реализованный технически. Вместо жестких правил — адаптивная обработка с обратной связью.

Хранилища: не просто файлы на диске

DataFlow интегрируется с разными бэкендами хранения. Это не только локальные файлы:

  • S3 совместимые хранилища
  • Google Cloud Storage
  • Базы данных через адаптеры
  • Потоковые источники (Kafka, RabbitMQ)

Но главное — версионирование. Каждый пайплайн можно зафиксировать вместе с версиями данных. Git для данных, только лучше.

# Версионирование пайплайна
from torchdata.datapipes.iter import IterableWrapper
import hashlib

class VersionedDataPipe(IterDataPipe):
    def __init__(self, source_pipe):
        self.source = source_pipe
        self.version_hash = self._compute_hash()
    
    def _compute_hash(self):
        # Хеш от конфигурации пайплайна
        config = {"steps": ["clean", "tokenize", "batch"]}
        return hashlib.md5(str(config).encode()).hexdigest()
    
    def __iter__(self):
        for item in self.source:
            yield {"data": item, "pipeline_version": self.version_hash}

Сравнение с альтернативами

DataFlow — не единственный игрок на поле. Но у него есть преимущества.

ИнструментПлюсыМинусыКогда выбирать
Apache BeamМасштабируется до кластеровСложный порог входаОбработка петабайтов в продакшене
TensorFlow DataИнтеграция с TFПривязан к экосистеме TFЕсли вся инфраструктура на TensorFlow
HuggingFace DatasetsГотовые датасетыМало гибкости для кастомных пайплайновБыстрый старт с популярными датасетами
DataFlow (PyTorch)Глубокая интеграция с PyTorch, ленивая обработка, декларативностьМолодая экосистемаИсследования и продакшн LLM на PyTorch

Главное преимущество DataFlow — он создан для исследовательского цикла. Вы можете быстро менять пайплайн, экспериментировать с разными преобразованиями, и все это без переписывания половины кода.

1Реальный пример: пайплайн для синтетических данных

Допустим, вы хотите генерировать синтетические данные для дообучения LLM. Старый подход — скрипт, который падает при первой же ошибке. Новый подход — отказоустойчивый граф.

from torchdata.datapipes.iter import IterableWrapper
import random

def generate_synthetic_example(topic):
    """Генерирует синтетический пример по теме"""
    prompts = [
        f"Write a short story about {topic}",
        f"Explain {topic} to a 10-year-old",
        f"List 5 facts about {topic}"
    ]
    prompt = random.choice(prompts)
    # Здесь был бы вызов LLM API
    # Для примера возвращаем заглушку
    return {"prompt": prompt, "completion": f"Generated text about {topic}"}

def build_synthetic_pipeline(topics, num_samples_per_topic):
    """Строит пайплайн для генерации синтетических данных"""
    
    # Создаем источник тем
    topics_pipe = IterableWrapper(topics)
    
    # Повторяем каждую тему N раз
    topics_pipe = topics_pipe.cycle(count=num_samples_per_topic)
    
    # Генерируем примеры
    data_pipe = topics_pipe.map(generate_synthetic_example)
    
    # Шаффлим
    data_pipe = data_pipe.shuffle(buffer_size=1000)
    
    # Разбиваем на батчи
    data_pipe = data_pipe.batch(batch_size=32, drop_last=True)
    
    # Добавляем версионирование
    data_pipe = data_pipe.map(lambda batch: {
        "batch": batch,
        "pipeline_version": "v1.2",
        "topics": topics
    })
    
    return data_pipe

# Использование
topics = ["quantum physics", "medieval history", "python programming"]
pipeline = build_synthetic_pipeline(topics, num_samples_per_topic=1000)

# Теперь pipeline можно использовать в тренировке
for batch in pipeline:
    train_model(batch["batch"])

Внимание: в реальном пайплайне нужно добавить обработку ошибок. Генерация через API может падать по таймауту, возвращать некорректные данные, или вообще не работать.

2Интеграция с существующей инфраструктурой

DataFlow не требует выбрасывать весь существующий код. Вы можете обернуть старые DataLoader в DataPipes и постепенно мигрировать.

from torch.utils.data import DataLoader
from torchdata.datapipes.iter import IterableWrapper

# Старый DataLoader
old_dataset = YourCustomDataset()
old_loader = DataLoader(old_dataset, batch_size=32)

# Обертка в DataPipe
def old_loader_to_datapipe(loader):
    for batch in loader:
        yield batch

# Создаем DataPipe из старого лоадера
dp = IterableWrapper(old_loader_to_datapipe(old_loader))

# Теперь можно добавлять новые преобразования
dp = dp.map(lambda x: preprocess(x))
dp = dp.filter(lambda x: x is not None)
dp = dp.batch(batch_size=64)

Кому подойдет DataFlow?

Не всем. Вот кому стоит смотреть в эту сторону:

  • Исследователи LLM, которым нужно быстро экспериментировать с разными способами подготовки данных
  • Команды, строящие RAG-системы — как раз те, кто читал нашу статью про семантические пайплайны
  • Дата инженеры в стартапах, где требования меняются каждый день
  • Те, кто работает с синтетическими данными — где пайплайн должен быть гибким и адаптивным

А вот кому пока рано:

  • Если ваши данные помещаются в памяти и пайплайн никогда не меняется
  • Если вы привязаны к TensorFlow всей душой и телом
  • Если вам нужна распределенная обработка прямо сейчас (хотя DataFlow к этому движется)

Подводные камни (потому что они есть всегда)

DataFlow — не серебряная пуля. Вот с чем вы столкнетесь:

  1. Документация — ее мало, и она часто отстает от реальности
  2. Сообщество — пока небольшое, сложных вопросов в Stack Overflow не найдете
  3. Отладка — ленивые вычисления усложняют дебаг, нужно учиться новым подходам
  4. Производительность — на маленьких данных overhead может быть заметен

Но главная проблема в другом. DataFlow требует другого мышления. Вы больше не пишете императивный код «сделай это, потом то». Вы описываете граф преобразований. Это как перейти с процедурного программирования на функциональное — мозг ломается первые две недели.

💡
Если вы планируете серьезные эксперименты с LLM, посмотрите также нашу статью про кластеризацию LLM. DataFlow хорошо сочетается с распределенной обработкой.

Что дальше?

DataFlow развивается. В планах — лучшая интеграция с распределенными системами, больше готовых операторов, улучшенная отладка. Но уже сейчас это самый перспективный подход к подготовке данных для LLM в экосистеме PyTorch.

Совет напоследок: начните с малого. Возьмите один маленький пайплайн, перепишите его на DataFlow. Почему один? Потому что первые два вы бросите на полпути, когда столкнетесь с неочевидным поведением. Третий уже получится.

И последнее: не пытайтесь переписать всю инфраструктуру за неделю. DataFlow — это не фреймворк, который вы внедряете. Это подход, который вы принимаете. Постепенно. С болями. С ошибками. Но в итоге — с воспроизводимыми пайплайнами, которые не ломаются от изменения формата одного JSON-файла.