DagsterDocs

Source code for dagster.core.definitions.decorators.composite_solid

from functools import update_wrapper
from typing import Any, Callable, Dict, List, Optional, Union

from dagster import check

from ..composition import do_composition
from ..input import InputDefinition
from ..output import OutputDefinition
from ..solid import CompositeSolidDefinition


class _CompositeSolid:
    def __init__(
        self,
        name: Optional[str] = None,
        input_defs: Optional[List[InputDefinition]] = None,
        output_defs: Optional[List[OutputDefinition]] = None,
        description: Optional[str] = None,
        config_schema: Any = None,
        config_fn: Optional[Callable[[dict], dict]] = None,
    ):
        self.name = check.opt_str_param(name, "name")
        self.input_defs = check.opt_list_param(input_defs, "input_defs", InputDefinition)
        self.output_defs = check.opt_nullable_list_param(output_defs, "output", OutputDefinition)
        self.description = check.opt_str_param(description, "description")

        self.config_schema = config_schema  # gets validated in do_composition
        self.config_fn = check.opt_callable_param(config_fn, "config_fn")

    def __call__(self, fn: Callable[..., Any]):
        check.callable_param(fn, "fn")

        if not self.name:
            self.name = fn.__name__

        (
            input_mappings,
            output_mappings,
            dependencies,
            solid_defs,
            config_mapping,
            positional_inputs,
        ) = do_composition(
            "@composite_solid",
            self.name,
            fn,
            self.input_defs,
            self.output_defs,
            self.config_schema,
            self.config_fn,
            ignore_output_from_composition_fn=False,
        )

        composite_def = CompositeSolidDefinition(
            name=self.name,
            input_mappings=input_mappings,
            output_mappings=output_mappings,
            dependencies=dependencies,
            solid_defs=solid_defs,
            description=self.description,
            config_mapping=config_mapping,
            positional_inputs=positional_inputs,
        )
        update_wrapper(composite_def, fn)
        return composite_def


[docs]def composite_solid( name: Union[Optional[str], Callable[..., Any]] = None, input_defs: Optional[List[InputDefinition]] = None, output_defs: Optional[List[OutputDefinition]] = None, description: Optional[str] = None, config_schema: Optional[Dict[str, Any]] = None, config_fn: Optional[Callable[[dict], dict]] = None, ) -> _CompositeSolid: """Create a composite solid with the specified parameters from the decorated composition function. Using this decorator allows you to build up the dependency graph of the composite by writing a function that invokes solids and passes the output to other solids. This is similar to the use of the :py:func:`@pipeline <pipeline>` decorator, with the additional ability to remap inputs, outputs, and config across the composite boundary. Args: name (Optional[str]): Name for the new composite solid. Must be unique within any :py:class:`PipelineDefinition` using the solid. description (Optional[str]): Human-readable description of the new composite solid. input_defs (Optional[List[InputDefinition]]): Information about the inputs that this composite solid maps. Information provided here will be combined with what can be inferred from the function signature, with these explicit InputDefinitions taking precedence. Uses of inputs in the body of the decorated composition function will determine the :py:class:`InputMappings <InputMapping>` passed to the underlying :py:class:`CompositeSolidDefinition`. output_defs (Optional[List[OutputDefinition]]): Information about the outputs this composite solid maps. Information provided here will be combined with what can be inferred from the return type signature if there is only one OutputDefinition. Uses of these outputs in the body of the decorated composition function, as well as the return value of the decorated function, will be used to infer the appropriate set of :py:class:`OutputMappings <OutputMapping>` for the underlying :py:class:`CompositeSolidDefinition`. To map multiple outputs, return a dictionary from the composition function. config_schema (Optional[ConfigSchema]): If the `config_fn` argument is provided, this argument can be provided to set the schema for outer config that is passed to the `config_fn`. If `config_fn` is provided, but this argument is not provided, any config will be accepted. config_fn (Callable[[dict], dict]): By specifying a config mapping function, you can override the configuration for the child solids contained within this composite solid. ``config_fn``, maps the config provided to the composite solid to the config that will be provided to the child solids. If this argument is provided, the `config_schema` argument can also be provided to limit what config values can be passed to the composite solid. Examples: .. code-block:: python @lambda_solid def add_one(num: int) -> int: return num + 1 @composite_solid def add_two(num: int) -> int: adder_1 = add_one.alias('adder_1') adder_2 = add_one.alias('adder_2') return adder_2(adder_1(num)) """ if callable(name): check.invariant(input_defs is None) check.invariant(output_defs is None) check.invariant(description is None) check.invariant(config_schema is None) check.invariant(config_fn is None) return _CompositeSolid()(name) return _CompositeSolid( name=name, input_defs=input_defs, output_defs=output_defs, description=description, config_schema=config_schema, config_fn=config_fn, )