ETL-пайплайн для анализа криминальной статистики с Prefect и Metabase | Гайд 2026 | AiManual
AiManual Logo Ai / Manual.
03 Фев 2026 Гайд

Строим ETL-пайплайн для криминальной статистики: Prefect, Metabase и никаких костылей

Пошаговый гайд по созданию ETL-пайплайна для анализа криминальной статистики с использованием Prefect 3.0, PostgreSQL и Metabase. Код, архитектура, ошибки.

Проблема: полиция не делится API, а аналитики требуют дашборды

Представьте ситуацию: нужно проанализировать криминальную статистику по городу. Источники - открытые данные полиции, но в формате PDF или через древний веб-интерфейс без API. Аналитики хотят видеть тренды, паттерны, горячие точки. Вручную это делать - адский труд. Нужен автоматизированный пайплайн, который сам скачает данные, обработает и положит в базу для визуализации.

Вот что обычно ломается в таких системах:

  • Скрапинг ломается при каждом изменении верстки сайта
  • ETL-процессы падают без логирования ошибок
  • Нет мониторинга - не понятно, когда данные перестали обновляться
  • Дашборды показывают устаревшую информацию
  • Код превращается в спагетти из скриптов и cron-заданий

Почему Airflow не подходит для этой задачи? В 2026 году Prefect 3.0 обгоняет Airflow в простоте настройки и отладки. Конфигурация через Python-декораторы вместо XML/JSON, встроенный UI для мониторинга, понятная модель задач и потоков. Для небольшого ETL-пайплайна Prefect - идеальный выбор.

Решение: Prefect как оркестратор, Metabase как визуализация

Архитектура простая, но надежная:

  1. Extract: Python-скрипт скачивает данные с сайта полиции (или парсит PDF)
  2. Transform: Очистка, нормализация, агрегация данных
  3. Load: Загрузка в PostgreSQL
  4. Orchestrate: Prefect управляет всем процессом по расписанию
  5. Visualize: Metabase строит дашборды поверх базы

Ключевое преимущество - изоляция этапов. Если сайт полиции упал, падает только этап Extract. Transform и Load продолжают работать со старыми данными. Prefect логирует ошибку, отправляет уведомление, а вы спокойно пьете кофе.

💡
Для сложного анализа текстовых описаний преступлений можно подключить семантический пайплайн для LLM. Это позволит автоматически категоризировать преступления по смыслу, а не только по формальным кодам.

1 Подготовка окружения: что ставить в 2026 году

Не используйте старые версии инструментов. На февраль 2026 актуальны:

  • Prefect 3.0+ (полностью переписанный движок, асинхронность из коробки)
  • Metabase 50+ (новая система разрешений, улучшенные диаграммы)
  • PostgreSQL 17+ (лучшая производительность для временных рядов)
  • Python 3.12+ (pattern matching для обработки данных)
# Установка Prefect 3.0
pip install "prefect>=3.0.0" prefect-postgres

# Для скрапинга
pip install httpx beautifulsoup4 pandas pdfplumber

# Docker-образ Metabase (последняя версия)
docker pull metabase/metabase:latest

Ошибка новичков: ставить Prefect 2.x. Версия 3.0 полностью меняет API. Декораторы @task и @flow остались, но внутренняя архитектура другая. Если найдете туториалы по Prefect 2 - смело закрывайте.

2 Проектируем таблицы: куда складывать данные о преступлениях

Не делайте одну гигантскую таблицу. Используйте звездообразную схему (star schema) как в финансовых моделях Power BI.

-- Фактовая таблица
CREATE TABLE crime_facts (
    id SERIAL PRIMARY KEY,
    date DATE NOT NULL,
    district_id INTEGER REFERENCES districts(id),
    crime_type_id INTEGER REFERENCES crime_types(id),
    weapon_used BOOLEAN,
    victims_count INTEGER,
    officer_response_time INTERVAL,
    latitude DECIMAL(9,6),
    longitude DECIMAL(9,6),
    raw_description TEXT,
    processed_at TIMESTAMP DEFAULT NOW()
);

-- Размерность: типы преступлений
CREATE TABLE crime_types (
    id SERIAL PRIMARY KEY,
    code VARCHAR(10) UNIQUE,
    category VARCHAR(100),
    severity INTEGER CHECK (severity BETWEEN 1 AND 5)
);

-- Размерность: районы
CREATE TABLE districts (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    population INTEGER,
    area_sq_km DECIMAL(10,2)
);

Зачем такая сложность? Потому что аналитики будут спрашивать: "Сколько краж со взломом в Северном районе в январе 2026 по сравнению с январем 2025?" С нормализованной схемой такой запрос пишется в одну строку.

3 Пишем ETL-флоу на Prefect 3.0

Вот полный флоу с обработкой ошибок и ретраями:

from datetime import datetime, timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import exponential_backoff
import httpx
import pandas as pd
from sqlalchemy import create_engine
import asyncio

@task(retries=3, retry_delay_seconds=exponential_backoff(10))
async def download_crime_data(date: datetime) -> pd.DataFrame:
    """Скачиваем данные за конкретный день"""
    logger = get_run_logger()
    
    # Имитируем API полиции (в реальности URL другой)
    url = f"https://api.police.gov/crimes?date={date.strftime('%Y-%m-%d')}"
    
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            response = await client.get(url)
            response.raise_for_status()
            logger.info(f"Downloaded {len(response.content)} bytes for {date}")
            return pd.read_json(response.content)
        except httpx.RequestError as e:
            logger.error(f"Failed to download data for {date}: {e}")
            raise

@task
def clean_crime_data(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Очищаем и трансформируем данные"""
    # Удаляем дубликаты
    df = raw_df.drop_duplicates(subset=['case_number'])
    
    # Нормализуем даты
    df['date'] = pd.to_datetime(df['incident_date'], errors='coerce')
    df = df.dropna(subset=['date'])
    
    # Стандартизируем категории преступлений
    crime_mapping = {
        'BURGLARY': 'THEFT',
        'ROBBERY': 'THEFT',
        'AUTO THEFT': 'THEFT',
        'ASSAULT': 'VIOLENCE'
    }
    df['crime_category'] = df['crime_type'].map(
        lambda x: crime_mapping.get(x, 'OTHER')
    )
    
    # Геокодирование (упрощенно)
    df['latitude'] = pd.to_numeric(df['lat'], errors='coerce')
    df['longitude'] = pd.to_numeric(df['lng'], errors='coerce')
    
    return df

@task
def load_to_postgres(clean_df: pd.DataFrame, table_name: str = "crime_facts"):
    """Загружаем в PostgreSQL"""
    engine = create_engine("postgresql://user:pass@localhost:5432/crime_db")
    
    # Используем метод upsert для обновления существующих записей
    clean_df.to_sql(
        table_name,
        engine,
        if_exists='append',
        index=False,
        method='multi'
    )

@flow(name="crime-etl-pipeline", log_prints=True)
async def crime_etl_flow(start_date: str, end_date: str):
    """Основной ETL флоу"""
    logger = get_run_logger()
    
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    
    dates = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    
    for date in dates:
        logger.info(f"Processing {date.strftime('%Y-%m-%d')}")
        
        try:
            # Extract
            raw_data = await download_crime_data(date)
            
            # Transform
            clean_data = clean_crime_data(raw_data)
            
            # Load
            load_to_postgres(clean_data)
            
            logger.info(f"Successfully processed {date}")
            
        except Exception as e:
            logger.error(f"Failed for {date}: {e}")
            # Продолжаем со следующим днем
            continue

# Запуск флоу
if __name__ == "__main__":
    # Для прошлой недели
    crime_etl_flow(
        start_date=(datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d"),
        end_date=datetime.now().strftime("%Y-%m-%d")
    )

Обратите внимание на асинхронность и обработку ошибок. Если данные за один день не скачались - флоу продолжает со следующим днем, а не падает полностью.

💡
Для production-среды добавьте квоты и rate limiting. Сайты полиции часто блокируют IP при слишком частых запросах. Используйте prefect.tasks.sleep между запросами или распределите загрузку по времени.

4 Настраиваем Prefect Server и деплой

Локально можно работать с Prefect в режиме ephemeral, но для продакшена нужен сервер:

# Запуск Prefect server (новый синтаксис в 3.0)
prefect server start

# Создаем work pool для выполнения задач
prefect work-pool create crime-pool --type process

# Деплоим наш флоу
prefect deploy crime_etl_flow.py:crime_etl_flow --name crime-daily --pool crime-pool

# Настраиваем расписание (каждый день в 2:00)
prefect deployment schedule create crime-daily \
  --interval 86400 \
  --anchor-date "2026-02-03T02:00:00"

Теперь флоу будет запускаться автоматически каждый день. Все запуски, логи, ошибки - в Prefect UI по адресу http://localhost:4200.

5 Визуализация в Metabase: дашборд, который понятен даже начальству

Metabase подключается к PostgreSQL и строит дашборды без кода:

# Запуск Metabase через Docker
docker run -d -p 3000:3000 \
  -e MB_DB_TYPE=postgres \
  -e MB_DB_DBNAME=crime_db \
  -e MB_DB_PORT=5432 \
  -e MB_DB_USER=metabase \
  -e MB_DB_PASS=secret \
  -e MB_DB_HOST=localhost \
  --name metabase \
  metabase/metabase:latest

После запуска настройте:

  1. Подключение к базе crime_db
  2. Создайте вопросы (questions):
    • Количество преступлений по дням (линейный график)
    • Распределение по типам преступлений (pie chart)
    • Географическая карта горячих точек (map)
    • Среднее время реагирования полиции по районам (bar chart)
  3. Соберите вопросы в дашборд "Криминальная статистика города"

Секрет профессиональных дашбордов: Добавьте KPI-карточки вверху дашборда: "Преступлений за сегодня", "Снижение за неделю", "Самый опасный район". Начальство любит цифры, которые можно быстро прочитать.

Ошибки, которые сломают ваш пайплайн (и как их избежать)

Ошибка Почему происходит Как исправить
Данные скачались, но не загрузились в БД Схема БД изменилась, нет прав на запись Добавьте валидацию схемы перед загрузкой. Используйте prefect.failures для graceful failure
Сайт полиции вернул CAPTCHA Слишком много запросов с одного IP Используйте ротацию прокси. Добавьте задержки между запросами
Дашборд показывает вчерашние данные ETL упал, но уведомление не пришло Настройте алерты в Prefect на Slack/Telegram. Мониторьте через Prefect UI
Геоданные не парсятся Формат координат изменился (DD vs DMS) Добавьте несколько парсеров с fallback. Логируйте сырые данные для отладки

Что дальше? От ETL к ML-предсказаниям

Когда ETL работает стабильно, можно добавлять аналитику:

  • Прогнозирование: Модель для предсказания количества преступлений по районам на завтра (вспомните про time travel в ML чтобы избежать утечки данных)
  • Кластеризация: Автоматическое выделение паттернов преступлений
  • Анализ текста: Обработка описаний преступлений через LLM для выявления скрытых связей

Но начинайте с простого. Сначала добейтесь, чтобы базовый ETL работал 30 дней без вашего вмешательства. Потом добавляйте сложность.

💡
Если Prefect кажется избыточным для вашей задачи, посмотрите статью про замену ETL на ИИ-агентов. Но предупреждаю: это следующий уровень сложности.

Полный код проекта с Docker Compose, примерами дашбордов и конфигурацией алертов доступен в GitHub репозитории. Клонируйте, настройте под свои данные и запускайте. Первый работающий дашборд - через 2 часа, а не через 2 недели.