Software-defined assets provide a declarative way to define which assets should exist and how to create them. But when you don't know which asset an op will materialize until the op runs, you can still create assets using AssetMaterialization events.
"Asset" is Dagster's word for an entity, external to ops, that is mutated or created by an op. An asset might be a table in a database that an op appends to, an ML model in a model store that an op overwrites, or even a slack channel that an op writes messages to.
Op outputs often correspond to assets. For example, an op might be responsible for recreating a table, and one of its outputs might be a dataframe containing the contents of that table.
Assets can also have partitions, which refer to slices of the overall asset. The simplest example would be a table that has a partition for each day. A given op execution may simply write a single day's worth of data to that table, rather than dropping the entire table and replacing it with new data.
Dagster lets you track the interactions between ops, outputs, and assets over time and view them in the Dagit Asset Catalog. Every asset has a "key", which serves as a unique identifier for that particular entity. The act of creating or updating the contents of an asset is called a "materialization", and Dagster tracks these materializations using AssetMaterialization events. These events can either be logged by the user at runtime, or automatically created by Dagster in cases where an AssetKey has been referenced by an op output.
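To illustrate what a key looks like, here is a minimal sketch of constructing asset keys (the key names are made up); a key can be built from a single string or from a list of string components, which is handy for namespacing:

from dagster import AssetKey

# A key with a single component...
table_key = AssetKey("my_dataset")

# ...or a multi-component key, useful for namespacing (e.g. by schema and table).
namespaced_key = AssetKey(["my_schema", "my_dataset"])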
There are two general patterns for dealing with assets when using Dagster:
1. Put the logic to write/store assets inside the body of an op.
2. Focus the op purely on business logic, and delegate the logic to write/store assets to an IO manager.
Regardless of which pattern you use, AssetMaterialization events are how you communicate to Dagster that a materialization has occurred. You can create these events either by explicitly logging them at runtime or, using an experimental interface, by having Dagster automatically generate them when you declare that a given op output corresponds to a given AssetKey.
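As a rough sketch of that second, experimental approach, the AssetKey is attached to the op's Out; the exact argument name has varied across Dagster versions, so treat this as an assumption and check the API docs for Out before relying on it:

from dagster import AssetKey, Out, op

# Assumption: this version of Dagster exposes an experimental asset_key argument on Out.
# If so, storing this op's output causes Dagster to generate the AssetMaterialization for us.
@op(out=Out(asset_key=AssetKey("my_dataset")))
def my_auto_materializing_op():
    return read_df()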
One way of recording materialization events is to log AssetMaterialization events at runtime. These events should be co-located with your materialization logic: if you store your object within your op body, log the event from within that op; if you store your object using an IOManager, log the event from your IO manager.
To make Dagster aware that we materialized an asset in our op, we can log an AssetMaterialization event using the OpExecutionContext.log_event method. This involves changing the following op:
from dagster import op
@op
def my_simple_op():
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    return remote_storage_path
into something like this:
from dagster import AssetMaterialization, op
@op
def my_materialization_op(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetMaterialization(
            asset_key="my_dataset", description="Persisted result to storage"
        )
    )
    return remote_storage_path
We should now see a materialization event in the event log when we execute a job with this op.
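To see this end to end, a minimal sketch of wrapping the op in a job and executing it might look like the following (the job name is made up; execute_in_process is just one convenient way to run it):

from dagster import job

# Assumes my_materialization_op from the example above is defined in this module.
@job
def my_materialization_job():
    my_materialization_op()

if __name__ == "__main__":
    # The AssetMaterialization appears in this run's event log, and the
    # "my_dataset" asset shows up in Dagit's Asset Catalog.
    my_materialization_job.execute_in_process()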
Attaching Metadata to an AssetMaterialization#
There are a variety of types of metadata that can be associated with a materialization event, all through the MetadataEntry class. Each materialization event optionally takes a list of metadata entries that are then displayed in the event log and the Asset Catalog.
from dagster import AssetMaterialization, MetadataValue, op
@op
def my_metadata_materialization_op(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetMaterialization(
            asset_key="my_dataset",
            description="Persisted result to storage",
            metadata={
                "text_metadata": "Text-based metadata for this event",
                "path": MetadataValue.path(remote_storage_path),
                "dashboard_url": MetadataValue.url("http://mycoolsite.com/url_for_my_data"),
                "size (bytes)": calculate_bytes(df),
            },
        )
    )
    return remote_storage_path
Logging an AssetMaterialization from an IO Manager#
If you delegate storage to an IOManager, log the event from within the manager's handle_output method:

import os

from dagster import AssetKey, AssetMaterialization, IOManager


class PandasCsvIOManagerWithAsset(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        obj.to_csv(file_path)
        context.log_event(
            AssetMaterialization(
                asset_key=AssetKey(file_path),
                description="Persisted result to storage.",
                metadata={
                    "number of rows": obj.shape[0],
                    "some_column mean": obj["some_column"].mean(),
                },
            )
        )
Check our API docs for MetadataEntry for more details on the types of event metadata available.
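For reference, a few of the commonly used MetadataValue constructors look like this (the values are illustrative; see the API docs for the full set):

from dagster import MetadataValue

MetadataValue.text("Free-form text describing the event")
MetadataValue.url("http://mycoolsite.com/dashboard")
MetadataValue.path("/path/to/output.csv")
MetadataValue.json({"rows": 100, "errors": 0})
MetadataValue.md("**Markdown** rendered in Dagit")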
Specifying a partition for an AssetMaterialization#
If you are materializing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the partition argument on the AssetMaterialization object.
from dagster import AssetMaterialization, op
@op(config_schema={"date": str})
def my_partitioned_asset_op(context):
    partition_date = context.op_config["date"]
    df = read_df_for_date(partition_date)
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetMaterialization(asset_key="my_dataset", partition=partition_date)
    )
    return remote_storage_path
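A sketch of supplying that date through run config when executing a job containing this op (the job name and date value are illustrative):

from dagster import job

# Assumes my_partitioned_asset_op from the example above is defined in this module.
@job
def my_partitioned_asset_job():
    my_partitioned_asset_op()

run_config = {"ops": {"my_partitioned_asset_op": {"config": {"date": "2022-01-01"}}}}

# Each run materializes only the partition named in the run config.
my_partitioned_asset_job.execute_in_process(run_config=run_config)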
Software-defined assets vs. Asset Materializations#
When working with software-defined assets, the assets and their dependencies must be known at definition time. When you look at software-defined assets in Dagit, you can see exactly what assets are going to be materialized before any code runs.
Asset Materializations, on the other hand, are logged at run time, so you only find out which assets were materialized while the op is running. This allows for flexibility, such as determining which assets should be materialized based on the output of a previous op.
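To make the contrast concrete, here is a minimal sketch of a software-defined asset using the @asset decorator, reusing the hypothetical read_df and persist_to_storage helpers from the earlier examples:

from dagster import asset

@asset
def my_dataset():
    # The asset key ("my_dataset") and its upstream dependencies are known before any
    # code runs, so Dagit can show this asset in the asset graph ahead of execution.
    df = read_df()
    persist_to_storage(df)
    return df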