Source code for dagster.core.storage.init
from collections import namedtuple
from dagster import check
from dagster.core.definitions import (
IntermediateStorageDefinition,
ModeDefinition,
PipelineDefinition,
)
from dagster.core.instance import DagsterInstance
from dagster.core.storage.pipeline_run import PipelineRun
from dagster.core.storage.type_storage import TypeStoragePluginRegistry
from dagster.core.system_config.objects import EnvironmentConfig
[docs]class InitIntermediateStorageContext(
namedtuple(
"InitIntermediateStorageContext",
(
"pipeline_def mode_def intermediate_storage_def pipeline_run instance environment_config "
"type_storage_plugin_registry resources intermediate_storage_config"
),
)
):
"""Intermediate storage-specific initialization context.
Attributes:
pipeline_def (PipelineDefinition): The definition of the pipeline in context.
mode_def (ModeDefinition): The definition of the mode in context.
intermediate_storage_def (IntermediateStorageDefinition): The definition of the intermediate storage to be
constructed.
pipeline_run (PipelineRun): The pipeline run in context.
instance (DagsterInstance): The instance.
environment_config (EnvironmentConfig): The environment config.
type_storage_plugin_registry (TypeStoragePluginRegistry): Registry containing custom type
storage plugins.
resources (Any): Resources available in context.
intermediate_storage_config (Dict[str, Any]): The intermediate storage-specific configuration data
provided by the environment config. The schema for this data is defined by the
``config_schema`` argument to :py:class:`IntermediateStorageDefinition`.
"""
def __new__(
cls,
pipeline_def,
mode_def,
intermediate_storage_def,
pipeline_run,
instance,
environment_config,
type_storage_plugin_registry,
resources,
intermediate_storage_config,
):
return super(InitIntermediateStorageContext, cls).__new__(
cls,
pipeline_def=check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition),
mode_def=check.inst_param(mode_def, "mode_def", ModeDefinition),
intermediate_storage_def=check.inst_param(
intermediate_storage_def, "intermediate_storage_def", IntermediateStorageDefinition
),
pipeline_run=check.inst_param(pipeline_run, "pipeline_run", PipelineRun),
instance=check.inst_param(instance, "instance", DagsterInstance),
environment_config=check.inst_param(
environment_config, "environment_config", EnvironmentConfig
),
type_storage_plugin_registry=check.inst_param(
type_storage_plugin_registry,
"type_storage_plugin_registry",
TypeStoragePluginRegistry,
),
resources=check.not_none_param(resources, "resources"),
intermediate_storage_config=check.dict_param(
intermediate_storage_config, intermediate_storage_config, key_type=str
),
)