from collections import namedtuple
from dagster import check
from dagster.core.definitions.executor import ExecutorDefinition, default_executors
from dagster.loggers import default_loggers
from dagster.utils.merger import merge_dicts
from .logger import LoggerDefinition
from .resource import ResourceDefinition
from .utils import check_valid_name
DEFAULT_MODE_NAME = "default"
[docs]class ModeDefinition(
namedtuple(
"_ModeDefinition",
"name resource_defs loggers executor_defs description intermediate_storage_defs",
)
):
"""Define a mode in which a pipeline can operate.
A mode provides pipelines with a set of resource implementations, loggers, system storages,
and executors.
Args:
name (Optional[str]): The name of the mode. Must be unique within the
:py:class:`PipelineDefinition` to which the mode is attached. (default: "default").
resource_defs (Optional[Dict[str, ResourceDefinition]]): A dictionary of string resource
keys to their implementations. Individual solids may require resources to be present by
these keys.
logger_defs (Optional[Dict[str, LoggerDefinition]]): A dictionary of string logger
identifiers to their implementations.
executor_defs (Optional[List[ExecutorDefinition]]): The set of executors available when
executing in this mode. By default, this will be the 'in_process' and 'multiprocess'
executors (:py:data:`~dagster.default_executors`).
description (Optional[str]): A human-readable description of the mode.
intermediate_storage_defs (Optional[List[IntermediateStorageDefinition]]): The set of intermediate storage
options available when executing in this mode. By default, this will be the 'in_memory'
and 'filesystem' system storages.
"""
def __new__(
cls,
name=None,
resource_defs=None,
logger_defs=None,
executor_defs=None,
description=None,
intermediate_storage_defs=None,
):
from dagster.core.storage.system_storage import default_intermediate_storage_defs
from .intermediate_storage import IntermediateStorageDefinition
check.opt_dict_param(
resource_defs, "resource_defs", key_type=str, value_type=ResourceDefinition
)
if resource_defs and "io_manager" in resource_defs:
resource_defs_with_defaults = resource_defs
else:
from dagster.core.storage.mem_io_manager import mem_io_manager
resource_defs_with_defaults = merge_dicts(
{"io_manager": mem_io_manager}, resource_defs or {}
)
return super(ModeDefinition, cls).__new__(
cls,
name=check_valid_name(name) if name else DEFAULT_MODE_NAME,
resource_defs=resource_defs_with_defaults,
loggers=(
check.opt_dict_param(
logger_defs, "logger_defs", key_type=str, value_type=LoggerDefinition
)
or default_loggers()
),
intermediate_storage_defs=check.list_param(
intermediate_storage_defs
if intermediate_storage_defs
else default_intermediate_storage_defs,
"intermediate_storage_defs",
of_type=IntermediateStorageDefinition,
),
executor_defs=check.list_param(
executor_defs if executor_defs else default_executors,
"executor_defs",
of_type=ExecutorDefinition,
),
description=check.opt_str_param(description, "description"),
)
@property
def resource_key_set(self):
return frozenset(self.resource_defs.keys())
def get_intermediate_storage_def(self, name):
check.str_param(name, "name")
for intermediate_storage_def in self.intermediate_storage_defs:
if intermediate_storage_def.name == name:
return intermediate_storage_def
check.failed("{} storage definition not found".format(name))
@staticmethod
def from_resources(resources, name=None):
check.dict_param(resources, "resources", key_type=str)
return ModeDefinition(
name=name,
resource_defs={
resource_name: ResourceDefinition.hardcoded_resource(resource)
for resource_name, resource in resources.items()
},
)