Инкрементальные модели в dbt позволяют обрабатывать только новые или обновленные строки в исодной таблицы, что значительно улучшает производительность, так как не нужно пересоздавать всю таблицу.

Этот подход не только ускоряет трансформации, но и снижает вычислительные затраты. Однако настройка этих моделей может быть сложной для новичков.

Что такое инкрементальные модели?

В отличие от традиционного метода материализации таблиц, который пересоздает всю таблицу при каждом запуске задания в dbt, инкрементальные модели обрабатывают только новые или обновленные строки в вашем наборе данных.

Этот метод минимизирует объем данных, который нужно обработать, сокращая время выполнения и сижает стоимость выполнения запросов.

Как это работает?

Инкрементные модели фильтруют строки, которые были добавлены или обновлены на основе определенного столбца. Обычно это временная метка или уникальный ключ (например, номер заказа в intiger формате). В данном контексте я буду называть этот ключ контрольной точкой.

Задание dbt сравнивает значение контрольной точки в этом столбце с максимальным значением, уже имеющимся в целевой таблице.

Например, предположим, что вы работаете с таблицей поведенческих событий веб-сайта. В этом случае инкрементная модель будет обрабатывать только те новые события, время наступления которых больше, чем самая последняя дата в существующей модели данных.

Настройка инкрементной модели в dbt

Первым шагом при работе с инкрементными моделями в dbt является настройка конфигурации модели.

Укажите тип материализации

В начале модели необходимо указать тип материализации как инкрементный. Это делается с помощью блока config():

{{ config(
    materialized='incremental'
) }}

Это указывает dbt, что модель должна обновляться инкрементально, то есть во время каждого запуска она будет добавлять только новые или обновленные строки, а не перезаписывать всю таблицу целиком.

Создание модели dbt с настройкой инкрементного обновления

Далее следует основной SQL-запрос, который будет использоваться для построения модели. Вот пример запроса из реального проекта:

{{ config(
    materialized='incremental'
) }}

SELECT *, CURRENT_TIMESTAMP() AS batch_timestamp  -- capture the time of download

FROM {{ source('tracking_v2', 'renta_events') }}
WHERE date >= '2025-03-01'  -- base filter on date

{% if is_incremental() %}
  AND dateTime > (SELECT MAX(dateTime) FROM {{ this }})  -- incremental condition
{% endif %}

Как работает is_incremental()

Блок is_incremental() - это встроенная djinji функция dbt. Она возвращает true, если:

  1. Таблица уже существует в базе данных.

  2. Модель не запущена с флагом --full-refresh.

  3. Тип материализации установлен на инкрементный.

Если все эти условия выполнены, dbt применяет к запросу дополнительное условие:

AND dateTime > (SELECT MAX(dateTime) FROM {{ this }})

Это условие сравнивает столбец dateTime в исходной таблице с максимальным значением dateTime в таблице назначения (то есть, в вашей модели dbt).

Запуск модели dbt

Запустите модель dbt с помощью команды dbt run:

dbt run --select my_incremental_model

Сначала модель проверит, существует ли целевая таблица и работает ли dbt в режиме полного обновления (c флагом --full-refresh).

Если таблица не существует, dbt будет рассматривать это как полное создание таблицы. Если таблица существует, dbt просто добавит новые данные.

Что произойдет при первом запуске?

Создается новая таблица. Данные полностью мигрируются из исходной таблицы во вновь созданную модель/таблицу.

При последующих запусках dbt будет добавлять только новые строки. Отслеживать новые записи удобнее всего с помощью добавленного поля batch_timestamp, которое помогает отслеживать данные, обрабатываемые в каждом запуске. batch_timestamp - это уникальная временная метка для каждого вставленного пакета. Это позволяет отслеживать, какие записи были добавлены во время каждого запуска задания dbt, что полезно для отладки модели.

Перестроить модель с нуля

Иногда вам может понадобиться полностью обновить таблицу, например, добавить новые столбцы (хотя для этого случая есть более подходящая операция).

Чтобы выполнить полное обновление, выполните следующую команду:

dbt run --full-refresh --select my_incremental_model

Что делает эта команда:

  1. Удаляет текущую версию таблицы.

  2. Создает таблицу заново.

  3. Загружает все доступные данные из исходной таблицы (без инкрементного фильтра).

Эта команда уничтожит таблицу, удалит все существующие данные, создаст новую таблицу с новой схемой и загрузит данные с нуля.

Обработка изменений схемы с помощью инкрементных стратегий

Если схема изменяется, dbt предоставляет несколько вариантов того, как она должна обрабатывать это с помощью конфигурации on_schema_change:

Фича Описание фичи
ignore Поведение по умолчанию. Изменения схемы игнорируются.
append_new_columns Добавляет новые столбцы в таблицу, но не обновляет в них значения для существующих строк.
fail Остановите выполнение, если схема не синхронизирована с моделью.
sync_all_columns Добавляет новые и удаляет устаревшие столбцы для поддержания синхронизации.

Это поведение зависит от используемого хранилища данных (подробности см. в документации по dbt).

Ниже приведен пример установки конфигурации on_schema_change для вашей модели dbt:

{{ config(
    materialized='incremental',
    unique_key='eventId',
    incremental_strategy='merge',
    on_schema_change='append_new_columns'
) }}

Что здесь происходит?

Давайте разберем конфигурацию более подробно:

  1. materialized='incremental': Это означает, что модель будет обновляться инкрементально, добавляя только новые строки.

  2. unique_key='eventId': dbt будет использовать eventId в качестве уникального ключа таблицы. С помощью этого ключа dbt будет обнаруживать и объединять дублирующиеся строки.

  3. incremental_strategy='merge': Стратегия слияния обеспечивает добавление новых строк и обновление существующих.

  4. on_schema_change='append_new_columns': Если в исходной таблице появятся новые столбцы (например, user_agent или payment_method), они будут автоматически добавлены в модель, не вызывая ошибок.

Следует помнить, что новые столбцы, добавленные с помощью on_schema_change, не будут обновлять исторические строки, только вновь добавлены.

Если вам нужно обновить эти строки, вы должны запустить обновление модели с флагом --full-refresh.

Заключение

Надеюсь, этот материал хорошей отправной точкой для изучения инкрементных обновлений в dbt. Я также рекомендую ознакомиться с приведенными ниже ресурсами, в которых рассматриваются различные стратегии настройки инкрементных обновлений:

  1. How to Build Incremental Models от Kahan Data Solutions (я считаю, что это лучшее руководство, которое вы можете найти на YouTube).

  2. Примеры конфигураций различных инкрементные моделей dbt от команды IOMETE.

Если у вас есть вопросы, не стесняйтесь обращаться ко мне на LinkedIn.

Если вы используете dbt, рассмотрите возможность интеграции Renta ETL с dbt Cloud, что улучшает контроль над вводом и управлением конвейерами данных.

Частые вопросы