Dagster & dbt Cloud
Dagster allows you to run dbt Cloud jobs alongside other technologies. You can schedule them to run as a step in a larger pipeline and manage them as a data asset.
Our updated dbt Cloud integration offers two capabilities:
- Observability - You can view your dbt Cloud assets in the Dagster Asset Graph and drill into their run/materialization history.
- Orchestration - You can use Dagster to schedule runs/materializations of your dbt Cloud assets, either on a cron schedule, or based on upstream dependencies.
Installation
- uv
  uv add dagster-dbt
- pip
  pip install dagster-dbt
Observability example
To make use of the observability capability, you will need to add code to your Dagster project that does the following:
- Defines your dbt Cloud credentials and workspace.
- Uses the integration to create asset specs for models in the workspace.
- Builds a sensor which will poll dbt Cloud for updates on runs/materialization history and dbt Cloud Assets.
import os
import dagster as dg
from dagster_dbt.cloud_v2.resources import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    load_dbt_cloud_asset_specs,
)
from dagster_dbt.cloud_v2.sensor_builder import build_dbt_cloud_polling_sensor
# Define credentials
creds = DbtCloudCredentials(
    account_id=os.getenv("DBT_CLOUD_ACCOUNT_ID"),
    access_url=os.getenv("DBT_CLOUD_ACCESS_URL"),
    token=os.getenv("DBT_CLOUD_TOKEN"),
)
# Define the workspace
workspace = DbtCloudWorkspace(
    credentials=creds,
    project_id=os.getenv("DBT_CLOUD_PROJECT_ID"),
    environment_id=os.getenv("DBT_CLOUD_ENVIRONMENT_ID"),
)
# Use the integration to create asset specs for models in the workspace
dbt_cloud_asset_specs = load_dbt_cloud_asset_specs(workspace=workspace)
# Build a sensor which will poll dbt Cloud for updates on runs/materialization history
# and dbt Cloud Assets
dbt_cloud_polling_sensor = build_dbt_cloud_polling_sensor(workspace=workspace)
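The example above reads every setting with os.getenv, which returns None when a variable is unset, so a missing value may only surface later as a confusing error. As a minimal sketch, the helpers below fail early with a clear message. The names EnvVarError, require_env, and require_int_env are hypothetical and not part of dagster-dbt. Converting the account, project, and environment IDs to integers assumes those IDs are numeric, as they appear in dbt Cloud URLs.

```python
import os


class EnvVarError(RuntimeError):
    """Raised when a required environment variable is missing or malformed."""


def require_env(name: str) -> str:
    """Return the value of environment variable `name`; raise if unset or empty."""
    value = os.getenv(name)
    if not value:
        raise EnvVarError(f"Environment variable {name} is not set")
    return value


def require_int_env(name: str) -> int:
    """Return environment variable `name` parsed as an integer.

    Assumption: the variable holds a numeric dbt Cloud ID.
    """
    raw = require_env(name)
    try:
        return int(raw)
    except ValueError as exc:
        raise EnvVarError(
            f"Environment variable {name} must be an integer, got {raw!r}"
        ) from exc
```

With these helpers, a call such as os.getenv("DBT_CLOUD_ACCOUNT_ID") could be swapped for require_int_env("DBT_CLOUD_ACCOUNT_ID"), and os.getenv("DBT_CLOUD_TOKEN") for require_env("DBT_CLOUD_TOKEN"), so that a misconfigured deployment stops when the code location loads rather than when the sensor first polls.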
Orchestration example
To make use of the orchestration capability, you will need to add code to your Dagster project that does the following:
- Defines your dbt Cloud credentials and workspace.
- Builds your asset graph in a materializable way.
- Adds these assets to the Declarative Automation Sensor.
- Builds a sensor to poll dbt Cloud for updates on runs/materialization history and dbt Cloud Assets.
import os
import dagster as dg
from dagster_dbt.cloud_v2.asset_decorator import dbt_cloud_assets
from dagster_dbt.cloud_v2.resources import DbtCloudCredentials, DbtCloudWorkspace
from dagster_dbt.cloud_v2.sensor_builder import build_dbt_cloud_polling_sensor
# Define credentials
creds = DbtCloudCredentials(
    account_id=os.getenv("DBT_CLOUD_ACCOUNT_ID"),
    access_url=os.getenv("DBT_CLOUD_ACCESS_URL"),
    token=os.getenv("DBT_CLOUD_TOKEN"),
)
# Define the workspace
workspace = DbtCloudWorkspace(
    credentials=creds,
    project_id=os.getenv("DBT_CLOUD_PROJECT_ID"),
    environment_id=os.getenv("DBT_CLOUD_ENVIRONMENT_ID"),
)
# Builds your asset graph in a materializable way
@dbt_cloud_assets(workspace=workspace)
def my_dbt_cloud_assets(
    context: dg.AssetExecutionContext, dbt_cloud: DbtCloudWorkspace
):
    yield from dbt_cloud.cli(args=["build"], context=context).wait()
# Automates your assets using Declarative Automation
# https://docs.dagster.io/guides/automate/declarative-automation
my_dbt_cloud_assets = my_dbt_cloud_assets.map_asset_specs(
    lambda spec: spec.replace_attributes(
        automation_condition=dg.AutomationCondition.eager()
    )
)
# Adds these assets to the Declarative Automation Sensor
automation_sensor = dg.AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=dg.DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)
# Build a sensor which will poll dbt Cloud for updates on runs/materialization history
# and dbt Cloud Assets
dbt_cloud_polling_sensor = build_dbt_cloud_polling_sensor(workspace=workspace)
About dbt Cloud
dbt Cloud is a hosted service for running dbt jobs. It helps data analysts and engineers productionize dbt deployments. Beyond open-source dbt, dbt Cloud provides scheduling, CI/CD, documentation hosting, and monitoring and alerting.
If you're currently using dbt Cloud™, you can also use Dagster to run dbt-core in its place. The Dagster documentation for the dbt integration describes how to do that.