Skip to content

物化

简介

物化(Materializations)是在数据仓库中持久化Dbt模型的策略,dbt中内置了四种类型的materializations:

  • 表(table)
  • 视图(view)
  • 增量(incremental)
  • 临时表(ephemeral)

配置物化

默认情况下,dbt模型物化为视图。通过提供如下所示的物化配置参数,可以使用不同的物化来配置模型。

# dbt_project.yml配置示例:
# .
# └── models
#     ├── csvs
#     │   ├── employees.sql
#     │   └── goals.sql
#     └── events
#         ├── stg_event_log.sql
#         └── stg_event_sessions.sql

name: my_project
version: 1.0.0
config-version: 2

models:
  my_project:
    events:
      # 物化所有模型为表格
      +materialized: table
    csvs:
      # 这是多余的,不需要设置
      +materialized: view

或者,可以直接在模型sql文件内部配置物化。如果您还为特定模型设置[性能优化]配置(例如,Redshift特定配置BigQuery特定配置,这可能会很有用。

{{ config(materialized='table', sort='timestamp', dist='user_id') }}

select *
from ...

物化Materializations

视图(View)

当使用视图物化时,您的模型在每次运行时都会通过create view as语句重建为视图。

  • 优点: 不存储额外的数据,源数据上的视图会始终有最新的记录。
  • 缺点: 执行重大转换或堆叠在其他视图之上的视图查询速度较慢.
  • 建议:
    • 通常模型从视图开始,只有当您注意到性能问题时,才会更改为另一个物化方式.
    • 视图最适合于不进行重大转换的模型,例如重命名、改写列。

表(Table)

当使用table物化时,每次运行时都会通过create table as语句将模型重建为表。

  • 优点: 查询速度快
  • 缺点:
    • 重建表可能需要很长时间,尤其是对于复杂的转换
    • 基础源数据中的新记录不会自动添加到表中
  • 建议:
    • 对BI工具查询的任何模型使用表,为最终用户提供更快的体验
    • 对于许多下游模型使用的任何较慢的转换,也要使用表

增量(Incremental)

增量incremental模型允许dbt在表中插入或更新自上次运行dbt以来新的记录.

  • 优点: 只需转换新的记录,可以显著缩短构建时间
  • 缺点: 增量型号需要额外的配置,并且是dbt的高级用法。阅读更多关于使用增量模型的信息.
  • 建议:
    • 增量模型最适合事件类型的数据
    • 当您的dbt运行变得太慢时,请使用增量模型(即不要从增量模型开始)

临时表(Ephemeral)

临时表模型不直接构建到数据库中。相反,dbt将把这个模型中的代码插入到依赖模型中,作为一个常见的表格表达式。

  • 优点:
    • 您仍然可以编写可重用逻辑
    • 临时表模型可以通过减少混乱来帮助保持数据仓库的整洁(也可以考虑通过使用自定义模式将模型拆分为多个模式).
  • 缺点:
    • 你不能直接从这个模型中进行查询.
    • 通过dbt-run operation调用的宏不能ref()临时表节点
    • 过度使用临时表也会使查询更难调试.
  • 建议:
    • DAG早期的非常轻量级的转换
    • 仅用于一个或两个下流模型
    • 不需要被直接查询

Python materializations

Python模型支持两种物化方式: - 表table - 增量incremental

增量Python模型与SQL模型支持所有相同的增量策略。支持的具体策略取决于您的适配器。

Python模型不能物化为视图临时表。Python不支持非模型资源类型(如测试和快照)。

对于增量模型,如SQL模型,您需要将传入的表筛选为仅包含新行的数据:

import snowflake.snowpark.functions as F

def model(dbt, session):
    dbt.config(materialized = "incremental")
    df = dbt.ref("upstream_table")

    if dbt.is_incremental:

        # only new rows compared to max in current table
        max_from_this = f"select max(updated_at) from {dbt.this}"
        df = df.filter(df.updated_at >= session.sql(max_from_this).collect()[0][0])

        # or only rows from the past 3 days
        df = df.filter(df.updated_at >= F.dateadd("day", F.lit(-3), F.current_timestamp()))

    ...

    return df
import pyspark.sql.functions as F

def model(dbt, session):
    dbt.config(materialized = "incremental")
    df = dbt.ref("upstream_table")

    if dbt.is_incremental:

        # 当前表中仅新的记录
        max_from_this = f"select max(updated_at) from {dbt.this}"
        df = df.filter(df.updated_at >= session.sql(max_from_this).collect()[0][0])

        # or only rows from the past 3 days
        df = df.filter(df.updated_at >= F.date_add(F.current_timestamp(), F.lit(-3)))

    ...

    return df

注意: 对于merge增量策略,BigQuery/Dataproc支持增量模型。目前还不支持insert_overwrite策略。