概述¶
增量模型在数据仓库中构建为表。第一次运行模型时,通过转换源数据的 所有 行来构建表。在随后的运行中,dbt只转换源数据中告诉dbt 仅 要筛选的记录,并将它们插入到目标表中,目标表就是已经构建的表。
通常,在增量运行中筛选的记录将是源数据中自上次运行dbt以来创建或更新的记录。因此,在每次运行dbt时,您的模型都是以增量方式构建的。
使用增量模型会限制需要转换的数据量,从而大大减少转换的运行时间。这提高了仓库性能并降低了计算成本。
使用增量物化¶
与dbt中内置的其他物化一样,增量模型是用select语句定义的,物化在配置块中定义。
要使用增量模型,还需要告诉dbt:
- 如何在增量运行中筛选记录
- 模型的unique key(如果有)
在增量运行中筛选记录¶
要告诉dbt应该在增量运行中转换哪些记录,请在is_incremental()宏中包装筛选这些记录的有效SQL。
通常,您需要筛选自上次dbt运行此模型以来创建的新记录。查找此模型最近运行的时间戳的最佳方法是检查目标表中的最新时间戳。dbt通过使用[{{ this }}](this)变量可以轻松查询目标表。
同样常见的是想要捕获新的和更新的记录。对于更新的记录,您需要定义unique key ,以确保不会将修改后的记录作为重复记录引入。您的is_incremental()代码将检查自上次dbt运行此模型以来创建*或修改*的记录。
例如,一个包含列上计算缓慢转换的模型可以构建增量,如下所示:
#models/stg_events.sql
{{
config(
materialized='incremental'
)
}}
select
*,
my_slow_function(my_column)
from raw_app_data.events
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where event_time > (select max(event_time) from {{ this }})
{% endif %}
优化您的增量模型
对于使用公共表表达式(CTE)的更复杂的增量模型,应该考虑·is_incremental()`宏的位置对查询性能的影响。在一些数据仓库中,尽早过滤记录可以极大地提高查询的运行时间!
定义unique key (可选)¶
unique_key可以更新现有记录,而不仅仅是追加新记录。如果现有unique_key的新信息到达,则该新信息可以取代当前信息,而不是附加到表中。如果到达重复的记录,则可以忽略该记录。有关管理此更新行为的更多选项,如只选择要更新的特定列,请参阅策略特定配置
不指定unique_key将导致仅追加行为,这意味着dbt将模型的SQL返回的所有行插入到预先存在的目标表中,而不考虑这些行是否表示重复。
可选的unique_key参数指定定义模型粒度的字段(或字段组合)。也就是说,字段标识一个唯一的记录。您可以在模型顶部的配置块中定义unique_key,它可以是单个列名或列名列表。
在模型定义中,unique_key应作为表示单列的字符串或可一起使用的单引号列名列表提供,例如“['col1','col2',…])”。以这种方式使用的列不应包含任何null,否则增量模型运行可能会失败。请确保每一列都没有NULL(例如使用coalize(column_NAME,'VALUE_IF_NULL')),或者定义一列proxy-key例如使用dbt_utils.generate_surrogate_key)。
tip
如果需要组合多个列来唯一标识每一行,我们建议您将这些列作为列表(`unique_key=['user_id','session_number']`)而不是字符串表达式(`unique _key='concat(user_id,session_number)'`)传递。
通过使用更通用的第一种语法,dbt可以确保以适合数据库的方式将列模板化到增量模型物化中。
当您以这种方式传递列表时,请确保每一列都不包含任何null,否则增量模型运行可能会失败。
或者,您可以定义一列[代理键](/terms/proxy-key),例如使用[`dbt_utils.generate_surrogate_key`](https://github.com/dbt-labs/dbt-utils#generate_surrogate_key-source)。
当您定义unique_key时,您将看到dbt模型返回的每一行新数据的行为:
- 如果
新和旧模型数据中存在相同的unique_key,则dbt将用新数据更新/替换新行。更新/替换的确切机制将根据您的数据库、增量策略和策略特定配置而有所不同。 - 如果
旧数据中不存在unique_key,则dbt将把整行插入表中。
请注意,如果在现有的目标表或新的增量行中有一个包含多行的unique_key,则增量模型运行将失败。您的数据库和增量策略将确定您看到的特定错误,因此,如果您在运行增量模型时遇到问题,最好仔细检查唯一键在现有数据库表和新的增量行中是否真正唯一。您可以在此处了解有关替代key的更多信息。
INFO
虽然常见的增量策略,如`删除+插入`+`合并`,可能会使用`unique_key`,但其他策略则不会。例如,`insert_overwrite`策略不使用`unique_key`,因为它对数据分区而不是单个行进行操作。有关详细信息,请参阅[关于incremental_strategy](#About-incremental_strategy)。
unique_key 示例¶
考虑一个基于事件流计算每日活动用户(DAU)数量的模型。当源数据到达时,您将需要重新计算dbt上次运行的那一天以及此后任何一天的DAU数量。模型如下所示:
#models/staging/fct_daily_active_users.sql
{{
config(
materialized='incremental',
unique_key='date_day'
)
}}
select
date_trunc('day', event_at) as date_day,
count(distinct user_id) as daily_active_users
from raw_app_data.events
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where date_day >= (select max(date_day) from {{ this }})
{% endif %}
group by 1
在不使用unique_key参数的情况下以增量方式构建此模型将导致目标表中一天有多条记录,即当天dbt每次运行就会生成一条记录。相反,包含unique_key参数可确保更新现有记录。
如何重建增量模型?¶
如果增量模型逻辑发生了变化,那么新行数据的转换可能与存储在目标表中的历史转换不同。在这种情况下,您应该重新构建增量模型。
要强制dbt从头开始重建整个增量模型,请在命令行中使用--full-refresh标志。此标志将导致dbt在重新生成数据库中的现有目标表以获取历史数据之前先将其删除。
+所示。
有关详细的使用说明,请查看dbt run文档。
了解增量模型¶
什么时候应该使用增量模型?¶
通常需要在数据仓库中将模型构建为表,因为下游查询的性能更高。虽然表实体化也将模型创建为表,但它会在每次dbt运行时重新构建表。这些运行可能会出现问题,因为它们在以下情况下需要大量计算:
- 源数据表有数百万甚至数十亿行.
- 对源数据的转换在计算上是昂贵的(也就是说,执行需要很长时间),例如,复杂的Regex函数或UDF被用于转换数据.
与编程中的许多事情一样,增量模型是复杂性和性能之间的权衡。虽然它们不像视图和表物化那样简单,但它们可以显著提高dbt运行的性能。
理解is_incremental() 宏¶
如果满足以下 所有 条件,则is_incremental()宏将返回True:
- 数据库中已存在目标表
- dbt未在完全刷新模式下运行
- 运行模型配置为
materialized='incremental'
请注意,无论is_incremental()的计算结果是True还是False,模型中的SQL都需要有效。
增量模型是如何在后台工作的?¶
dbt的增量物化在不同的数据库上工作方式不同。在支持的情况下,merge语句用于插入新记录和更新现有记录。
对于不支持merge语句的仓库,通过首先使用delete语句删除目标表中要更新的记录,然后使用insert语句来实现合并。
事务管理用于确保将其作为单个工作单元执行。
如果我的增量模型的列发生了变化怎么办?¶
在v.021.0里增加 on_schema_change 配置
增量模型现在可以配置为包括可选的`on_schema_change`参数,以便在增量模型列更改时启用额外的控制。这些选项使dbt能够在模式更改的情况下继续运行增量模型,从而减少 `--full-refresh` 场景并节省查询成本。
你可以按照以下方式配置on_schema_change.
#models/staging/fct_daily_active_users.sql
{{
config(
materialized='incremental',
unique_key='date_day',
on_schema_change='fail'
)
}}
on_schema_change的可能值有:
ignore: 默认行为(见下文).fail: 当源模式和目标模式出现分歧时触发错误消息append_new_columns: 将新列追加到现有表中。请注意,此设置不会从现有表中删除新数据中不存在的列.sync_all_columns: 向现有表中添加任何新列,并删除现在丢失的所有列。请注意,这包括数据类型更改。在BigQuery上,更改列类型需要进行完整的表扫描;在实施时要注意权衡.
注意: on_schema_change行为都不会为新添加的列回填旧记录中的值。如果需要填充这些值,我们建议运行手动更新,或触发--full-refresh。
on_schema_change 跟踪顶级更改
目前,`on_schema_change`只跟踪顶级列的更改。它不跟踪嵌套列的更改。例如,在BigQuery上,添加、删除或修改嵌套列不会触发模式更改,即使`on_schema_change`设置适当。
默认行为¶
这是默认设置的on_schema_chance:ignore以及旧版本的dbt的行为。
如果在增量模型中添加一列,并执行dbt run,则该列将不会出现在目标表中。
类似地,如果从增量模型中删除一列,并执行dbt run,则该列将不会从目标表中删除。
相反,无论何时增量的逻辑发生变化,都要对增量模型和任何下游模型执行完全刷新。
关于incremental_strategy¶
在某些适配器上,可选的incremental_strategy配置控制dbt使用的代码以建立增量模型。不同的方法可能因有效性而异,这取决于数据量,您的unique_key的可靠性,或某些功能的可用性
- Snowflake:
merge(default),delete+insert(optional),append(optional) - BigQuery:
merge(default),insert_overwrite(optional) - Spark:
append(default),insert_overwrite(optional),merge(optional, Delta-only)
配置增量策略¶
incremental_strategy配置可以在特定模型中指定,也可以是dbt_project.yml文件中的所有模型:
或者:
#models/my_model.sql
{{
config(
materialized='incremental',
unique_key='date_day',
incremental_strategy='delete+insert',
...
)
}}
select ...
Strategy-specific配置¶
如果使用merge策略并指定了unique_key,默认情况下,dbt将使用新值完全覆盖匹配的行。
在支持merge合并策略的适配器(包括Snowflake、BigQuery、Apache Spark和Databricks)上,您可以选择将列名列表传递给merge_update_columns配置。在这种情况下,dbt将更新配置指定的列,并保留其他列以前的值。
#models/my_model.sql
{{
config(
materialized = 'incremental',
unique_key = 'id',
merge_update_columns = ['email', 'ip_address'],
...
)
}}
select ...
关于 incremental_predicates¶
incremental_predicates是增量模型的高级使用,其中数据量足够大,可以证明在性能方面进行额外投资是合理的。此配置接受任何有效SQL表达式的列表。dbt不检查SQL语句的语法。
这是我们可能会在Snowflake上看到的yml文件中的模型配置示例:
models:
- name: my_incremental_model
config:
materialized: incremental
unique_key: id
# this will affect how the data is stored on disk, and indexed to limit scans
cluster_by: ['session_start']
incremental_strategy: merge
# this limits the scan of the existing table to the last 7 days of data
incremental_predicates: ["DBT_INTERNAL_DEST.session_start > datediff(day, -7, current_date)"]
# `incremental_predicates` accepts a list of SQL statements.
# `DBT_INTERNAL_DEST` and `DBT_INTERNAL_SOURCE` are the standard aliases for the target table and temporary table, respectively, during an incremental run using the merge strategy.
或者,以下是在模型文件中配置的相同配置:
-- in models/my_incremental_model.sql
{{
config(
materialized = 'incremental',
unique_key = 'id',
cluster_by = ['session_start'],
incremental_strategy = 'merge',
incremental_predicates = [
"DBT_INTERNAL_DEST.session_start > datediff(day, -7, current_date)"
]
)
}}
...
这将模板化(在dbt.log文件中)一个merge语句,如:
merge into <existing_table> DBT_INTERNAL_DEST
from <temp_table_with_new_records> DBT_INTERNAL_SOURCE
on
-- unique key
DBT_INTERNAL_DEST.id = DBT_INTERNAL_SOURCE.id
and
-- custom predicate: limits data scan in the "old" data / existing table
DBT_INTERNAL_DEST.session_start > datediff(day, -7, current_date)
when matched then update ...
when not matched then insert ...
将 上游 表的数据扫描限制在其增量模型SQL的主体内,这将限制处理/转换的“新”数据量。
with large_source_table as (
select * from {{ ref('large_source_table') }}
{% if is_incremental() %}
where session_start > dateadd(day, -3, current_date)
{% endif %}
),
...
Info
语法取决于您如何配置`incremental_strategy`:
- 如果使用`merge`策略,则可能需要使用`DBT_INTERNAL_DEST`(“旧”数据)或`DBT_TERNAL_SOURCE`(“新”数据)显式别名任何列.
- `insert_overwrite`增量策略在概念上有相当多的重叠.