Дата-инженер использует SQL не для разовых аналитических запросов, а для построения надёжных пайплайнов. Это другой уровень: идемпотентность, обработка поздних данных, партиционирование.
1. Идемпотентные запросы: безопасный перезапуск
Пайплайн должен давать одинаковый результат при повторном запуске:
-- НЕ идемпотентно:
INSERT INTO fact_orders SELECT * FROM stg_orders WHERE date = '2026-03-15';
-- При повторном запуске — дубликаты!
-- Идемпотентно через TRUNCATE + INSERT:
BEGIN;
DELETE FROM fact_orders WHERE order_date = '2026-03-15';
INSERT INTO fact_orders SELECT * FROM stg_orders WHERE order_date = '2026-03-15';
COMMIT;
-- Идемпотентно через ON CONFLICT:
INSERT INTO fact_orders (order_id, user_id, amount, order_date)
SELECT order_id, user_id, amount, order_date FROM stg_orders
ON CONFLICT (order_id) DO UPDATE SET
amount = EXCLUDED.amount,
updated_at = NOW();
2. Обработка поздних данных (Late Arriving Data)
События часто приходят с опозданием. Пайплайн должен уметь «переписать прошлое»:
-- Хранить время события и время загрузки
CREATE TABLE events (
event_id UUID PRIMARY KEY,
user_id INTEGER,
event_type TEXT,
event_time TIMESTAMPTZ, -- когда произошло событие
loaded_at TIMESTAMPTZ DEFAULT NOW(), -- когда загружено в DWH
batch_id TEXT -- какой батч загрузил
);
-- Найти события за вчера, загруженные сегодня (поздние данные)
SELECT COUNT(*) AS late_events
FROM events
WHERE event_time::date = CURRENT_DATE - 1
AND loaded_at::date = CURRENT_DATE;
-- Перезапуск за конкретную дату: удалить и перезагрузить
DELETE FROM fact_daily WHERE event_date = '2026-03-14';
INSERT INTO fact_daily
SELECT * FROM events WHERE event_time::date = '2026-03-14';
3. Change Data Capture (CDC) паттерн
Отслеживать только изменившиеся строки:
-- Хранить хеш строки для обнаружения изменений
ALTER TABLE dim_customers ADD COLUMN row_hash TEXT;
-- Вычислить хеш при загрузке
UPDATE dim_customers
SET row_hash = MD5(ROW(name, email, segment, city)::TEXT);
-- При следующей загрузке: найти изменения
SELECT s.*
FROM stg_customers s
JOIN dim_customers d ON d.customer_id = s.customer_id
WHERE MD5(ROW(s.name, s.email, s.segment, s.city)::TEXT) <> d.row_hash;
-- Только изменившиеся строки
4. Партиционирование для ETL
Обрабатывать данные по партициям (не весь датасет сразу):
-- Партиционированная таблица
CREATE TABLE events (
event_id UUID,
event_date DATE NOT NULL,
event_type TEXT,
user_id INTEGER
) PARTITION BY RANGE (event_date);
-- Создать партицию за месяц
CREATE TABLE events_2026_03 PARTITION OF events
FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');
-- Загрузка только нужной партиции (быстро, не трогает остальные)
INSERT INTO events (event_id, event_date, event_type, user_id)
SELECT * FROM stg_events WHERE event_date >= '2026-03-01' AND event_date < '2026-04-01';
-- Переопределение партиции: мгновенная «перезагрузка» месяца
BEGIN;
TRUNCATE events_2026_03;
INSERT INTO events_2026_03 SELECT * FROM stg_events
WHERE event_date >= '2026-03-01' AND event_date < '2026-04-01';
COMMIT;
5. Data Vault: Hub, Link, Satellite
Data Vault — методология для масштабируемых хранилищ:
-- HUB: уникальные бизнес-ключи
CREATE TABLE h_customer (
hub_customer_id BIGSERIAL PRIMARY KEY,
customer_bk TEXT NOT NULL UNIQUE, -- бизнес-ключ (например CRM ID)
load_date TIMESTAMPTZ NOT NULL DEFAULT NOW(),
record_source TEXT NOT NULL
);
-- SATELLITE: атрибуты с историей
CREATE TABLE s_customer_details (
hub_customer_id BIGINT REFERENCES h_customer(hub_customer_id),
load_date TIMESTAMPTZ NOT NULL,
load_end_date TIMESTAMPTZ,
name TEXT,
email TEXT,
segment TEXT,
PRIMARY KEY (hub_customer_id, load_date)
);
-- LINK: связи между хабами
CREATE TABLE l_customer_order (
link_id BIGSERIAL PRIMARY KEY,
hub_customer_id BIGINT REFERENCES h_customer,
hub_order_id BIGINT REFERENCES h_order,
load_date TIMESTAMPTZ DEFAULT NOW()
);
6. Качество данных: SLA на freshness
-- Таблица мониторинга свежести данных
CREATE TABLE data_freshness (
table_name TEXT PRIMARY KEY,
max_loaded_at TIMESTAMPTZ,
sla_hours INTEGER,
last_checked TIMESTAMPTZ DEFAULT NOW()
);
-- Проверка: не превысили ли SLA?
SELECT
table_name,
max_loaded_at,
sla_hours,
EXTRACT(EPOCH FROM (NOW() - max_loaded_at)) / 3600 AS hours_since_update,
CASE
WHEN EXTRACT(EPOCH FROM (NOW() - max_loaded_at)) / 3600 > sla_hours
THEN '🔴 SLA нарушен'
ELSE '✅ OK'
END AS status
FROM data_freshness
ORDER BY hours_since_update DESC;
-- Обновить мониторинг
INSERT INTO data_freshness (table_name, max_loaded_at, sla_hours)
SELECT 'fact_orders', MAX(loaded_at), 4 FROM fact_orders
ON CONFLICT (table_name) DO UPDATE SET
max_loaded_at = EXCLUDED.max_loaded_at,
last_checked = NOW();
7. Параллельная загрузка батчей
-- Разбить данные на батчи по хешу для параллельной обработки
SELECT *
FROM stg_orders
WHERE MOD(ABS(HASHTEXT(order_id::text)), :total_workers) = :worker_id;
-- :total_workers = 4, :worker_id = 0/1/2/3 — каждый воркер берёт свою долю
8. Версионирование трансформаций
-- Хранить метаданные трансформации
CREATE TABLE pipeline_runs (
run_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
pipeline_name TEXT NOT NULL,
start_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
end_time TIMESTAMPTZ,
status TEXT, -- running / success / failed
rows_processed INTEGER,
error_message TEXT,
git_commit TEXT -- хеш коммита с кодом трансформации
);
-- В пайплайне:
INSERT INTO pipeline_runs (pipeline_name, git_commit)
VALUES ('fact_orders_daily', :git_sha)
RETURNING run_id INTO :run_id;
-- После выполнения:
UPDATE pipeline_runs
SET status = 'success', end_time = NOW(), rows_processed = :count
WHERE run_id = :run_id;
Итог: принципы SQL для дата-инженера
| Принцип | Техника |
|---|---|
| Идемпотентность | DELETE + INSERT или ON CONFLICT |
| Поздние данные | Хранить event_time отдельно от loaded_at |
| Обнаружение изменений | MD5(ROW(...)) хеш |
| Производительность | Партиционирование, батчи |
| Мониторинг | freshness таблица + алерты |
| Отладка | pipeline_runs с метаданными |
| Параллелизм | HASHTEXT % workers |
Надёжный пайплайн = идемпотентный + мониторинг + обработка поздних данных.