Freshness policies
Freshness policies are under active development. You may encounter feature gaps, and the APIs may change.
Freshness policies help you understand which of your assets have materialized recently and which ones are running behind, which is a key component of asset health. Freshness policies also communicate expectations for data freshness, allowing downstream asset consumers to determine how often assets are expected to be updated.
For example, freshness policies can help identify stale assets caused by:
- Misconfigured `AutomationConditions`
- Runs not being scheduled due to an upstream failure
- Runs taking longer than expected to complete
Enabling freshness policies
Freshness policies are not enabled by default while in preview. To use them in open source and local development, add the following to your `dagster.yaml`:

```yaml
freshness:
  enabled: True
```
To use freshness policies in Dagster+, sign up for the Observability update early access program.
Relationship to existing FreshnessPolicy
There is an existing `FreshnessPolicy` API that has been deprecated since version 1.6. We're opting to reuse the name for the new freshness APIs, and have renamed the deprecated functionality to `LegacyFreshnessPolicy`. To continue using the deprecated functionality, follow the instructions in the 1.11 migration guide.
Relationship to freshness checks
Freshness policies are not yet recommended for production use, but they are intended to supersede freshness checks in a future release. Freshness checks are still supported and will continue to be supported for the foreseeable future. A migration guide will be provided.
Freshness policy types
Currently, we support time window and cron freshness policies. Time window policies are suitable for most use cases. We plan to add more policy types in the future.
Time window
A time window freshness policy is useful when you expect an asset to have new data or be recalculated with some frequency. An asset that does not meet this condition is considered to be failing its freshness policy. You can also set an optional warning window on a freshness policy; if the asset does not successfully materialize within this window, it enters a `warning` freshness state.
For example, the policy below states that there should be a successful materialization of the asset at least every 24 hours for it to be considered fresh, with a warning window of 12 hours:
```python
from datetime import timedelta

from dagster.preview.freshness import FreshnessPolicy

# Create a freshness policy that requires a materialization at least once every 24 hours,
# and warns if the latest materialization is older than 12 hours.
policy = FreshnessPolicy.time_window(
    fail_window=timedelta(hours=24), warn_window=timedelta(hours=12)
)
```
`fail_window` and `warn_window` cannot be shorter than 60 seconds. `warn_window` must be less than `fail_window`.
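The pass/warn/fail behavior and the window constraints described above can be modeled in a few lines of plain Python. The sketch below is only an illustration and is not Dagster's implementation. `validate_time_window` and `time_window_state` are hypothetical names. Two choices are assumptions rather than documented behavior: a never-materialized asset is reported as `"UNKNOWN"`, and an age exactly equal to a window still counts as inside it.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Documented minimum for both fail_window and warn_window.
MIN_WINDOW = timedelta(seconds=60)


def validate_time_window(
    fail_window: timedelta, warn_window: Optional[timedelta] = None
) -> None:
    """Raise ValueError if the windows break the documented constraints."""
    if fail_window < MIN_WINDOW:
        raise ValueError("fail_window cannot be shorter than 60 seconds")
    if warn_window is not None:
        if warn_window < MIN_WINDOW:
            raise ValueError("warn_window cannot be shorter than 60 seconds")
        if warn_window >= fail_window:
            raise ValueError("warn_window must be less than fail_window")


def time_window_state(
    last_materialized: Optional[datetime],
    now: datetime,
    fail_window: timedelta,
    warn_window: Optional[timedelta] = None,
) -> str:
    """Classify freshness from the age of the latest successful materialization.

    Assumptions (not taken from Dagster): a never-materialized asset is reported
    as "UNKNOWN", and an age exactly equal to a window still counts as inside it.
    """
    validate_time_window(fail_window, warn_window)
    if last_materialized is None:
        return "UNKNOWN"
    age = now - last_materialized
    if age > fail_window:
        return "FAIL"
    if warn_window is not None and age > warn_window:
        return "WARN"
    return "PASS"


# The 24-hour fail / 12-hour warn policy from the example above.
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
for hours_ago in (6, 18, 30):
    state = time_window_state(
        now - timedelta(hours=hours_ago), now, timedelta(hours=24), timedelta(hours=12)
    )
    print(hours_ago, state)  # prints "6 PASS", then "18 WARN", then "30 FAIL"
```

The checks run from the widest window to the narrowest, so an asset past `fail_window` reports `FAIL` even though it is also past `warn_window`.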
Cron
A cron freshness policy is useful when you expect an asset to have new data or be recalculated on a known schedule.
The policy defines a cron schedule, `deadline_cron`, that denotes the deadline for the asset materialization. To account for the time it takes to materialize the asset, a `lower_bound_delta` time delta is also specified, which denotes an amount of time prior to each cron tick. Together, `deadline_cron` and `lower_bound_delta` define a recurring time window in which the asset is expected to materialize.
The asset is fresh if it materializes in this time window, and will remain fresh until at least the next deadline. If the asset has not materialized in the window after the deadline passes, it will fail freshness until it materializes again.
Example:
```python
from datetime import timedelta

from dagster import asset
from dagster.preview.freshness import FreshnessPolicy


@asset(
    freshness_policy=FreshnessPolicy.cron(
        deadline_cron="0 10 * * *",
        lower_bound_delta=timedelta(hours=1),
        timezone="America/Los_Angeles",
    )
)
def daily_asset():
    """Expected to materialize every day between 9:00am and 10:00am Pacific Time.

    If the asset materializes between 9am and 10am, it is fresh, and will remain fresh
    until at least the next deadline (10am the next day).
    If the asset has not materialized by 10am, it fails the freshness policy.
    It will remain in the fail state until it materializes again.
    Once it materializes, it will become fresh and remain fresh until at least
    the next deadline (10am the next day).
    """
    pass
```
- `deadline_cron` must be a valid cron string and has a minimum resolution of 1 minute.
- `lower_bound_delta` cannot be shorter than 1 minute, and must fit within the smallest interval of `deadline_cron`. For example, for `deadline_cron="0 10 * * 1-5"` (weekdays at 10am), `lower_bound_delta` must be between 1 minute and 24 hours.
- `timezone` is optional. IANA timezones are supported. If not provided, it defaults to UTC.
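To make the window semantics concrete, here is a plain-Python sketch of how freshness could be evaluated for a daily deadline such as `deadline_cron="0 10 * * *"`. It is a simplified model under stated assumptions, not Dagster's implementation. It handles only daily deadlines rather than general cron strings, and `most_recent_daily_deadline` and `daily_cron_state` are hypothetical names. It reads the rules above as follows: the asset is fresh if its latest materialization is at or after the start of the most recent window. That covers both an on-time run and a late run after a missed deadline. Returning `"UNKNOWN"` for a never-materialized asset is also an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def most_recent_daily_deadline(now: datetime, hour: int, minute: int) -> datetime:
    """Latest daily deadline at hour:minute that is at or before `now` (same tzinfo)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate > now:
        # Today's deadline has not passed yet, so the latest one was yesterday.
        candidate -= timedelta(days=1)
    return candidate


def daily_cron_state(
    last_materialized: Optional[datetime],
    now: datetime,
    hour: int,
    minute: int,
    lower_bound_delta: timedelta,
) -> str:
    """Freshness for a daily deadline, e.g. hour=10, minute=0 for "0 10 * * *"."""
    # A daily schedule ticks every 24 hours, so the delta must fit in that interval.
    if not timedelta(minutes=1) <= lower_bound_delta <= timedelta(hours=24):
        raise ValueError("lower_bound_delta must be between 1 minute and 24 hours")
    if last_materialized is None:
        return "UNKNOWN"  # assumption: never-materialized assets have no state yet
    deadline = most_recent_daily_deadline(now, hour, minute)
    window_start = deadline - lower_bound_delta
    # Fresh if the latest materialization landed in (or after) the most recent window;
    # a late materialization after a missed deadline also makes the asset fresh again.
    return "PASS" if last_materialized >= window_start else "FAIL"


utc = timezone.utc
hit = datetime(2024, 1, 1, 9, 45, tzinfo=utc)  # inside the 9:00-10:00 window on Jan 1
before_deadline = datetime(2024, 1, 2, 9, 30, tzinfo=utc)
after_deadline = datetime(2024, 1, 2, 10, 5, tzinfo=utc)
print(daily_cron_state(hit, before_deadline, 10, 0, timedelta(hours=1)))  # PASS
print(daily_cron_state(hit, after_deadline, 10, 0, timedelta(hours=1)))  # FAIL
```

The example uses UTC so the result does not depend on local time-zone data. To mirror `timezone="America/Los_Angeles"`, you could pass datetimes that carry a `zoneinfo.ZoneInfo` tzinfo instead. Python's arithmetic on aware datetimes then steps back one wall-clock day.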
Setting freshness policies
On individual assets
You can configure a freshness policy directly on an asset:
```python
from datetime import timedelta

from dagster import AssetSpec, asset
from dagster.preview.freshness import FreshnessPolicy

policy = FreshnessPolicy.time_window(fail_window=timedelta(hours=24))


@asset(freshness_policy=policy)
def my_asset():
    pass


# Or on an asset spec
spec = AssetSpec("my_asset", freshness_policy=policy)
```
Across multiple assets
To apply freshness policies to multiple or all assets in your deployment, you can use `map_asset_specs`. Use `map_resolved_asset_specs` to apply a policy to an asset selection.
```python
from datetime import timedelta

from dagster import Definitions, asset
from dagster.preview.freshness import FreshnessPolicy, apply_freshness_policy


@asset
def parent_asset():
    pass


@asset(deps=[parent_asset])
def child_asset():
    pass


@asset
def asset_2():
    pass


policy = FreshnessPolicy.time_window(fail_window=timedelta(hours=24))
defs = Definitions(assets=[parent_asset, child_asset, asset_2])

# Apply the policy to multiple assets (in this case, all assets in defs)
defs = defs.map_asset_specs(func=lambda spec: apply_freshness_policy(spec, policy))

# Use map_resolved_asset_specs to apply the policy to a selection
defs = defs.map_resolved_asset_specs(
    func=lambda spec: apply_freshness_policy(spec, policy),
    selection='key:"parent_asset"+',  # applies the policy to parent_asset and the assets downstream of it
)
```
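As described in the example's comment, the selection `key:"parent_asset"+` targets `parent_asset` plus the assets downstream of it. The plain-Python sketch below computes that set for the example graph. It is only an illustration of the idea, not Dagster's selection engine. `downstream_closure` and its `max_depth` parameter are hypothetical names. Treating a bare `+` as "all downstream layers" is an assumption taken from that comment; `max_depth` is there only to show how a depth limit would change the result.

```python
from collections import deque
from typing import Dict, Iterable, Optional, Set


def downstream_closure(
    deps: Dict[str, Iterable[str]], root: str, max_depth: Optional[int] = None
) -> Set[str]:
    """Return `root` plus every asset downstream of it.

    `deps` maps each asset key to the keys it depends on (its upstream assets),
    mirroring the deps=[...] argument passed to @asset.
    """
    # Invert the upstream mapping into parent -> children edges.
    children: Dict[str, Set[str]] = {}
    for child, parents in deps.items():
        for parent in parents:
            children.setdefault(parent, set()).add(child)

    # Breadth-first walk from the root, optionally stopping after max_depth layers.
    selected = {root}
    queue = deque([(root, 0)])
    while queue:
        key, depth = queue.popleft()
        if max_depth is not None and depth >= max_depth:
            continue
        for child in children.get(key, ()):
            if child not in selected:
                selected.add(child)
                queue.append((child, depth + 1))
    return selected


# The dependency graph from the example above: child_asset depends on parent_asset.
deps = {"parent_asset": [], "child_asset": ["parent_asset"], "asset_2": []}
print(sorted(downstream_closure(deps, "parent_asset")))  # ['child_asset', 'parent_asset']
```

`asset_2` is left out because nothing connects it to `parent_asset`. In the example, that is why it receives the policy only from the earlier `map_asset_specs` call.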
You can also use `map_asset_specs` directly on the asset specs before creating a `Definitions` object:
```python
from datetime import timedelta

from dagster import Definitions, asset, map_asset_specs
from dagster.preview.freshness import FreshnessPolicy, apply_freshness_policy


@asset
def asset_1():
    pass


@asset
def asset_2():
    pass


policy = FreshnessPolicy.time_window(fail_window=timedelta(hours=24))
assets = [asset_1, asset_2]

# Apply the policy to each asset before building Definitions
assets_with_policies = map_asset_specs(
    func=lambda spec: apply_freshness_policy(spec, policy), iterable=assets
)
defs = Definitions(assets=assets_with_policies)
```
Applying a freshness policy in this way to an asset with an existing freshness policy (for example, one defined in the `@asset` decorator) will overwrite the existing policy.
Setting a default freshness policy
Often, it's useful to set a default freshness policy across all assets, and override the default on individual assets.
To do so, you can use `map_asset_specs` and pass `overwrite_existing=False` to `apply_freshness_policy` in the mapped function, so that any pre-defined freshness policies are not overwritten:
```python
from datetime import timedelta

from dagster import Definitions, asset
from dagster.preview.freshness import FreshnessPolicy, apply_freshness_policy


@asset
def default_asset():
    """This asset will have the default time window freshness policy applied to it."""
    pass


@asset(
    freshness_policy=FreshnessPolicy.cron(
        deadline_cron="0 10 * * *",
        lower_bound_delta=timedelta(hours=1),
    )
)
def override_asset():
    """This asset will override the default policy with a cron freshness policy."""
    pass


defs = Definitions(assets=[default_asset, override_asset])

default_policy = FreshnessPolicy.time_window(fail_window=timedelta(hours=24))

# This will apply default_policy to default_asset, but retain the cron policy on override_asset
defs = defs.map_asset_specs(
    func=lambda spec: apply_freshness_policy(
        spec, default_policy, overwrite_existing=False
    ),
)
```
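The overwrite rule described in this section can be modeled without Dagster. The sketch below is a plain-Python illustration, not Dagster's implementation. `SpecSketch` and `apply_policy_sketch` are hypothetical names, and policies are represented as plain string labels. The default `overwrite_existing=True` is inferred from the statement that applying a policy overwrites an existing one; treat it as an assumption.

```python
from dataclasses import dataclass, replace
from typing import List, Optional


@dataclass(frozen=True)
class SpecSketch:
    """Hypothetical stand-in for an asset spec: a key plus an optional policy label."""

    key: str
    freshness_policy: Optional[str] = None


def apply_policy_sketch(
    spec: SpecSketch, policy: str, overwrite_existing: bool = True
) -> SpecSketch:
    """Return a copy of spec with policy applied, unless an existing policy must be kept."""
    if spec.freshness_policy is not None and not overwrite_existing:
        return spec  # keep the policy set directly on the asset
    return replace(spec, freshness_policy=policy)


specs: List[SpecSketch] = [
    SpecSketch("default_asset"),
    SpecSketch("override_asset", freshness_policy="cron 0 10 * * *"),
]
defaulted = [apply_policy_sketch(s, "time_window 24h", overwrite_existing=False) for s in specs]
overwritten = [apply_policy_sketch(s, "time_window 24h") for s in specs]
print([s.freshness_policy for s in defaulted])  # ['time_window 24h', 'cron 0 10 * * *']
print([s.freshness_policy for s in overwritten])  # ['time_window 24h', 'time_window 24h']
```

Because specs are treated as immutable and copied with `dataclasses.replace`, mapping a function over them never mutates the originals. This mirrors how the `map_asset_specs` examples above reassign their results rather than editing assets in place.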
Limitations
Freshness policies are not currently supported for source observable assets (`SourceAssets`) or cacheable assets (`CacheableAssetsDefinition`).
Future enhancements
- More freshness policy types, including:
  - Anomaly detection-based
  - Custom (user-defined) freshness
- Support for source observable assets