Pipeline run configuration allows providing parameters to pipelines at the time they're executed.
Name | Description |
---|---|
ConfigSchema | See details with code examples in the API documentation. |
It's often useful to configure pipelines at run time. For example, you might want someone to manually operate a deployed pipeline and choose what dataset it operates on when they run it. In general, you should use Dagster's config system when you want the person or software that is executing a pipeline to be able to make choices about what the pipeline does, without needing to modify the pipeline definition.
The objects that compose a pipeline - solids and resources - are each individually configurable. When executing a pipeline, you can supply "run configuration" that specifies the configuration for each of the objects in the pipeline. When you execute a pipeline with the Python API, you supply run configuration as a Python dictionary. When you execute a pipeline from Dagit or the CLI, you can provide config in a YAML document.
A common use of configuration is for a schedule or sensor to provide configuration to the pipeline run it is launching. For example, a daily schedule might provide the day it's running on to one of the solids as a config value, and that solid might use that config value to decide what day's data to read.
Dagster includes a system for gradually-typed configuration schemas. These make it easy to catch configuration errors before pipeline execution, as well as to learn what configuration is required to execute a pipeline.
This example shows how to write a solid whose behavior is based on values that are passed in via configuration:
from dagster import pipeline, solid
@solid
def config_example_solid(context):
for _ in range(context.solid_config["iterations"]):
context.log.info("hello")
@pipeline
def config_example_pipeline():
config_example_solid()
How you specify config values depends on how you're running your pipeline:
When executing a pipeline with execute_pipeline
, you can specify the config values through run_config
argument:
from dagster import execute_pipeline
execute_pipeline(
config_example_pipeline,
run_config={"solids": {"config_example_solid": {"config": {"iterations": 1}}}},
)
When executing a pipeline from the command line, the easiest way to provide config is to put it into a YAML file, like:
solids:
config_example_solid:
config:
iterations: 1
When you invoke dagster pipeline execute, you can point to that YAML file using the --config
option:
dagster pipeline execute --config my_config.yaml
When executing a pipeline from Dagit's Playground, you can supply config as YAML using the config editor:
Dagster includes a system for gradually-typed configuration schemas. For example, you can specify that a particular solid accepts configuration for a particular set of keys, and that values provided for a particular key must be integers. Before executing a pipeline, Dagster will compare the provided run configuration to the config schema for the objects in the pipeline and fail early if they don't match.
Configuration schema helps:
The full range of config types and ways to specify config schema are documented in the API Reference with examples.
The most common objects to specify ConfigSchema
for are SolidDefinition
and ResourceDefinition
(see example code in Configuring a Resource).
Here's an example of a solid that defines a config schema:
from dagster import pipeline, solid
@solid(config_schema={"iterations": int})
def configurable_solid_with_schema(context):
for _ in range(context.solid_config["iterations"]):
context.log.info(context.solid_config["word"])
@pipeline
def configurable_pipeline_with_schema():
configurable_solid_with_schema()
Dagster validates the run_config
against the config_schema
. If the values violate the schema, it will fail at execution time. For example, the following will raise a DagsterInvalidConfigError
:
from dagster import execute_pipeline
execute_pipeline(
configurable_pipeline_with_schema,
run_config={
"solids": {
"configurable_solid_with_schema": {"config": {"nonexistent_config_value": 1}}
}
},
)
The config editor in Dagit the page comes with typeaheads, schema validation, and schema documentation. You can also click the "Scaffold Missing Config" button to generate dummy values based on the config schema.
You can also configure a ResourceDefinition
:
@resource(config_schema={"region": str, "use_unsigned_session": bool})
def s3_session(_init_context):
"""Connect to S3"""
And specify the configurations at runtime via a run config like:
resources:
key:
config:
region: us-east-1
use_unsigned_session: False
If you want to make a single config value available to multiple solids, you can house the configuration inside a resource and reference that resource from any solid that wants to use it.
from dagster import ModeDefinition, execute_pipeline, pipeline, resource, solid
@resource(config_schema=str)
def my_str_resource(init_context):
return init_context.resource_config
@solid(required_resource_keys={"my_str"})
def solid1(context):
context.log.info("solid1: " + context.resources.my_str)
@solid(required_resource_keys={"my_str"})
def solid2(context):
context.log.info("solid2: " + context.resources.my_str)
@pipeline(mode_defs=[ModeDefinition(resource_defs={"my_str": my_str_resource})])
def my_pipeline():
solid1()
solid2()
execute_pipeline(my_pipeline, run_config={"resources": {"my_str": "some_value"}})