物化
简介¶
物化(Materializations)是在数据仓库中持久化Dbt模型的策略,dbt中内置了四种类型的materializations:
- 表(table)
- 视图(view)
- 增量(incremental)
- 临时表(ephemeral)
配置物化¶
默认情况下,dbt模型物化为视图。通过提供如下所示的物化配置参数,可以使用不同的物化来配置模型。
# dbt_project.yml配置示例:
# .
# └── models
# ├── csvs
# │ ├── employees.sql
# │ └── goals.sql
# └── events
# ├── stg_event_log.sql
# └── stg_event_sessions.sql
name: my_project
version: 1.0.0
config-version: 2
models:
my_project:
events:
# 物化所有模型为表格
+materialized: table
csvs:
# 这是多余的,不需要设置
+materialized: view
或者,可以直接在模型sql文件内部配置物化。如果您还为特定模型设置[性能优化]配置(例如,Redshift特定配置或BigQuery特定配置,这可能会很有用。
物化Materializations¶
视图(View)¶
当使用视图物化时,您的模型在每次运行时都会通过create view as语句重建为视图。
- 优点: 不存储额外的数据,源数据上的视图会始终有最新的记录。
- 缺点: 执行重大转换或堆叠在其他视图之上的视图查询速度较慢.
- 建议:
- 通常模型从视图开始,只有当您注意到性能问题时,才会更改为另一个物化方式.
- 视图最适合于不进行重大转换的模型,例如重命名、改写列。
表(Table)¶
当使用table物化时,每次运行时都会通过create table as语句将模型重建为表。
- 优点: 查询速度快
- 缺点:
- 重建表可能需要很长时间,尤其是对于复杂的转换
- 基础源数据中的新记录不会自动添加到表中
- 建议:
- 对BI工具查询的任何模型使用表,为最终用户提供更快的体验
- 对于许多下游模型使用的任何较慢的转换,也要使用表
增量(Incremental)¶
增量incremental模型允许dbt在表中插入或更新自上次运行dbt以来新的记录.
- 优点: 只需转换新的记录,可以显著缩短构建时间
- 缺点: 增量型号需要额外的配置,并且是dbt的高级用法。阅读更多关于使用增量模型的信息.
- 建议:
- 增量模型最适合事件类型的数据
- 当您的dbt运行变得太慢时,请使用增量模型(即不要从增量模型开始)
临时表(Ephemeral)¶
临时表模型不直接构建到数据库中。相反,dbt将把这个模型中的代码插入到依赖模型中,作为一个常见的表格表达式。
- 优点:
- 您仍然可以编写可重用逻辑
- 临时表模型可以通过减少混乱来帮助保持数据仓库的整洁(也可以考虑通过使用自定义模式将模型拆分为多个模式).
- 缺点:
- 你不能直接从这个模型中进行查询.
- 通过
dbt-run operation调用的宏不能ref()临时表节点 - 过度使用临时表也会使查询更难调试.
- 建议:
- DAG早期的非常轻量级的转换
- 仅用于一个或两个下流模型
- 不需要被直接查询
Python materializations¶
Python模型支持两种物化方式:
- 表table
- 增量incremental
增量Python模型与SQL模型支持所有相同的增量策略。支持的具体策略取决于您的适配器。
Python模型不能物化为视图或临时表。Python不支持非模型资源类型(如测试和快照)。
对于增量模型,如SQL模型,您需要将传入的表筛选为仅包含新行的数据:
import snowflake.snowpark.functions as F
def model(dbt, session):
dbt.config(materialized = "incremental")
df = dbt.ref("upstream_table")
if dbt.is_incremental:
# only new rows compared to max in current table
max_from_this = f"select max(updated_at) from {dbt.this}"
df = df.filter(df.updated_at >= session.sql(max_from_this).collect()[0][0])
# or only rows from the past 3 days
df = df.filter(df.updated_at >= F.dateadd("day", F.lit(-3), F.current_timestamp()))
...
return df
import pyspark.sql.functions as F
def model(dbt, session):
dbt.config(materialized = "incremental")
df = dbt.ref("upstream_table")
if dbt.is_incremental:
# 当前表中仅新的记录
max_from_this = f"select max(updated_at) from {dbt.this}"
df = df.filter(df.updated_at >= session.sql(max_from_this).collect()[0][0])
# or only rows from the past 3 days
df = df.filter(df.updated_at >= F.date_add(F.current_timestamp(), F.lit(-3)))
...
return df
注意: 对于merge增量策略,BigQuery/Dataproc支持增量模型。目前还不支持insert_overwrite策略。