SQLLab
Все статьи

SQL для дата-инженера: паттерны и техники 2026

SQL-паттерны для дата-инженеров: партиционирование запросов, оконные функции для ETL, обработка поздних данных, идемпотентные пайплайны, Data Vault.

21 марта 2026 г.·5 мин чтения·

Дата-инженер использует SQL не для разовых аналитических запросов, а для построения надёжных пайплайнов. Это другой уровень: идемпотентность, обработка поздних данных, партиционирование.

1. Идемпотентные запросы: безопасный перезапуск

Пайплайн должен давать одинаковый результат при повторном запуске:

-- НЕ идемпотентно:
INSERT INTO fact_orders SELECT * FROM stg_orders WHERE date = '2026-03-15';
-- При повторном запуске — дубликаты!

-- Идемпотентно через TRUNCATE + INSERT:
BEGIN;
DELETE FROM fact_orders WHERE order_date = '2026-03-15';
INSERT INTO fact_orders SELECT * FROM stg_orders WHERE order_date = '2026-03-15';
COMMIT;

-- Идемпотентно через ON CONFLICT:
INSERT INTO fact_orders (order_id, user_id, amount, order_date)
SELECT order_id, user_id, amount, order_date FROM stg_orders
ON CONFLICT (order_id) DO UPDATE SET
    amount = EXCLUDED.amount,
    updated_at = NOW();

2. Обработка поздних данных (Late Arriving Data)

События часто приходят с опозданием. Пайплайн должен уметь «переписать прошлое»:

-- Хранить время события и время загрузки
CREATE TABLE events (
    event_id      UUID PRIMARY KEY,
    user_id       INTEGER,
    event_type    TEXT,
    event_time    TIMESTAMPTZ,  -- когда произошло событие
    loaded_at     TIMESTAMPTZ DEFAULT NOW(),  -- когда загружено в DWH
    batch_id      TEXT         -- какой батч загрузил
);

-- Найти события за вчера, загруженные сегодня (поздние данные)
SELECT COUNT(*) AS late_events
FROM events
WHERE event_time::date = CURRENT_DATE - 1
  AND loaded_at::date = CURRENT_DATE;

-- Перезапуск за конкретную дату: удалить и перезагрузить
DELETE FROM fact_daily WHERE event_date = '2026-03-14';
INSERT INTO fact_daily
SELECT * FROM events WHERE event_time::date = '2026-03-14';

3. Change Data Capture (CDC) паттерн

Отслеживать только изменившиеся строки:

-- Хранить хеш строки для обнаружения изменений
ALTER TABLE dim_customers ADD COLUMN row_hash TEXT;

-- Вычислить хеш при загрузке
UPDATE dim_customers
SET row_hash = MD5(ROW(name, email, segment, city)::TEXT);

-- При следующей загрузке: найти изменения
SELECT s.*
FROM stg_customers s
JOIN dim_customers d ON d.customer_id = s.customer_id
WHERE MD5(ROW(s.name, s.email, s.segment, s.city)::TEXT) <> d.row_hash;
-- Только изменившиеся строки

4. Партиционирование для ETL

Обрабатывать данные по партициям (не весь датасет сразу):

-- Партиционированная таблица
CREATE TABLE events (
    event_id   UUID,
    event_date DATE NOT NULL,
    event_type TEXT,
    user_id    INTEGER
) PARTITION BY RANGE (event_date);

-- Создать партицию за месяц
CREATE TABLE events_2026_03 PARTITION OF events
    FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');

-- Загрузка только нужной партиции (быстро, не трогает остальные)
INSERT INTO events (event_id, event_date, event_type, user_id)
SELECT * FROM stg_events WHERE event_date >= '2026-03-01' AND event_date < '2026-04-01';

-- Переопределение партиции: мгновенная «перезагрузка» месяца
BEGIN;
TRUNCATE events_2026_03;
INSERT INTO events_2026_03 SELECT * FROM stg_events
WHERE event_date >= '2026-03-01' AND event_date < '2026-04-01';
COMMIT;

5. Data Vault: Hub, Link, Satellite

Data Vault — методология для масштабируемых хранилищ:

-- HUB: уникальные бизнес-ключи
CREATE TABLE h_customer (
    hub_customer_id BIGSERIAL PRIMARY KEY,
    customer_bk     TEXT NOT NULL UNIQUE,  -- бизнес-ключ (например CRM ID)
    load_date       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    record_source   TEXT NOT NULL
);

-- SATELLITE: атрибуты с историей
CREATE TABLE s_customer_details (
    hub_customer_id BIGINT REFERENCES h_customer(hub_customer_id),
    load_date       TIMESTAMPTZ NOT NULL,
    load_end_date   TIMESTAMPTZ,
    name            TEXT,
    email           TEXT,
    segment         TEXT,
    PRIMARY KEY (hub_customer_id, load_date)
);

-- LINK: связи между хабами
CREATE TABLE l_customer_order (
    link_id         BIGSERIAL PRIMARY KEY,
    hub_customer_id BIGINT REFERENCES h_customer,
    hub_order_id    BIGINT REFERENCES h_order,
    load_date       TIMESTAMPTZ DEFAULT NOW()
);

6. Качество данных: SLA на freshness

-- Таблица мониторинга свежести данных
CREATE TABLE data_freshness (
    table_name    TEXT PRIMARY KEY,
    max_loaded_at TIMESTAMPTZ,
    sla_hours     INTEGER,
    last_checked  TIMESTAMPTZ DEFAULT NOW()
);

-- Проверка: не превысили ли SLA?
SELECT
    table_name,
    max_loaded_at,
    sla_hours,
    EXTRACT(EPOCH FROM (NOW() - max_loaded_at)) / 3600 AS hours_since_update,
    CASE
        WHEN EXTRACT(EPOCH FROM (NOW() - max_loaded_at)) / 3600 > sla_hours
        THEN '🔴 SLA нарушен'
        ELSE '✅ OK'
    END AS status
FROM data_freshness
ORDER BY hours_since_update DESC;

-- Обновить мониторинг
INSERT INTO data_freshness (table_name, max_loaded_at, sla_hours)
SELECT 'fact_orders', MAX(loaded_at), 4 FROM fact_orders
ON CONFLICT (table_name) DO UPDATE SET
    max_loaded_at = EXCLUDED.max_loaded_at,
    last_checked = NOW();

7. Параллельная загрузка батчей

-- Разбить данные на батчи по хешу для параллельной обработки
SELECT *
FROM stg_orders
WHERE MOD(ABS(HASHTEXT(order_id::text)), :total_workers) = :worker_id;
-- :total_workers = 4, :worker_id = 0/1/2/3 — каждый воркер берёт свою долю

8. Версионирование трансформаций

-- Хранить метаданные трансформации
CREATE TABLE pipeline_runs (
    run_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    pipeline_name TEXT NOT NULL,
    start_time    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    end_time      TIMESTAMPTZ,
    status        TEXT,  -- running / success / failed
    rows_processed INTEGER,
    error_message TEXT,
    git_commit    TEXT  -- хеш коммита с кодом трансформации
);

-- В пайплайне:
INSERT INTO pipeline_runs (pipeline_name, git_commit)
VALUES ('fact_orders_daily', :git_sha)
RETURNING run_id INTO :run_id;

-- После выполнения:
UPDATE pipeline_runs
SET status = 'success', end_time = NOW(), rows_processed = :count
WHERE run_id = :run_id;

Итог: принципы SQL для дата-инженера

ПринципТехника
ИдемпотентностьDELETE + INSERT или ON CONFLICT
Поздние данныеХранить event_time отдельно от loaded_at
Обнаружение измененийMD5(ROW(...)) хеш
ПроизводительностьПартиционирование, батчи
Мониторингfreshness таблица + алерты
Отладкаpipeline_runs с метаданными
ПараллелизмHASHTEXT % workers

Надёжный пайплайн = идемпотентный + мониторинг + обработка поздних данных.

Похожие статьи

Попробуй на практике

Тренажёр с реальными задачами — бесплатно и без регистрации

Открыть тренажёр →