来源
使用来源¶
来源使您可以命名和描述通过提取和加载工具加载到仓库中的数据。通过在dbt中将这些表声明为源,您可以
- 在模型中使用{{ source() }}函数来选择源数据表,有助于定义数据血缘
- 测试您对源数据的假设
- 计算源数据的新鲜度
声明来源¶
源是在.yml文件中嵌套在sources:键下定义的。
version: 2
sources:
- name: jaffle_shop
[database](/reference/resource-properties/database): raw
[schema](/reference/resource-properties/schema)*: jaffle_shop
tables:
- name: orders
- name: customers
- name: stripe
tables:
- name: payments
*默认情况下,schema将与name相同。仅当您希望使用与现有架构不同的源名称时,才添加schema
如果您还不熟悉这些文件,请务必先查看关于schema.yml文件。
选择一个数据源¶
一旦定义了源,就可以使用{{source()}}函数从模型中引用它。
select
...
from {{ source('jaffle_shop', 'orders') }}
left join {{ source('jaffle_shop', 'customers') }} using (customer_id)
dbt会编译成完整的表名:
使用 {{ source () }} 函数还会在模型和源表之间创建一个依赖关系。

测试和记录来源¶
您还可以: - 将测试添加到源 - 将描述添加到源中,这些源将作为文档网站的一部分呈现
如果您已经在模型中添加了测试和描述(如果没有,请查看关于测试和文档的指南),那么这些应该是熟悉的概念。
version: 2
sources:
- name: jaffle_shop
description: 这是我们的应用程序使用的Postgres数据库的副本
tables:
- name: orders
description: >
每个订单一条记录。包括已取消和已删除的订单.
columns:
- name: id
description: 订单表的Primary key
tests:
- unique
- not_null
- name: status
description: 请注意,状态可能会随着时间的推移而变化
- name: ...
- name: ...
您可以在参考部分中找到有关源可用属性的更多详细信息.
捕捉源数据的新鲜度¶
通过几个额外的配置,dbt可以选择性地快照源表中数据的新鲜度。这有助于了解数据管道是否处于健康状态,并且是为仓库定义SLAs的关键组成部分。
声明数据源新鲜度¶
要将数据源配置为快照新鲜度信息,请在源中添加freshness,并在表声明中添加loaded_at_field:
version: 2
sources:
- name: jaffle_shop
database: raw
freshness: # 默认新鲜度
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
loaded_at_field: _etl_loaded_at
tables:
- name: orders
freshness: # 再严格一点
warn_after: {count: 6, period: hour}
error_after: {count: 12, period: hour}
- name: customers # 这将使用上面定义的新鲜度
- name: product_skus
freshness: null # 不检查这张表的新鲜度
在freshness块中,可以提供warn_after和error_after中的一个或两个。如果两者都不提供,那么dbt将不会为该源中的表计算新鲜度快照。
此外,需要loaded_at_field来计算表的新鲜度。如果没有提供loaded_at_field,那么dbt将不会计算表的新鲜度。
这些配置是分层应用的,因此为源指定的freshness和loaded_at字段值将流向该源中定义的所有表。当源中的所有表都具有相同的loaded_at_field时,这很有用,因为配置只能在顶级源定义中指定一次。
检查源新鲜度¶
要快照源的新鲜度信息,请使用dbt source freshness命令(参考文档):
在后台,dbt使用新鲜度属性构造一个select查询,如下所示。您可以在日志中找到此查询。
select
max(_etl_loaded_at) as max_loaded_at,
convert_timezone('UTC', current_timestamp()) as snapshotted_at
from raw.jaffle_shop.orders
此查询的结果用于确定源是否新鲜:
