Components ETL pipeline tutorial
dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.
Setup
1. Install duckdb and tree
First, install duckdb for a local database and tree to visualize project structure:
- Mac
- Windows
- Linux
tree is optional and is only used to produce a nicely formatted representation of the project structure on the command line. You can also use find, ls, dir, or any other directory listing command.
2. Install dg
Next, follow the dg installation steps to install the dg command line tool. dg allows you to quickly create a components-ready Dagster project.
3. Create a new Dagster project
After installing dependencies, create a components-ready Dagster project. The steps for creating a project will depend on your package manager/environment management strategy.
- uv
- pip
First, run the command below, and respond yes to the prompt to run uv sync after scaffolding:
dg init jaffle-platform
Next, enter the directory and activate the virtual environment:
cd jaffle-platform && source .venv/bin/activate
Running uv sync after creating a Dagster project creates a virtual environment and installs the dependencies listed in pyproject.toml, along with jaffle-platform itself as an editable install.
Because pip does not support global installations, you will need to install dg inside your Dagster project virtual environment. To do so, run the commands below to create and enter a Dagster project directory, initialize and activate a virtual environment, and install the dagster-dg package into it:
mkdir jaffle-platform && cd jaffle-platform
python -m venv .venv
source .venv/bin/activate
pip install dagster-dg
Next, run dg init . to create a new Dagster project in the current directory:
dg init .
Finally, install the newly created project package into the virtual environment as an editable install:
pip install -e .
To learn more about the files, directories, and default settings in a project created with dg init, see "Creating a project with components".
Ingest data
1. Add the Sling component type to your environment
To ingest data, you will need to set up Sling. To make the Sling component available in your environment, install the dagster-sling package:
- uv
uv add dagster-sling
- pip
pip install dagster-sling
2. Confirm availability of the Sling component type
To confirm that the dagster_sling.SlingReplicationCollectionComponent component type is now available, run the dg list plugins command:
dg list plugins
Using /.../jaffle-platform/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Plugin ┃ Objects ┃
┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━┩ │
│ │ │ dagster.asset │ Create a │ [scaffold-targ… │ │
│ │ │ │ definition for │ │ │
│ │ │ │ how to compute │ │ │
│ │ │ │ an asset. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.asset_check │ Create a │ [scaffold-targ… │ │
│ │ │ │ definition for │ │ │
│ │ │ │ how to execute │ │ │
│ │ │ │ an asset check. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.components.DefinitionsComponent │ An arbitrary set │ [component, │ │
│ │ │ │ of dagster │ scaffold-targe… │ │
│ │ │ │ definitions. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.components.DefsFolderComponent │ A folder which │ [component, │ │
│ │ │ │ may contain │ scaffold-targe… │ │
│ │ │ │ multiple │ │ │
│ │ │ │ submodules, each │ │ │
│ │ │ │ which define │ │ │
│ │ │ │ components. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that wrap │ [component, │ │
│ │ │ │ Python scripts │ scaffold-targe… │ │
│ │ │ │ executed with │ │ │
│ │ │ │ Dagster's │ │ │
│ │ │ │ PipesSubprocess… │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.job │ Creates a job │ [scaffold-targ… │ │
│ │ │ │ with the │ │ │
│ │ │ │ specified │ │ │
│ │ │ │ parameters from │ │ │
│ │ │ │ the decorated │ │ │
│ │ │ │ graph/op │ │ │
│ │ │ │ invocation │ │ │
│ │ │ │ function. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.multi_asset │ Create a │ [scaffold-targ… │ │
│ │ │ │ combined │ │ │
│ │ │ │ definition of │ │ │
│ │ │ │ multiple assets │ │ │
│ │ │ │ that are │ │ │
│ │ │ │ computed using │ │ │
│ │ │ │ the same op and │ │ │
│ │ │ │ same │ │ │
│ │ │ │ upstream assets. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.schedule │ Creates a │ [scaffold-targ… │ │
│ │ │ │ schedule │ │ │
│ │ │ │ following the │ │ │
│ │ │ │ provided cron │ │ │
│ │ │ │ schedule and │ │ │
│ │ │ │ requests runs │ │ │
│ │ │ │ for the provided │ │ │
│ │ │ │ job. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.sensor │ Creates a sensor │ [scaffold-targ… │ │
│ │ │ │ where the │ │ │
│ │ │ │ decorated │ │ │
│ │ │ │ function is used │ │ │
│ │ │ │ as the sensor's │ │ │
│ │ │ │ evaluation │ │ │
│ │ │ │ function. │ │ │
│ │ └─────────────────────────────────────────────────────────────┴──────────────────┴─────────────────┘ │
│ dagster_sling │ ┏━━━━━━━━━━ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ dagster_sling.SlingReplicationCollectionComponent │ Expose one or more │ [component, │ │
│ │ │ │ Sling replications │ scaffold-target] │ │
│ │ │ │ to Dagster as │ │ │
│ │ │ │ assets. │ │ │
│ │ └───────────────────────────────────────────────────┴──────────────────────┴───────────────────────┘ │
└───────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
3. Create a new instance of the Sling component
Next, create a new instance of the Sling component type:
dg scaffold 'dagster_sling.SlingReplicationCollectionComponent' ingest_files
This adds a Sling component instance called ingest_files to the src/jaffle_platform/defs directory of your project:
tree src/jaffle_platform
src/jaffle_platform
├── __init__.py
├── definitions.py
├── defs
│   ├── __init__.py
│   └── ingest_files
│       ├── component.yaml
│       └── replication.yaml
└── lib
    └── __init__.py

4 directories, 6 files
The ingest_files directory contains a component.yaml file, alongside the replication.yaml file listed in the tree output. Every Dagster component has a component.yaml file that specifies the component type and any parameters used to scaffold definitions from the component at runtime:
type: dagster_sling.SlingReplicationCollectionComponent
attributes:
  replications:
    - path: replication.yaml
Currently, the parameters in your Sling component's component.yaml define a single replication, which is a Sling term that specifies how data should be replicated from a source to a target. The replication details are specified in a replication.yaml file that is read by Sling. You will update this file shortly.
The path parameter for a replication is relative to the directory that contains component.yaml. This is a convention for components.
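The relative-path convention can be illustrated with a minimal sketch that uses only the Python standard library. The helper name resolve_replication_path is hypothetical and is not part of Dagster or dagster-sling; it only shows how a path in component.yaml maps onto the project layout.

```python
from pathlib import Path


def resolve_replication_path(component_yaml: Path, relative_path: str) -> Path:
    """Resolve a replication path against the directory holding component.yaml.

    Hypothetical helper for illustration only; it is not a Dagster API.
    """
    return component_yaml.parent / relative_path


# The component.yaml location comes from the scaffolded project tree.
component_yaml = Path("src/jaffle_platform/defs/ingest_files/component.yaml")
resolved = resolve_replication_path(component_yaml, "replication.yaml")
print(resolved.as_posix())
```

Under this convention, moving the ingest_files directory elsewhere in the project would not require changing the path value, because it is never resolved against the working directory.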
4. Download files for Sling source
Next, download the files that your Sling source will read. They need to be available locally, since Sling doesn't support reading from the public internet:
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_customers.csv &&
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_orders.csv &&
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_payments.csv
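If curl is not available, the same three downloads can be sketched in Python with the standard library. This is an alternative under stated assumptions, not part of the tutorial's own tooling: the function names seed_urls and download_seeds are hypothetical, and the destination directory is left to the caller.

```python
from pathlib import Path
from urllib.request import urlretrieve

# Base URL and file names are taken from the curl commands above.
BASE_URL = (
    "https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic"
    "/refs/heads/main/seeds"
)
FILES = ["raw_customers.csv", "raw_orders.csv", "raw_payments.csv"]


def seed_urls() -> dict[str, str]:
    """Map each seed file name to its download URL."""
    return {name: f"{BASE_URL}/{name}" for name in FILES}


def download_seeds(dest_dir: Path) -> list[Path]:
    """Download every seed file into dest_dir and return the saved paths."""
    dest_dir.mkdir(parents=True, exist_ok=True)
    saved = []
    for name, url in seed_urls().items():
        target = dest_dir / name
        urlretrieve(url, target)  # performs the network request
        saved.append(target)
    return saved
```

Calling download_seeds(Path(".")) from the directory where you would otherwise run the curl commands writes the same three files.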
5. Set up the Sling to DuckDB replication
Once you have downloaded your Sling source files, update the replication.yaml file to reference them:
source: LOCAL
target: DUCKDB
defaults:
  mode: full-refresh
  object: "{stream_table}"
streams:
  file://raw_customers.csv:
    object: "main.raw_customers"
  file://raw_orders.csv:
    object: "main.raw_orders"
  file://raw_payments.csv:
    object: "main.raw_payments"
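To make the relationship between defaults and streams concrete, here is a minimal sketch that mirrors the replication.yaml content as a Python dictionary and computes the settings each stream ends up with. It assumes that stream-level keys override the values under defaults; the function name effective_stream_config is hypothetical, and the merge is an illustration rather than Sling's actual implementation.

```python
# Dictionary form of the replication.yaml shown above.
replication = {
    "source": "LOCAL",
    "target": "DUCKDB",
    "defaults": {"mode": "full-refresh", "object": "{stream_table}"},
    "streams": {
        "file://raw_customers.csv": {"object": "main.raw_customers"},
        "file://raw_orders.csv": {"object": "main.raw_orders"},
        "file://raw_payments.csv": {"object": "main.raw_payments"},
    },
}


def effective_stream_config(replication: dict) -> dict:
    """Merge defaults into each stream, letting stream-level keys win.

    Assumption for illustration: stream settings override defaults.
    """
    defaults = replication.get("defaults", {})
    return {
        stream: {**defaults, **(overrides or {})}
        for stream, overrides in replication["streams"].items()
    }


effective = effective_stream_config(replication)
for stream, config in effective.items():
    print(f"{stream} -> {config['object']} ({config['mode']})")
```

In this configuration every stream sets its own object, so the "{stream_table}" default is never used, while mode: full-refresh is inherited by all three streams.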
Next, modify the component.yaml file to tell the Sling component where replicated data with the DUCKDB target should be written:
type: dagster_sling.SlingReplicationCollectionComponent
attributes:
  sling:
    connections:
      - name: DUCKDB
        type: duckdb
        instance: /tmp/jaffle_platform.duckdb
  replications:
    - path: replication.yaml
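The target name in replication.yaml and the connection name in component.yaml have to line up for the data to land in the intended database. The sketch below mirrors component.yaml as a Python dictionary and looks up the connection for a given target. The helper find_connection is hypothetical and only illustrates the name matching; it is not how dagster-sling resolves connections internally.

```python
from typing import Optional

# Dictionary form of the component.yaml shown above.
component = {
    "type": "dagster_sling.SlingReplicationCollectionComponent",
    "attributes": {
        "sling": {
            "connections": [
                {
                    "name": "DUCKDB",
                    "type": "duckdb",
                    "instance": "/tmp/jaffle_platform.duckdb",
                }
            ]
        },
        "replications": [{"path": "replication.yaml"}],
    },
}


def find_connection(component: dict, name: str) -> Optional[dict]:
    """Return the connection whose name matches, or None if there is none."""
    for connection in component["attributes"]["sling"]["connections"]:
        if connection["name"] == name:
            return connection
    return None


# "DUCKDB" is the target declared in replication.yaml.
connection = find_connection(component, "DUCKDB")
print(connection["instance"] if connection else "no matching connection")
```

A lookup that returns None, for example after renaming the connection in one file but not the other, points to a mismatch worth fixing before materializing assets.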
6. View and materialize assets in the Dagster UI
To see what you've built so far, you can load your project in the Dagster UI:
dg dev
To materialize assets and load tables in the DuckDB instance, click Materialize All.
7. Verify the DuckDB tables
To verify the DuckDB tables were correctly populated, run the following command:
duckdb /tmp/jaffle_platform.duckdb -c "SELECT * FROM raw_customers LIMIT 5;"
┌───────┬────────────┬───────────┬──────────────────┐
│ id │ first_name │ last_name │ _sling_loaded_at │
│ int32 │ varchar │ varchar │ int32 │
├───────┼────────────┼───────────┼──────────────────┤