Build pipelines with Azure Machine Learning
This article covers how to use Dagster Pipes to submit jobs to Azure Machine Learning.
The dagster-azure integration library provides the PipesAzureMLClient resource, which can be used to launch Azure ML jobs from Dagster assets and ops. Dagster can receive events such as logs, asset checks, or asset materializations from jobs launched with this client. The client requires minimal code changes to your Azure ML jobs.
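Events reach Dagster from the job as small structured messages. The sketch below is illustrative only. It assumes a message shape with a protocol-version field, a `method` name, and a `params` payload, loosely modeled on Dagster Pipes messages. The helper names (`make_message`, `encode_messages`, `decode_messages`) are hypothetical. This is not the `dagster-pipes` implementation.

```python
import json
from typing import Any, Dict, List, Optional

# Assumed field name and version string, for illustration only.
PROTOCOL_VERSION_FIELD = "__dagster_pipes_version"
PROTOCOL_VERSION = "0.1"


def make_message(method: str, params: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Build one event message (hypothetical helper)."""
    return {PROTOCOL_VERSION_FIELD: PROTOCOL_VERSION, "method": method, "params": params}


def encode_messages(messages: List[Dict[str, Any]]) -> str:
    """Serialize messages as newline-delimited JSON, one message per line."""
    return "\n".join(json.dumps(m) for m in messages)


def decode_messages(payload: str) -> List[Dict[str, Any]]:
    """Parse newline-delimited JSON back into message dicts, skipping blank lines."""
    return [json.loads(line) for line in payload.splitlines() if line.strip()]


if __name__ == "__main__":
    outgoing = [
        make_message("log", {"message": "Running Azure ML job", "level": "INFO"}),
        make_message(
            "report_asset_materialization",
            {"metadata": {"accuracy": {"raw_value": 0.83, "type": "float"}}},
        ),
    ]
    # The orchestrator side would dispatch on each message's "method" field.
    for msg in decode_messages(encode_messages(outgoing)):
        print(msg["method"], msg["params"])
```

Newline-delimited JSON is used here because each event can then be appended and parsed independently.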
Prerequisites
To run the examples, you'll need to:
- Create a new Dagster project:
  uvx create-dagster@latest project <project-name>
- Install the necessary Python libraries with either uv or pip:
  - uv: uv add dagster-azure
  - pip: pip install dagster-azure
- In Azure, you'll need:
  - An existing Azure ML workspace
  - An Azure Blob Storage container to be used by Dagster. The recommended way to work with dagster-pipes and Azure ML is to use Azure Blob Storage to communicate between the Dagster orchestrator and the Azure ML job.
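The following stdlib-only sketch makes the Blob Storage handoff concrete by replacing the container with an in-memory dict. The orchestrator writes a context blob. The job reads that blob and writes one blob per event. The orchestrator then reads the events back in order. The class, the functions, and the blob-naming scheme (`context.json`, `messages/<n>.json`) are all hypothetical. They do not reflect how dagster-azure actually names or lays out blobs.

```python
import json
from typing import Any, Dict, List


class InMemoryContainer:
    """Stand-in for a Blob Storage container: blob name -> bytes (hypothetical)."""

    def __init__(self) -> None:
        self._blobs: Dict[str, bytes] = {}

    def upload(self, name: str, data: bytes) -> None:
        self._blobs[name] = data

    def download(self, name: str) -> bytes:
        return self._blobs[name]

    def list_names(self, prefix: str) -> List[str]:
        return sorted(n for n in self._blobs if n.startswith(prefix))


def orchestrator_write_context(container: InMemoryContainer, context: Dict[str, Any]) -> None:
    # The orchestrator publishes the run context before launching the job.
    container.upload("context.json", json.dumps(context).encode())


def job_run(container: InMemoryContainer) -> None:
    # The job reads the context, then writes each event as its own blob.
    ctx = json.loads(container.download("context.json"))
    events = [
        {"method": "log", "params": {"message": f"Running {ctx['asset_key']}"}},
        {"method": "report_asset_materialization", "params": {"accuracy": 0.83}},
    ]
    for i, event in enumerate(events):
        container.upload(f"messages/{i:06d}.json", json.dumps(event).encode())


def orchestrator_read_messages(container: InMemoryContainer) -> List[Dict[str, Any]]:
    # Zero-padded names keep lexicographic order equal to write order.
    return [json.loads(container.download(n)) for n in container.list_names("messages/")]


if __name__ == "__main__":
    container = InMemoryContainer()
    orchestrator_write_context(container, {"asset_key": "azureml_training_job"})
    job_run(container)
    print([m["method"] for m in orchestrator_read_messages(container)])
```

Each side only needs access to the shared container. This is why the job and the orchestrator do not have to reach each other directly over the network.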
Step 1: Create an Azure ML environment for Dagster Pipes
Your Azure ML job will require an Azure ML environment that contains the dagster-pipes library. Since the Azure ML job will be communicating with the Dagster orchestrator via Azure Blob Storage, we will also need to install the azure-identity and azure-storage-blob Python libraries.
In your Azure ML dashboard, choose "Add a Custom Environment", select the environment source you want to use (e.g. sklearn-1.5:33), and edit the list of Python packages to install.
For instance, if specifying Python dependencies using a conda.yaml file, include the following lines:
dependencies:
  - python=3.10
  - pip:
      - dagster-pipes
      - azure-storage-blob
      - azure-identity
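Before relying on the environment, it can help to confirm that the three packages actually resolve inside it. The sketch below assumes you run it with the environment's own Python interpreter, for example as a throwaway Azure ML command job. It uses `importlib.metadata` to list any missing distributions. The `missing_distributions` and `report` helpers are hypothetical names.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Iterable, List

REQUIRED = ["dagster-pipes", "azure-storage-blob", "azure-identity"]


def missing_distributions(names: Iterable[str]) -> List[str]:
    """Return the distribution names that are not installed in this interpreter."""
    missing = []
    for name in names:
        try:
            version(name)
        except PackageNotFoundError:
            missing.append(name)
    return missing


def report(names: List[str]) -> bool:
    """Print installed versions or missing packages; return True if all are present."""
    missing = missing_distributions(names)
    if missing:
        print("Missing:", ", ".join(missing))
        return False
    for name in names:
        print(f"{name} {version(name)}")
    return True


if __name__ == "__main__":
    report(REQUIRED)
```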
Step 2: Add dagster-pipes to the Azure ML job script
Call open_dagster_pipes in your Azure ML script to create a context that can be used to send messages to Dagster:
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
from dagster_pipes import (
    PipesAzureBlobStorageContextLoader,
    PipesAzureBlobStorageMessageWriter,
    open_dagster_pipes,
)
# Must be the storage account that Dagster uses for Pipes communication
AZURE_STORAGE_ACCOUNT = "<AZURE-STORAGE-ACCOUNT>"
# Initialize the Azure Blob Storage client
blob_client = BlobServiceClient(
    account_url=f"https://{AZURE_STORAGE_ACCOUNT}.blob.core.windows.net",
    credential=DefaultAzureCredential(),
)
# Set up Pipes communication via Azure Blob Storage
context_loader = PipesAzureBlobStorageContextLoader(client=blob_client)
message_writer = PipesAzureBlobStorageMessageWriter(client=blob_client)
with open_dagster_pipes(
    context_loader=context_loader, message_writer=message_writer
) as context:
    # Log messages are forwarded to the Dagster run logs
    context.log.info("Running Azure ML job")
    # Your ML training code here
    # ...
    result = {"accuracy": 0.83}
    # Report the materialization back to Dagster; raw_value is a real float to match "type"
    context.report_asset_materialization(
        metadata={"accuracy": {"raw_value": result["accuracy"], "type": "float"}}
    )
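Make sure that the identity configured to run your Azure ML jobs can read and write blobs in Dagster's Azure Blob Storage account, for example by granting it the Storage Blob Data Contributor role. The job reads its context from that account and writes its messages back to it.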
The metadata format shown above ({"raw_value": value, "type": type}) is part of Dagster Pipes' special syntax for specifying rich Dagster metadata. For a complete reference of all supported metadata types and their formats, see the Dagster Pipes metadata reference.
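If you report several metrics, a small helper can build these entries and keep `raw_value` consistent with `type`, for example by passing `0.83` as a float rather than the string `"0.83"`. The sketch below is hypothetical: `pipes_metadata` is not part of `dagster-pipes`. It also covers only a few type names (`"bool"`, `"int"`, `"float"`, `"text"`), so check the metadata reference for the full set.

```python
from typing import Any, Dict


def _infer_type(value: Any) -> str:
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return "bool"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "float"
    if isinstance(value, str):
        return "text"
    raise TypeError(f"Unsupported metadata value: {value!r}")


def pipes_metadata(**values: Any) -> Dict[str, Dict[str, Any]]:
    """Wrap plain values as {"raw_value": ..., "type": ...} entries (hypothetical helper)."""
    return {key: {"raw_value": val, "type": _infer_type(val)} for key, val in values.items()}


metadata = pipes_metadata(accuracy=0.83, epochs=10, model_name="random-forest")
```

Inside the `open_dagster_pipes` block, the result could then be passed as `context.report_asset_materialization(metadata=metadata)`.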
Step 3: Create an asset using the PipesAzureMLClient to launch the job
You can scaffold assets from the command line by running dg scaffold defs dagster.asset <path/to/asset_file.py>. For more information, see the dg CLI docs.
In the Dagster asset/op code, use the PipesAzureMLClient resource to launch the job:
from azure.ai.ml import command
from dagster_azure.pipes import PipesAzureMLClient
import dagster as dg
@dg.asset
def azureml_training_job(
    context: dg.AssetExecutionContext,
    pipes_azureml: PipesAzureMLClient,
):
    return pipes_azureml.run(
        context=context,
        command=command(
            code="./src",  # Path to your source code
            command="python train.py",
            environment="dagster-env:1",  # Azure ML environment from Step 1, as name:version
            compute="my-compute-cluster",
            display_name="ml-training-job",
        ),
    ).get_results()
This will launch the Azure ML job and wait for it to complete. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be canceled, provided that forward_termination=True is set in the client.
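The wait, raise-on-failure, and cancel-on-interrupt behavior described above follows a common polling pattern. The sketch below illustrates that pattern with a fake job object. It is not how `PipesAzureMLClient` is implemented. `FakeJob`, `wait_for_job`, the status strings, and the poll interval are all hypothetical.

```python
import time
from typing import List

TERMINAL_OK = {"Completed"}
TERMINAL_FAILED = {"Failed", "Canceled"}


class FakeJob:
    """Hypothetical job that walks through a fixed list of statuses."""

    def __init__(self, statuses: List[str]) -> None:
        self._statuses = list(statuses)
        self.canceled = False

    def status(self) -> str:
        # Advance one step per poll, then stay on the last status.
        if len(self._statuses) > 1:
            return self._statuses.pop(0)
        return self._statuses[0]

    def cancel(self) -> None:
        self.canceled = True


def wait_for_job(job: FakeJob, forward_termination: bool, poll_seconds: float = 0.0) -> str:
    """Poll until the job finishes; raise on failure; cancel on interrupt if requested."""
    try:
        while True:
            status = job.status()
            if status in TERMINAL_OK:
                return status
            if status in TERMINAL_FAILED:
                raise RuntimeError(f"Job ended with status {status}")
            time.sleep(poll_seconds)
    except KeyboardInterrupt:
        # Only forward the interruption to the remote job when asked to.
        if forward_termination:
            job.cancel()
        raise


if __name__ == "__main__":
    print(wait_for_job(FakeJob(["Queued", "Running", "Completed"]), forward_termination=True))
```

Re-raising `KeyboardInterrupt` after cancelling keeps the local process's interruption visible to the caller instead of swallowing it.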
Step 4: Create Dagster definitions
Next, add the PipesAzureMLClient resource to your project's Definitions object:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
from dagster_azure.pipes import (
    PipesAzureBlobStorageContextInjector,
    PipesAzureBlobStorageMessageReader,
    PipesAzureMLClient,
)
import dagster as dg
@dg.definitions
def resources() -> dg.Definitions:
    azure_blob_storage = BlobServiceClient(
        account_url="https://<DAGSTER-STORAGE-ACCOUNT>.blob.core.windows.net/",
        credential=DefaultAzureCredential(),
    )
    azure_ml = MLClient(
        credential=DefaultAzureCredential(),
        subscription_id="<SUBSCRIPTION-ID>",
        resource_group_name="<RESOURCE-GROUP-NAME>",
        workspace_name="<WORKSPACE-NAME>",
    )
    return dg.Definitions(
        resources={
            "pipes_azureml": PipesAzureMLClient(
                client=azure_ml,
                # Container used to pass the run context to the job
                context_injector=PipesAzureBlobStorageContextInjector(
                    container="dagster", client=azure_blob_storage
                ),
                # Container used to receive messages from the job
                message_reader=PipesAzureBlobStorageMessageReader(
                    container="dagster", client=azure_blob_storage
                ),
            ),
        },
    )
Dagster will now be able to launch the Azure ML job from the azureml_training_job asset, and receive logs and events from the job.
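Instead of hard-coding the placeholders, you could read them from environment variables when building `Definitions`. The sketch below assumes hypothetical variable names such as `AZURE_SUBSCRIPTION_ID` and `DAGSTER_STORAGE_ACCOUNT`, so choose your own. It fails fast when a variable is unset and derives the Blob Storage account URL. `AzureSettings` and `load_azure_settings` are hypothetical names.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class AzureSettings:
    subscription_id: str
    resource_group_name: str
    workspace_name: str
    storage_account: str
    container: str = "dagster"

    @property
    def storage_account_url(self) -> str:
        return f"https://{self.storage_account}.blob.core.windows.net/"


# Hypothetical environment variable names, mapped to AzureSettings fields.
ENV_VARS = {
    "subscription_id": "AZURE_SUBSCRIPTION_ID",
    "resource_group_name": "AZURE_RESOURCE_GROUP",
    "workspace_name": "AZUREML_WORKSPACE",
    "storage_account": "DAGSTER_STORAGE_ACCOUNT",
}


def load_azure_settings(env: Optional[Mapping[str, str]] = None) -> AzureSettings:
    """Read settings from the environment, raising if any required variable is unset."""
    env = os.environ if env is None else env
    missing = [var for var in ENV_VARS.values() if not env.get(var)]
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")
    values = {field: env[var] for field, var in ENV_VARS.items()}
    return AzureSettings(**values, container=env.get("DAGSTER_PIPES_CONTAINER", "dagster"))
```

Inside `resources()`, `settings.storage_account_url` would go to `BlobServiceClient(account_url=...)`, and the other fields would go to `MLClient(...)`. `settings.container` would go to both the context injector and the message reader.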