
Unconnected Inputs#

Relevant APIs#

| Name | Description |
| --- | --- |
| @root_input_manager | The decorator used to define a Root Input Manager. |
| RootInputManager | The base class used to define how to load inputs to solids at the root of a pipeline. |
| @dagster_type_loader | The decorator used to define a Dagster Type Loader. |
| DagsterTypeLoader | The base class used to specify how to load inputs in a way that depends on the input's dagster type. |

Overview#

Solids in a pipeline may have input definitions that don't correspond to the outputs of upstream solids. You can provide values for these inputs in a few different ways. Dagster checks each, in order, and uses the first that's available:

  • Root Input Managers (experimental) - If the input to a pipeline comes from an external source, such as a table in a database, often it makes sense to define a resource that's responsible for loading it. This makes it easy to swap out implementations in different modes and mock it in tests. RootInputManagers, which can be referenced from InputDefinitions, are resources that load unconnected inputs.
  • Dagster Type Loaders - DagsterTypeLoaders provide a way to specify how to load inputs that depends on the type, but not on the mode. DagsterTypeLoaders can be placed on DagsterTypes, which can be placed on InputDefinitions.
  • Default Values - InputDefinition accepts a default_value argument, as sketched below.
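
As a quick sketch of the third option, here's an unconnected input that falls back to a default value when no config is supplied (the solid and pipeline names are made up for illustration):

from dagster import InputDefinition, pipeline, solid


@solid(input_defs=[InputDefinition("limit", int, default_value=10)])
def take_first_rows(context, limit):
    context.log.info(f"taking first {limit} rows")


@pipeline
def default_value_pipeline():
    take_first_rows()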

Examples#

Loading a built-in dagster type from config#

When you have a solid at the beginning of a pipeline that operates on a built-in dagster type like String or Int, you can provide a value for that input via run config.

Here's a basic pipeline with an unconnected string input:

from dagster import InputDefinition, String, execute_pipeline, pipeline, solid


@solid(input_defs=[InputDefinition("input_string", String)])
def my_solid(context, input_string):
    context.log.info(f"input string: {input_string}")


@pipeline
def my_pipeline():
    my_solid()

The String dagster type has a dagster type loader that allows it to load inputs from config:

execute_pipeline(
    my_pipeline,
    run_config={"solids": {"my_solid": {"inputs": {"input_string": {"value": "marmot"}}}}},
)
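
execute_pipeline returns a PipelineExecutionResult, so a quick sanity check that the input was loaded and the run succeeded looks like this:

result = execute_pipeline(
    my_pipeline,
    run_config={"solids": {"my_solid": {"inputs": {"input_string": {"value": "marmot"}}}}},
)
assert result.success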

Loading a custom dagster type from config#

When you have a solid at the beginning of your pipeline that operates on a dagster type that you've defined, you can write your own DagsterTypeLoader to define how to load that input via run config.

from dagster import (
    InputDefinition,
    dagster_type_loader,
    execute_pipeline,
    pipeline,
    solid,
    usable_as_dagster_type,
)


@dagster_type_loader(config_schema={"diameter": float, "juiciness": float, "cultivar": str})
def apple_loader(_context, config):
    return Apple(
        diameter=config["diameter"], juiciness=config["juiciness"], cultivar=config["cultivar"]
    )


@usable_as_dagster_type(loader=apple_loader)
class Apple:
    def __init__(self, diameter, juiciness, cultivar):
        self.diameter = diameter
        self.juiciness = juiciness
        self.cultivar = cultivar


@solid(input_defs=[InputDefinition("input_apple", Apple)])
def my_solid(context, input_apple):
    context.log.info(f"input apple diameter: {input_apple.diameter}")


@pipeline
def my_pipeline():
    my_solid()

With this loader in place, the input can be specified via run config:

execute_pipeline(
    my_pipeline,
    run_config={
        "solids": {
            "my_solid": {
                "inputs": {
                    "input_apple": {"diameter": 2.4, "juiciness": 6.0, "cultivar": "honeycrisp"}
                }
            }
        }
    },
)

Providing an input manager for a root input (Experimental)#

When you have a solid at the beginning of a pipeline that operates on data from an external source, you might wish to separate that I/O from your solid's business logic, in the same way you would with an IO manager if the solid were loading from an upstream output.

To accomplish this, you can define a RootInputManager.

@solid(input_defs=[InputDefinition("dataframe", root_manager_key="my_root_manager")])
def my_solid(_, dataframe):
    """Do some stuff"""


@root_input_manager
def table1_loader(_):
    return read_dataframe_from_table(name="table1")


@pipeline(mode_defs=[ModeDefinition(resource_defs={"my_root_manager": table1_loader})])
def my_pipeline():
    my_solid()

Setting the root_manager_key on an InputDefinition controls how that input is loaded in pipelines where it has no upstream output.

The root_input_manager decorator behaves nearly identically to the resource decorator. It yields a RootInputManagerDefinition, which is a ResourceDefinition that will produce a RootInputManager.
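
Because an input manager is just a resource, you can swap in a different implementation per mode, which is what makes it straightforward to mock in tests. Here's a minimal sketch reusing my_solid and table1_loader from above; create_test_dataframe is a hypothetical helper that stands in for whatever builds your test data:

from dagster import ModeDefinition, pipeline, root_input_manager


@root_input_manager
def mock_table1_loader(_):
    # create_test_dataframe is a hypothetical helper that builds an in-memory dataframe
    return create_test_dataframe()


@pipeline(
    mode_defs=[
        ModeDefinition("prod", resource_defs={"my_root_manager": table1_loader}),
        ModeDefinition("test", resource_defs={"my_root_manager": mock_table1_loader}),
    ]
)
def my_multi_mode_pipeline():
    my_solid()

Executing with execute_pipeline(my_multi_mode_pipeline, mode="test") then loads the mock dataframe instead of reading from the database.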

Providing per-input config to a root input manager (Experimental)#

When launching a run, you might want to parameterize how particular root inputs are loaded.

To accomplish this, you can define an input_config_schema on the input manager definition. The load function can access this config when loading data, via the InputContext.

@root_input_manager(input_config_schema={"table_name": str})
def table_loader(context):
    # the per-input config is available on the InputContext
    return read_dataframe_from_table(name=context.config["table_name"])

Then, when executing a pipeline, you can pass in this per-input config.

@pipeline(mode_defs=[ModeDefinition(resource_defs={"my_root_manager": table_loader})])
def my_pipeline():
    my_solid()


execute_pipeline(
    my_pipeline,
    run_config={"solids": {"my_solid": {"inputs": {"dataframe": {"table_name": "table1"}}}}},
)

Using a root input manager with subselection (Experimental)#

You might want to execute a subset of solids in your pipeline and control how the inputs of those solids are loaded. Root input managers also help in these situations, because the inputs at the beginning of the subset become the new "roots".

For example, you might have solid1 that normally produces a table that solid2 consumes. To debug solid2, you might want to run it on a different table than the one normally produced by solid1.

To accomplish this, you can set up the root_manager_key on solid2's InputDefinition to point to an input manager with the desired loading behavior. As before, setting the root_manager_key on an InputDefinition controls how that input is loaded when it has no upstream output.

from dagster import (
    IOManager,
    InputDefinition,
    ModeDefinition,
    OutputDefinition,
    execute_pipeline,
    io_manager,
    pipeline,
    root_input_manager,
    solid,
)


@root_input_manager(input_config_schema={"table_name": str})
def my_root_input_manager(context):
    return read_dataframe_from_table(name=context.config["table_name"])


class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context):
        return read_dataframe_from_table(name=context.upstream_output.name)


@io_manager
def my_io_manager(_):
    return MyIOManager()


@solid(output_defs=[OutputDefinition(io_manager_key="my_io_manager")])
def solid1(_):
    """Do stuff"""


@solid(input_defs=[InputDefinition("dataframe", root_manager_key="my_root_input_manager")])
def solid2(_, dataframe):
    """Do stuff"""


@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "my_io_manager": my_io_manager,
                "my_root_input_manager": my_root_input_manager,
            }
        )
    ]
)
def my_pipeline():
    solid2(solid1())

When running the full pipeline, solid2's input will be loaded using the IO manager on the output of solid1. When running the pipeline subset, solid2's input has no upstream output, so the input manager corresponding to its root_manager_key is used.

execute_pipeline(
    my_pipeline,
    solid_selection=["solid2"],
    run_config={"solids": {"solid2": {"inputs": {"dataframe": {"table_name": "tableX"}}}}},
)
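
By contrast, running the full pipeline requires no per-input config, since solid2's input is then loaded by my_io_manager from solid1's output:

execute_pipeline(my_pipeline)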