ETL (Extract, Transform, Load) — основа работы дата-инженера. SQL — язык трансформации данных, и большинство ETL-паттернов реализуются прямо в базе без Python. Разберём ключевые паттерны.
1. Upsert: вставить или обновить
Самый частый паттерн — вставить строку, если её нет, обновить если есть:
-- PostgreSQL: INSERT ... ON CONFLICT
INSERT INTO products (sku, name, price, updated_at)
SELECT sku, name, price, NOW()
FROM staging_products
ON CONFLICT (sku) DO UPDATE SET
name = EXCLUDED.name,
price = EXCLUDED.price,
updated_at = NOW()
WHERE products.price <> EXCLUDED.price -- обновить только если цена изменилась
OR products.name <> EXCLUDED.name;
-- Upsert только новых (не трогать существующие)
INSERT INTO products (sku, name, price)
SELECT sku, name, price FROM staging_products
ON CONFLICT (sku) DO NOTHING;
2. Инкрементальная загрузка
Загружать только новые/изменённые записи:
-- Хранить watermark (последнее загруженное время)
CREATE TABLE etl_watermarks (
table_name TEXT PRIMARY KEY,
last_loaded_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01'
);
-- Загрузить только новые записи
INSERT INTO orders (id, user_id, amount, created_at)
SELECT id, user_id, amount, created_at
FROM source_orders
WHERE created_at > (
SELECT last_loaded_at FROM etl_watermarks WHERE table_name = 'orders'
);
-- Обновить watermark
UPDATE etl_watermarks
SET last_loaded_at = NOW()
WHERE table_name = 'orders';
3. SCD Type 2: история изменений (медленно меняющиеся измерения)
Сохранять историю: каждое изменение — новая строка с датами действия:
CREATE TABLE dim_users (
id BIGSERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
email TEXT,
segment TEXT,
valid_from TIMESTAMPTZ NOT NULL DEFAULT NOW(),
valid_to TIMESTAMPTZ, -- NULL = текущая версия
is_current BOOLEAN DEFAULT true
);
-- Загрузить изменение:
-- 1. Закрыть старую версию
UPDATE dim_users
SET valid_to = NOW(), is_current = false
WHERE user_id = 42 AND is_current = true
AND segment <> 'premium'; -- только если реально изменилось
-- 2. Вставить новую версию
INSERT INTO dim_users (user_id, email, segment, valid_from, is_current)
SELECT 42, 'alice@example.com', 'premium', NOW(), true
WHERE EXISTS (
SELECT 1 FROM source_users WHERE id = 42 AND segment = 'premium'
) AND NOT EXISTS (
SELECT 1 FROM dim_users WHERE user_id = 42 AND is_current = true AND segment = 'premium'
);
-- Запрос актуального состояния
SELECT * FROM dim_users WHERE is_current = true;
-- Состояние на определённую дату
SELECT * FROM dim_users
WHERE user_id = 42
AND valid_from <= '2026-01-15'
AND (valid_to IS NULL OR valid_to > '2026-01-15');
4. Дедупликация при загрузке
-- Загрузить уникальные строки, игнорировать дубли
INSERT INTO events (external_id, user_id, event_type, occurred_at)
SELECT DISTINCT ON (external_id)
external_id, user_id, event_type, occurred_at
FROM staging_events
ORDER BY external_id, occurred_at -- при дубле — берём первую по времени
ON CONFLICT (external_id) DO NOTHING;
-- Или через ROW_NUMBER
INSERT INTO events (external_id, user_id, event_type, occurred_at)
SELECT external_id, user_id, event_type, occurred_at
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY external_id ORDER BY occurred_at) AS rn
FROM staging_events
) t
WHERE rn = 1
ON CONFLICT (external_id) DO NOTHING;
5. Batch-обработка больших данных
-- Удалять старые данные батчами по 10000 строк (не блокируем таблицу)
DO $$
DECLARE
deleted_count INTEGER;
BEGIN
LOOP
DELETE FROM events
WHERE id IN (
SELECT id FROM events
WHERE created_at < NOW() - INTERVAL '1 year'
LIMIT 10000
);
GET DIAGNOSTICS deleted_count = ROW_COUNT;
EXIT WHEN deleted_count = 0;
PERFORM pg_sleep(0.1); -- небольшая пауза между батчами
END LOOP;
END;
$$;
-- INSERT батчами из staging
INSERT INTO target_table
SELECT * FROM staging_table
WHERE id BETWEEN :batch_start AND :batch_end;
6. Аудитные поля: автоматическое заполнение
-- Таблица с аудитными полями
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
amount NUMERIC NOT NULL,
-- Аудит:
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by TEXT NOT NULL DEFAULT current_user,
updated_by TEXT NOT NULL DEFAULT current_user,
version INTEGER NOT NULL DEFAULT 1
);
-- Триггер для автоматического обновления
CREATE OR REPLACE FUNCTION update_audit_fields()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at := NOW();
NEW.updated_by := current_user;
NEW.version := OLD.version + 1;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER audit_orders
BEFORE UPDATE ON orders
FOR EACH ROW EXECUTE FUNCTION update_audit_fields();
7. Перезагрузка партиции
Эффективная перезагрузка данных за период:
-- Загрузить данные за март в одну транзакцию
BEGIN;
-- Удалить старые данные за март
DELETE FROM sales_fact
WHERE sale_date BETWEEN '2026-03-01' AND '2026-03-31';
-- Вставить свежие данные
INSERT INTO sales_fact (sale_date, product_id, amount, units)
SELECT sale_date, product_id, amount, units
FROM staging_sales
WHERE sale_date BETWEEN '2026-03-01' AND '2026-03-31';
COMMIT;
8. Merge-паттерн через CTE
-- Атомарный upsert для нескольких операций
WITH
new_data AS (
SELECT id, name, value, NOW() AS loaded_at
FROM staging_table
),
updated AS (
UPDATE target_table t
SET name = n.name, value = n.value, loaded_at = n.loaded_at
FROM new_data n WHERE n.id = t.id
RETURNING t.id
),
inserted AS (
INSERT INTO target_table (id, name, value, loaded_at)
SELECT id, name, value, loaded_at FROM new_data
WHERE id NOT IN (SELECT id FROM updated)
RETURNING id
)
SELECT
(SELECT COUNT(*) FROM updated) AS updated_rows,
(SELECT COUNT(*) FROM inserted) AS inserted_rows;
9. Проверка целостности после загрузки
-- Сравнить количество строк источника и цели
SELECT
'source' AS location, COUNT(*) AS cnt FROM staging_sales WHERE month = '2026-03-01'
UNION ALL
SELECT 'target', COUNT(*) FROM sales_fact WHERE sale_date >= '2026-03-01' AND sale_date < '2026-04-01';
-- Сравнить суммы
SELECT
COALESCE(
(SELECT SUM(amount) FROM staging_sales WHERE month = '2026-03-01') =
(SELECT SUM(amount) FROM sales_fact WHERE sale_date >= '2026-03-01' AND sale_date < '2026-04-01'),
false
) AS amounts_match;
Итог: ключевые ETL-паттерны
| Паттерн | SQL-техника |
|---|---|
| Upsert | INSERT ... ON CONFLICT DO UPDATE |
| Только новые | ON CONFLICT DO NOTHING |
| Инкрементальность | Watermark + WHERE created_at > last_loaded |
| История изменений | SCD Type 2 (valid_from/valid_to + is_current) |
| Дедупликация | DISTINCT ON или ROW_NUMBER = 1 |
| Батчи | LIMIT N в цикле |
| Перезагрузка периода | DELETE + INSERT в транзакции |
| Аудит | Триггер на updated_at, updated_by, version |
SQL ETL — мощный и эффективный подход для большинства задач преобразования данных. Для сложных оркестраций добавьте dbt или Airflow поверх SQL-трансформаций.