SQLLab
Все статьи

ETL-паттерны в SQL: загрузка, трансформация и обновление данных

Практические ETL-паттерны на SQL: upsert, SCD Type 2, инкрементальная загрузка, дедупликация, разбивка на батчи, аудитные поля. Для дата-инженеров.

20 марта 2026 г.·6 мин чтения·

ETL (Extract, Transform, Load) — основа работы дата-инженера. SQL — язык трансформации данных, и большинство ETL-паттернов реализуются прямо в базе без Python. Разберём ключевые паттерны.

1. Upsert: вставить или обновить

Самый частый паттерн — вставить строку, если её нет, обновить если есть:

-- PostgreSQL: INSERT ... ON CONFLICT
INSERT INTO products (sku, name, price, updated_at)
SELECT sku, name, price, NOW()
FROM staging_products

ON CONFLICT (sku) DO UPDATE SET
    name       = EXCLUDED.name,
    price      = EXCLUDED.price,
    updated_at = NOW()
WHERE products.price <> EXCLUDED.price  -- обновить только если цена изменилась
      OR products.name <> EXCLUDED.name;
-- Upsert только новых (не трогать существующие)
INSERT INTO products (sku, name, price)
SELECT sku, name, price FROM staging_products
ON CONFLICT (sku) DO NOTHING;

2. Инкрементальная загрузка

Загружать только новые/изменённые записи:

-- Хранить watermark (последнее загруженное время)
CREATE TABLE etl_watermarks (
    table_name TEXT PRIMARY KEY,
    last_loaded_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01'
);

-- Загрузить только новые записи
INSERT INTO orders (id, user_id, amount, created_at)
SELECT id, user_id, amount, created_at
FROM source_orders
WHERE created_at > (
    SELECT last_loaded_at FROM etl_watermarks WHERE table_name = 'orders'
);

-- Обновить watermark
UPDATE etl_watermarks
SET last_loaded_at = NOW()
WHERE table_name = 'orders';

3. SCD Type 2: история изменений (медленно меняющиеся измерения)

Сохранять историю: каждое изменение — новая строка с датами действия:

CREATE TABLE dim_users (
    id          BIGSERIAL PRIMARY KEY,
    user_id     INTEGER NOT NULL,
    email       TEXT,
    segment     TEXT,
    valid_from  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    valid_to    TIMESTAMPTZ,           -- NULL = текущая версия
    is_current  BOOLEAN DEFAULT true
);

-- Загрузить изменение:
-- 1. Закрыть старую версию
UPDATE dim_users
SET valid_to = NOW(), is_current = false
WHERE user_id = 42 AND is_current = true
  AND segment <> 'premium';  -- только если реально изменилось

-- 2. Вставить новую версию
INSERT INTO dim_users (user_id, email, segment, valid_from, is_current)
SELECT 42, 'alice@example.com', 'premium', NOW(), true
WHERE EXISTS (
    SELECT 1 FROM source_users WHERE id = 42 AND segment = 'premium'
) AND NOT EXISTS (
    SELECT 1 FROM dim_users WHERE user_id = 42 AND is_current = true AND segment = 'premium'
);
-- Запрос актуального состояния
SELECT * FROM dim_users WHERE is_current = true;

-- Состояние на определённую дату
SELECT * FROM dim_users
WHERE user_id = 42
  AND valid_from <= '2026-01-15'
  AND (valid_to IS NULL OR valid_to > '2026-01-15');

4. Дедупликация при загрузке

-- Загрузить уникальные строки, игнорировать дубли
INSERT INTO events (external_id, user_id, event_type, occurred_at)
SELECT DISTINCT ON (external_id)
    external_id, user_id, event_type, occurred_at
FROM staging_events
ORDER BY external_id, occurred_at  -- при дубле — берём первую по времени
ON CONFLICT (external_id) DO NOTHING;

-- Или через ROW_NUMBER
INSERT INTO events (external_id, user_id, event_type, occurred_at)
SELECT external_id, user_id, event_type, occurred_at
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY external_id ORDER BY occurred_at) AS rn
    FROM staging_events
) t
WHERE rn = 1
ON CONFLICT (external_id) DO NOTHING;

5. Batch-обработка больших данных

-- Удалять старые данные батчами по 10000 строк (не блокируем таблицу)
DO $$
DECLARE
    deleted_count INTEGER;
BEGIN
    LOOP
        DELETE FROM events
        WHERE id IN (
            SELECT id FROM events
            WHERE created_at < NOW() - INTERVAL '1 year'
            LIMIT 10000
        );

        GET DIAGNOSTICS deleted_count = ROW_COUNT;
        EXIT WHEN deleted_count = 0;

        PERFORM pg_sleep(0.1);  -- небольшая пауза между батчами
    END LOOP;
END;
$$;
-- INSERT батчами из staging
INSERT INTO target_table
SELECT * FROM staging_table
WHERE id BETWEEN :batch_start AND :batch_end;

6. Аудитные поля: автоматическое заполнение

-- Таблица с аудитными полями
CREATE TABLE orders (
    id          BIGSERIAL PRIMARY KEY,
    user_id     INTEGER NOT NULL,
    amount      NUMERIC NOT NULL,
    -- Аудит:
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_by  TEXT NOT NULL DEFAULT current_user,
    updated_by  TEXT NOT NULL DEFAULT current_user,
    version     INTEGER NOT NULL DEFAULT 1
);

-- Триггер для автоматического обновления
CREATE OR REPLACE FUNCTION update_audit_fields()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at := NOW();
    NEW.updated_by := current_user;
    NEW.version := OLD.version + 1;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER audit_orders
BEFORE UPDATE ON orders
FOR EACH ROW EXECUTE FUNCTION update_audit_fields();

7. Перезагрузка партиции

Эффективная перезагрузка данных за период:

-- Загрузить данные за март в одну транзакцию
BEGIN;

-- Удалить старые данные за март
DELETE FROM sales_fact
WHERE sale_date BETWEEN '2026-03-01' AND '2026-03-31';

-- Вставить свежие данные
INSERT INTO sales_fact (sale_date, product_id, amount, units)
SELECT sale_date, product_id, amount, units
FROM staging_sales
WHERE sale_date BETWEEN '2026-03-01' AND '2026-03-31';

COMMIT;

8. Merge-паттерн через CTE

-- Атомарный upsert для нескольких операций
WITH
new_data AS (
    SELECT id, name, value, NOW() AS loaded_at
    FROM staging_table
),
updated AS (
    UPDATE target_table t
    SET name = n.name, value = n.value, loaded_at = n.loaded_at
    FROM new_data n WHERE n.id = t.id
    RETURNING t.id
),
inserted AS (
    INSERT INTO target_table (id, name, value, loaded_at)
    SELECT id, name, value, loaded_at FROM new_data
    WHERE id NOT IN (SELECT id FROM updated)
    RETURNING id
)
SELECT
    (SELECT COUNT(*) FROM updated)  AS updated_rows,
    (SELECT COUNT(*) FROM inserted) AS inserted_rows;

9. Проверка целостности после загрузки

-- Сравнить количество строк источника и цели
SELECT
    'source' AS location, COUNT(*) AS cnt FROM staging_sales WHERE month = '2026-03-01'
UNION ALL
SELECT 'target', COUNT(*) FROM sales_fact WHERE sale_date >= '2026-03-01' AND sale_date < '2026-04-01';

-- Сравнить суммы
SELECT
    COALESCE(
        (SELECT SUM(amount) FROM staging_sales WHERE month = '2026-03-01') =
        (SELECT SUM(amount) FROM sales_fact WHERE sale_date >= '2026-03-01' AND sale_date < '2026-04-01'),
        false
    ) AS amounts_match;

Итог: ключевые ETL-паттерны

ПаттернSQL-техника
UpsertINSERT ... ON CONFLICT DO UPDATE
Только новыеON CONFLICT DO NOTHING
ИнкрементальностьWatermark + WHERE created_at > last_loaded
История измененийSCD Type 2 (valid_from/valid_to + is_current)
ДедупликацияDISTINCT ON или ROW_NUMBER = 1
БатчиLIMIT N в цикле
Перезагрузка периодаDELETE + INSERT в транзакции
АудитТриггер на updated_at, updated_by, version

SQL ETL — мощный и эффективный подход для большинства задач преобразования данных. Для сложных оркестраций добавьте dbt или Airflow поверх SQL-трансформаций.

Похожие статьи

Попробуй на практике

Тренажёр с реальными задачами — бесплатно и без регистрации

Открыть тренажёр →