Почему данные о воздухе - это боль
Открываешь сайт NASA или Copernicus. Хочешь скачать данные о PM2.5 или NO₂. А там... десяток форматов, каждый со своими приколами. NetCDF весит 20 ГБ, GeoJSON криво читается pandas, а Parquet требует танцев с бубном.
Первый раз столкнулся с этим в 2024, когда делал дашборд для одного экостартапа. Клиент хотел видеть прогноз загрязнения воздуха на 3 дня вперёд. Пришлось разбираться со всем этим зоопарком форматов. Сейчас расскажу, как не наступать на те же грабли.
Важный момент: в 2026 году спутниковые данные стали точнее в 3 раза. Новые аппараты типа Sentinel-6 дают разрешение 300 метров вместо прежних 10 километров. Это меняет правила игры.
Три кита экоданных: кто для чего
Не бывает универсального формата. Каждый решает свою задачу. Запоминайте:
| Формат | Когда использовать | Главная боль | Объём данных |
|---|---|---|---|
| GeoJSON | Веб-карты, мобильные приложения | Медленный на больших данных | До 100 МБ |
| Parquet (GeoParquet) | Аналитика, ML, долгосрочные тренды | Сложная структура метаданных | ГБ-ТБ |
| NetCDF/HDF5 | Спутниковые данные, научные расчёты | Огромные файлы, специфичный API | 10+ ГБ |
GeoJSON: когда нужны карты, а не анализ
Самый человекочитаемый формат. Открываешь в любом текстовом редакторе - видишь координаты и значения. Но это же и его слабость.
1 Как НЕ читать GeoJSON
Не делайте так:
# ПЛОХО: pandas не умеет в геоданные
import pandas as pd
df = pd.read_json('air_quality.geojson') # Всё сломается
2 Правильный способ: geopandas + фильтрация
# Устанавливаем если нет
# pip install geopandas pyproj
import geopandas as gpd
from datetime import datetime, timedelta
# Читаем с фильтрацией по времени
gdf = gpd.read_file(
'https://data.example.com/air_quality_2026.geojson',
# Новый параметр в 2026: фильтрация при чтении
where="timestamp >= '2026-01-20' AND pollutant = 'PM2.5'"
)
# Проверяем что получили
print(f"Загружено {len(gdf)} точек измерения")
print(f"Диапазон дат: {gdf['timestamp'].min()} - {gdf['timestamp'].max()}")
# Быстрая визуализация
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(12, 8))
gdf.plot(
column='value',
cmap='RdYlGn_r', # Красный-жёлтый-зелёный (обратный)
legend=True,
markersize=50,
alpha=0.7,
ax=ax
)
ax.set_title('Концентрация PM2.5, 24.01.2026')
plt.savefig('pm25_map.png', dpi=150, bbox_inches='tight')
Parquet и GeoParquet: когда данных много
В 2025 году окончательно приняли стандарт GeoParquet 1.1. Теперь геоданные в Parquet - не хаки, а официальная спецификация.
Внимание: старые библиотеки (pyarrow < 16.0) не поддерживают GeoParquet. Проверяйте версии.
3 Читаем многолетние данные за секунды
# Установка: pip install pyarrow>=16.0 geopandas
import pyarrow.parquet as pq
import geopandas as gpd
import pandas as pd
# Смотрим метаданные файла
metadata = pq.read_metadata('air_quality_2010_2026.geoparquet')
print(f"Колонки: {metadata.schema.names}")
print(f"Строк: {metadata.num_rows}")
print(f"Размер: {metadata.serialized_size / 1e9:.2f} GB")
# Читаем только нужные колонки и период
# Parquet умеет predicate pushdown - фильтрация на уровне файла
gdf = gpd.read_parquet(
'air_quality_2010_2026.geoparquet',
columns=['geometry', 'pm25', 'timestamp', 'station_id'],
filters=[
('timestamp', '>=', pd.Timestamp('2025-01-01')),
('timestamp', '<', pd.Timestamp('2026-01-01')),
('pm25', '>', 0) # Игнорируем пропуски
]
)
# Агрегация по месяцам
monthly_stats = gdf.groupby(
pd.Grouper(key='timestamp', freq='M')
).agg({
'pm25': ['mean', 'max', 'std'],
'station_id': 'nunique'
}).round(2)
print(monthly_stats.head())
4 Пишем GeoParquet правильно
# Конвертируем GeoJSON в GeoParquet
import geopandas as gpd
# Читаем исходные данные
input_gdf = gpd.read_file('daily_measurements.geojson')
# Оптимизируем типы данных для экономии места
input_gdf['timestamp'] = pd.to_datetime(input_gdf['timestamp'])
input_gdf['station_id'] = input_gdf['station_id'].astype('category')
# Сохраняем с пространственным индексом
input_gdf.to_parquet(
'optimized_air_quality.geoparquet',
engine='pyarrow',
compression='zstd', # Лучшее сжатие в 2026
index=False,
# Ключевой параметр для GeoParquet
geometry_encoding='WKB',
# Создаём партиции по году для быстрого поиска
partition_cols=['year']
)
print(f"Сжали с {input_gdf.memory_usage(deep=True).sum() / 1e6:.1f} MB")
print(f"до {os.path.getsize('optimized_air_quality.geoparquet') / 1e6:.1f} MB")
NetCDF/HDF5: тяжёлая артиллерия спутников
Формат для учёных. Хранит многомерные массивы с координатными сетками. Если видите файл на 50 ГБ с расширением .nc - это оно.
Главная проблема NetCDF - нужно понимать его внутреннюю структуру. Каждый центр данных (NASA, ECMWF, Copernicus) использует свою организацию переменных.
5 Как разобрать спутниковый снимок
# Установка: pip install xarray[complete] netCDF4 h5py
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
# Открываем NetCDF файл (может быть на диске или по HTTP)
# Пример: данные Sentinel-5P о NO₂
ds = xr.open_dataset(
'S5P_NO2_20260124.nc',
engine='netcdf4',
# Читаем только нужные переменные
decode_cf=True,
mask_and_scale=True
)
# Смотрим что внутри
print("\nПеременные в файле:")
for var in ds.data_vars:
print(f" {var}: {ds[var].attrs.get('long_name', 'No description')}")
print("\nРазмеры данных:")
for dim in ds.dims:
print(f" {dim}: {ds.dims[dim]}")
# Извлекаем данные о диоксиде азота
# В данных Copernicus переменная обычно называется 'nitrogen_dioxide_total_column'
no2_data = ds['nitrogen_dioxide_total_column']
# Выбираем срез по времени (если есть временная ось)
if 'time' in ds.dims:
# Берём последний временной срез
no2_slice = no2_data.isel(time=-1)
else:
no2_slice = no2_data
# Конвертируем в DataFrame для анализа
no2_df = no2_slice.to_dataframe().reset_index()
print(f"\nРазмер DataFrame: {len(no2_df)} строк")
# Быстрая визуализация
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# 1. Распределение значений
axes[0].hist(no2_df['nitrogen_dioxide_total_column'].dropna(), bins=50, alpha=0.7)
axes[0].set_xlabel('Концентрация NO₂ [mol/m²]')
axes[0].set_ylabel('Частота')
axes[0].set_title('Распределение концентраций NO₂')
axes[0].grid(True, alpha=0.3)
# 2. Пространственное распределение (если есть координаты)
if 'latitude' in no2_df.columns and 'longitude' in no2_df.columns:
scatter = axes[1].scatter(
no2_df['longitude'],
no2_df['latitude'],
c=no2_df['nitrogen_dioxide_total_column'],
cmap='hot_r',
s=1,
alpha=0.6
)
plt.colorbar(scatter, ax=axes[1], label='NO₂ [mol/m²]')
axes[1].set_xlabel('Долгота')
axes[1].set_ylabel('Широта')
axes[1].set_title('Пространственное распределение NO₂')
plt.tight_layout()
plt.savefig('no2_analysis.png', dpi=150)
plt.show()
# Не забываем закрыть файл
if hasattr(ds, 'close'):
ds.close()
Откуда брать данные в 2026 году
Источники обновляются каждый год. Вот что актуально на январь 2026:
- Copernicus Atmosphere Monitoring Service (CAMS) - глобальные прогнозы качества воздуха. Бесплатно, но нужно регистрироваться. Данные в NetCDF и GRIB.
- NASA Earthdata - архив спутниковых измерений. Особенно хороши данные MODIS и VIIRS об аэрозолях. Форматы: HDF, NetCDF.
- OpenAQ - агрегатор данных с наземных станций по всему миру. API возвращает JSON, но есть дампы в Parquet. Идеально для анализа трендов.
- Google Earth Engine - облачная плата для обработки спутниковых данных. Можно экспортировать результаты в GeoTIFF или TFRecord.
- Национальные мониторинговые сети - в Европе это EEA, в США - EPA AirNow. Качество данных отличное, но форматы... разные.
Собираем пайплайн: от сырых данных до дашборда
Вот рабочий пример пайплайна, который я использовал в реальном проекте:
#!/usr/bin/env python3
"""
Пайплайн обработки данных о качестве воздуха
Обновлено для 2026 года
"""
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
import geopandas as gpd
from datetime import datetime, timedelta
import logging
from pathlib import Path
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class AirQualityPipeline:
"""Универсальный пайплайн для работы с экоданными"""
def __init__(self, cache_dir='./cache'):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
def download_open_aq(self, country='RU', days=7):
"""Скачиваем данные с OpenAQ"""
import requests
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
url = (
f"https://api.openaq.org/v2/measurements?"
f"country={country}&"
f"date_from={start_date.isoformat()}Z&"
f"date_to={end_date.isoformat()}Z&"
f"limit=10000&"
f"parameter=pm25,no2,o3"
)
logger.info(f"Загружаем данные с {url}")
response = requests.get(url)
data = response.json()
# Конвертируем в GeoDataFrame
features = []
for item in data.get('results', []):
feature = {
'type': 'Feature',
'geometry': {
'type': 'Point',
'coordinates': [item['coordinates']['longitude'],
item['coordinates']['latitude']]
},
'properties': {
'value': item['value'],
'parameter': item['parameter'],
'unit': item['unit'],
'location': item['location'],
'timestamp': item['date']['utc']
}
}
features.append(feature)
gdf = gpd.GeoDataFrame.from_features(features, crs='EPSG:4326')
logger.info(f"Загружено {len(gdf)} измерений")
return gdf
def process_to_parquet(self, gdf, output_path):
"""Обрабатываем и сохраняем в Parquet"""
# Добавляем вычисляемые поля
gdf['timestamp'] = pd.to_datetime(gdf['timestamp'])
gdf['hour'] = gdf['timestamp'].dt.hour
gdf['day_of_week'] = gdf['timestamp'].dt.dayofweek
# Агрегация по локациям
daily_stats = gdf.groupby(['location', 'parameter', pd.Grouper(key='timestamp', freq='D')]).agg({
'value': ['mean', 'max', 'count']
}).round(3)
daily_stats.columns = ['_'.join(col).strip() for col in daily_stats.columns]
daily_stats = daily_stats.reset_index()
# Сохраняем
output_path = Path(output_path)
daily_stats.to_parquet(output_path, engine='pyarrow', compression='zstd')
logger.info(f"Данные сохранены в {output_path}")
logger.info(f"Размер файла: {output_path.stat().st_size / 1e6:.2f} MB")
return daily_stats
def create_alert_layer(self, gdf, threshold=25):
"""Создаём слой с превышениями ПДК"""
# PM2.5 > 25 мкг/м³ - опасный уровень по WHO 2025
alerts = gdf[
(gdf['parameter'] == 'pm25') &
(gdf['value'] > threshold)
].copy()
if len(alerts) > 0:
alerts['severity'] = pd.cut(
alerts['value'],
bins=[25, 35, 50, 100, float('inf')],
labels=['low', 'medium', 'high', 'extreme']
)
logger.warning(f"Обнаружено {len(alerts)} превышений ПДК PM2.5")
# Сохраняем как GeoJSON для карты
alerts_path = self.cache_dir / 'alerts.geojson'
alerts.to_file(alerts_path, driver='GeoJSON')
logger.info(f"Слой с алертами сохранён в {alerts_path}")
return alerts
# Использование
if __name__ == '__main__':
pipeline = AirQualityPipeline()
# 1. Загружаем данные
gdf = pipeline.download_open_aq(country='RU', days=30)
# 2. Обрабатываем
stats = pipeline.process_to_parquet(gdf, './data/air_quality_russia_2026.parquet')
# 3. Ищем опасные уровни
alerts = pipeline.create_alert_layer(gdf, threshold=25)
# 4. Экспортируем для дашборда
if len(gdf) > 0:
# GeoJSON для веб-карты (только последние 7 дней)
recent = gdf[gdf['timestamp'] > (datetime.now() - timedelta(days=7))]
recent.to_file('./data/recent_measurements.geojson', driver='GeoJSON')
# CSV для табличных отчётов
stats.to_csv('./data/daily_statistics.csv', index=False)
logger.info("Пайплайн завершён успешно")
Типичные ошибки (и как их избежать)
Ошибка 1: Читать весь NetCDF файл в память. Файл на 50 ГБ убьёт любой сервер.
Решение: Используйте xarray с chunks или dask для ленивой загрузки.
Ошибка 2: Игнорировать систему координат (CRS). Точки окажутся не там, где должны быть.
Решение: Всегда проверяйте gdf.crs и при необходимости конвертируйте: gdf.to_crs('EPSG:4326').
Ошибка 3: Смешивать данные разных источников без нормализации единиц измерения.
Решение: NASA использует mol/m², Copernicus - кг/м³, наземные станции - мкг/м³. Конвертируйте всё к одной системе.
Ошибка 4: Не учитывать время измерения. Данные со спутника и наземной станции за один день могут отличаться на 6 часов.
Решение: Приводите все временные метки к UTC и учитывайте время пролёта спутника.
Что делать дальше: от анализа к предсказанию
Когда разобрались с форматами, можно строить прогнозные модели. Вот направление на 2026 год:
- Векторные базы для пространственного поиска - храните embeddings геоданных и ищите похожие паттерны загрязнения. Как в статье "Визуализация RAG в 3D".
- Графы знаний для связывания данных - стройте связи между источниками выбросов, погодой и качеством воздуха. Knowledge Graph на практике покажет как.
- Локальные LLM для анализа отчётов - автоматизируйте чтение экологических бюллетеней. Берите код из "Забей на ChatGPT".
- ИИ для спутниковых снимков - алгоритмы типа AlphaEarth Foundations уже умеют детектировать источники загрязнения по снимкам.
Самый неочевидный совет: начните с маленького региона. Не пытайтесь сразу анализировать всю планету. Возьмите один город, разберитесь с его данными за год. Поймите сезонные паттерны, источники выбросов, как влияет погода. Потом масштабируйте.
Данные о воздухе - это не просто цифры. Это здоровье людей, эффективность экологической политики, основа для климатических решений. И в 2026 году у нас наконец-то есть инструменты, чтобы работать с ними без головной боли.