Проблема: полиция не делится API, а аналитики требуют дашборды
Представьте ситуацию: нужно проанализировать криминальную статистику по городу. Источники - открытые данные полиции, но в формате PDF или через древний веб-интерфейс без API. Аналитики хотят видеть тренды, паттерны, горячие точки. Вручную это делать - адский труд. Нужен автоматизированный пайплайн, который сам скачает данные, обработает и положит в базу для визуализации.
Вот что обычно ломается в таких системах:
- Скрапинг ломается при каждом изменении верстки сайта
- ETL-процессы падают без логирования ошибок
- Нет мониторинга - не понятно, когда данные перестали обновляться
- Дашборды показывают устаревшую информацию
- Код превращается в спагетти из скриптов и cron-заданий
Почему Airflow не подходит для этой задачи? В 2026 году Prefect 3.0 обгоняет Airflow в простоте настройки и отладки. Конфигурация через Python-декораторы вместо XML/JSON, встроенный UI для мониторинга, понятная модель задач и потоков. Для небольшого ETL-пайплайна Prefect - идеальный выбор.
Решение: Prefect как оркестратор, Metabase как визуализация
Архитектура простая, но надежная:
- Extract: Python-скрипт скачивает данные с сайта полиции (или парсит PDF)
- Transform: Очистка, нормализация, агрегация данных
- Load: Загрузка в PostgreSQL
- Orchestrate: Prefect управляет всем процессом по расписанию
- Visualize: Metabase строит дашборды поверх базы
Ключевое преимущество - изоляция этапов. Если сайт полиции упал, падает только этап Extract. Transform и Load продолжают работать со старыми данными. Prefect логирует ошибку, отправляет уведомление, а вы спокойно пьете кофе.
1 Подготовка окружения: что ставить в 2026 году
Не используйте старые версии инструментов. На февраль 2026 актуальны:
- Prefect 3.0+ (полностью переписанный движок, асинхронность из коробки)
- Metabase 50+ (новая система разрешений, улучшенные диаграммы)
- PostgreSQL 17+ (лучшая производительность для временных рядов)
- Python 3.12+ (pattern matching для обработки данных)
# Установка Prefect 3.0
pip install "prefect>=3.0.0" prefect-postgres
# Для скрапинга
pip install httpx beautifulsoup4 pandas pdfplumber
# Docker-образ Metabase (последняя версия)
docker pull metabase/metabase:latest
Ошибка новичков: ставить Prefect 2.x. Версия 3.0 полностью меняет API. Декораторы @task и @flow остались, но внутренняя архитектура другая. Если найдете туториалы по Prefect 2 - смело закрывайте.
2 Проектируем таблицы: куда складывать данные о преступлениях
Не делайте одну гигантскую таблицу. Используйте звездообразную схему (star schema) как в финансовых моделях Power BI.
-- Фактовая таблица
CREATE TABLE crime_facts (
id SERIAL PRIMARY KEY,
date DATE NOT NULL,
district_id INTEGER REFERENCES districts(id),
crime_type_id INTEGER REFERENCES crime_types(id),
weapon_used BOOLEAN,
victims_count INTEGER,
officer_response_time INTERVAL,
latitude DECIMAL(9,6),
longitude DECIMAL(9,6),
raw_description TEXT,
processed_at TIMESTAMP DEFAULT NOW()
);
-- Размерность: типы преступлений
CREATE TABLE crime_types (
id SERIAL PRIMARY KEY,
code VARCHAR(10) UNIQUE,
category VARCHAR(100),
severity INTEGER CHECK (severity BETWEEN 1 AND 5)
);
-- Размерность: районы
CREATE TABLE districts (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
population INTEGER,
area_sq_km DECIMAL(10,2)
);
Зачем такая сложность? Потому что аналитики будут спрашивать: "Сколько краж со взломом в Северном районе в январе 2026 по сравнению с январем 2025?" С нормализованной схемой такой запрос пишется в одну строку.
3 Пишем ETL-флоу на Prefect 3.0
Вот полный флоу с обработкой ошибок и ретраями:
from datetime import datetime, timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import exponential_backoff
import httpx
import pandas as pd
from sqlalchemy import create_engine
import asyncio
@task(retries=3, retry_delay_seconds=exponential_backoff(10))
async def download_crime_data(date: datetime) -> pd.DataFrame:
"""Скачиваем данные за конкретный день"""
logger = get_run_logger()
# Имитируем API полиции (в реальности URL другой)
url = f"https://api.police.gov/crimes?date={date.strftime('%Y-%m-%d')}"
async with httpx.AsyncClient(timeout=30.0) as client:
try:
response = await client.get(url)
response.raise_for_status()
logger.info(f"Downloaded {len(response.content)} bytes for {date}")
return pd.read_json(response.content)
except httpx.RequestError as e:
logger.error(f"Failed to download data for {date}: {e}")
raise
@task
def clean_crime_data(raw_df: pd.DataFrame) -> pd.DataFrame:
"""Очищаем и трансформируем данные"""
# Удаляем дубликаты
df = raw_df.drop_duplicates(subset=['case_number'])
# Нормализуем даты
df['date'] = pd.to_datetime(df['incident_date'], errors='coerce')
df = df.dropna(subset=['date'])
# Стандартизируем категории преступлений
crime_mapping = {
'BURGLARY': 'THEFT',
'ROBBERY': 'THEFT',
'AUTO THEFT': 'THEFT',
'ASSAULT': 'VIOLENCE'
}
df['crime_category'] = df['crime_type'].map(
lambda x: crime_mapping.get(x, 'OTHER')
)
# Геокодирование (упрощенно)
df['latitude'] = pd.to_numeric(df['lat'], errors='coerce')
df['longitude'] = pd.to_numeric(df['lng'], errors='coerce')
return df
@task
def load_to_postgres(clean_df: pd.DataFrame, table_name: str = "crime_facts"):
"""Загружаем в PostgreSQL"""
engine = create_engine("postgresql://user:pass@localhost:5432/crime_db")
# Используем метод upsert для обновления существующих записей
clean_df.to_sql(
table_name,
engine,
if_exists='append',
index=False,
method='multi'
)
@flow(name="crime-etl-pipeline", log_prints=True)
async def crime_etl_flow(start_date: str, end_date: str):
"""Основной ETL флоу"""
logger = get_run_logger()
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
dates = [start + timedelta(days=i) for i in range((end - start).days + 1)]
for date in dates:
logger.info(f"Processing {date.strftime('%Y-%m-%d')}")
try:
# Extract
raw_data = await download_crime_data(date)
# Transform
clean_data = clean_crime_data(raw_data)
# Load
load_to_postgres(clean_data)
logger.info(f"Successfully processed {date}")
except Exception as e:
logger.error(f"Failed for {date}: {e}")
# Продолжаем со следующим днем
continue
# Запуск флоу
if __name__ == "__main__":
# Для прошлой недели
crime_etl_flow(
start_date=(datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d"),
end_date=datetime.now().strftime("%Y-%m-%d")
)
Обратите внимание на асинхронность и обработку ошибок. Если данные за один день не скачались - флоу продолжает со следующим днем, а не падает полностью.
prefect.tasks.sleep между запросами или распределите загрузку по времени.4 Настраиваем Prefect Server и деплой
Локально можно работать с Prefect в режиме ephemeral, но для продакшена нужен сервер:
# Запуск Prefect server (новый синтаксис в 3.0)
prefect server start
# Создаем work pool для выполнения задач
prefect work-pool create crime-pool --type process
# Деплоим наш флоу
prefect deploy crime_etl_flow.py:crime_etl_flow --name crime-daily --pool crime-pool
# Настраиваем расписание (каждый день в 2:00)
prefect deployment schedule create crime-daily \
--interval 86400 \
--anchor-date "2026-02-03T02:00:00"
Теперь флоу будет запускаться автоматически каждый день. Все запуски, логи, ошибки - в Prefect UI по адресу http://localhost:4200.
5 Визуализация в Metabase: дашборд, который понятен даже начальству
Metabase подключается к PostgreSQL и строит дашборды без кода:
# Запуск Metabase через Docker
docker run -d -p 3000:3000 \
-e MB_DB_TYPE=postgres \
-e MB_DB_DBNAME=crime_db \
-e MB_DB_PORT=5432 \
-e MB_DB_USER=metabase \
-e MB_DB_PASS=secret \
-e MB_DB_HOST=localhost \
--name metabase \
metabase/metabase:latest
После запуска настройте:
- Подключение к базе crime_db
- Создайте вопросы (questions):
- Количество преступлений по дням (линейный график)
- Распределение по типам преступлений (pie chart)
- Географическая карта горячих точек (map)
- Среднее время реагирования полиции по районам (bar chart)
- Соберите вопросы в дашборд "Криминальная статистика города"
Секрет профессиональных дашбордов: Добавьте KPI-карточки вверху дашборда: "Преступлений за сегодня", "Снижение за неделю", "Самый опасный район". Начальство любит цифры, которые можно быстро прочитать.
Ошибки, которые сломают ваш пайплайн (и как их избежать)
| Ошибка | Почему происходит | Как исправить |
|---|---|---|
| Данные скачались, но не загрузились в БД | Схема БД изменилась, нет прав на запись | Добавьте валидацию схемы перед загрузкой. Используйте prefect.failures для graceful failure |
| Сайт полиции вернул CAPTCHA | Слишком много запросов с одного IP | Используйте ротацию прокси. Добавьте задержки между запросами |
| Дашборд показывает вчерашние данные | ETL упал, но уведомление не пришло | Настройте алерты в Prefect на Slack/Telegram. Мониторьте через Prefect UI |
| Геоданные не парсятся | Формат координат изменился (DD vs DMS) | Добавьте несколько парсеров с fallback. Логируйте сырые данные для отладки |
Что дальше? От ETL к ML-предсказаниям
Когда ETL работает стабильно, можно добавлять аналитику:
- Прогнозирование: Модель для предсказания количества преступлений по районам на завтра (вспомните про time travel в ML чтобы избежать утечки данных)
- Кластеризация: Автоматическое выделение паттернов преступлений
- Анализ текста: Обработка описаний преступлений через LLM для выявления скрытых связей
Но начинайте с простого. Сначала добейтесь, чтобы базовый ETL работал 30 дней без вашего вмешательства. Потом добавляйте сложность.
Полный код проекта с Docker Compose, примерами дашбордов и конфигурацией алертов доступен в GitHub репозитории. Клонируйте, настройте под свои данные и запускайте. Первый работающий дашборд - через 2 часа, а не через 2 недели.