"""
This module contains the execution context objects that are internal to the system.
Not every property on these should be exposed to random Jane or Joe dagster user,
so we have a different layer of objects that encode the explicit public API
in the user_context module.
"""
import warnings
from abc import ABC, abstractmethod, abstractproperty
from typing import TYPE_CHECKING, Any, Dict, Iterable, NamedTuple, Optional, Set, cast
from dagster import check
from dagster.core.definitions.hook import HookDefinition
from dagster.core.definitions.mode import ModeDefinition
from dagster.core.definitions.pipeline import PipelineDefinition
from dagster.core.definitions.pipeline_base import IPipeline
from dagster.core.definitions.reconstructable import ReconstructablePipeline
from dagster.core.definitions.resource import ScopedResourcesBuilder
from dagster.core.definitions.solid import SolidDefinition
from dagster.core.definitions.step_launcher import StepLauncher
from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.execution.plan.outputs import StepOutputHandle
from dagster.core.execution.plan.step import ExecutionStep
from dagster.core.execution.retries import RetryMode
from dagster.core.executor.base import Executor
from dagster.core.log_manager import DagsterLogManager
from dagster.core.storage.io_manager import IOManager
from dagster.core.storage.pipeline_run import PipelineRun
from dagster.core.system_config.objects import EnvironmentConfig
from dagster.core.types.dagster_type import DagsterType
from .input import InputContext
from .output import OutputContext, get_output_context
from dagster.core.definitions.intermediate_storage import IntermediateStorageDefinition
from dagster.core.definitions.dependency import Solid, SolidHandle
from dagster.core.storage.intermediate_storage import IntermediateStorage
from dagster.core.instance import DagsterInstance
from dagster.core.execution.plan.plan import ExecutionPlan
from dagster.core.definitions.resource import Resources
class IPlanContext(ABC):
    """Context interface to represent run information that does not require access to user code.

    The information available via this interface is accessible to the system throughout a run.

    Implementations supply ``plan_data``, ``for_step``, ``output_capture`` and ``log``; every
    other accessor on this interface is derived from those.
    """

    # NOTE(review): the getters below are restored as properties because code in this module
    # reads them attribute-style (e.g. ``self.plan_data.pipeline``, ``self.pipeline_run.run_id``,
    # ``self.log.logging_tags``). Confirm the decorator on each getter against the upstream file.

    @property
    @abstractmethod
    def plan_data(self) -> "PlanData":
        """The plan-level data that the derived accessors read from."""
        raise NotImplementedError()

    @abstractmethod
    def for_step(self, step: ExecutionStep) -> "IStepContext":
        """Return a step-scoped context for ``step``."""
        raise NotImplementedError()

    @property
    def pipeline(self) -> IPipeline:
        return self.plan_data.pipeline

    @property
    def pipeline_run(self) -> PipelineRun:
        return self.plan_data.pipeline_run

    @property
    def run_id(self) -> str:
        return self.pipeline_run.run_id

    @property
    def run_config(self) -> dict:
        return self.pipeline_run.run_config

    @property
    def pipeline_name(self) -> str:
        return self.pipeline_run.pipeline_name

    @property
    def instance(self) -> "DagsterInstance":
        return self.plan_data.instance

    @property
    def raise_on_error(self) -> bool:
        return self.plan_data.raise_on_error

    @property
    def retry_mode(self) -> RetryMode:
        return self.plan_data.retry_mode

    @property
    def execution_plan(self) -> "ExecutionPlan":
        return self.plan_data.execution_plan

    @property
    @abstractmethod
    def output_capture(self) -> Optional[Dict[StepOutputHandle, Any]]:
        """Optional mapping from step output handle to a captured value."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def log(self) -> DagsterLogManager:
        """The log manager for this context."""
        raise NotImplementedError()

    @property
    def logging_tags(self) -> Dict[str, str]:
        return self.log.logging_tags

    def has_tag(self, key: str) -> bool:
        """Return whether ``key`` is present in the logging tags."""
        check.str_param(key, "key")
        return key in self.logging_tags

    def get_tag(self, key: str) -> Optional[str]:
        """Return the logging tag stored under ``key``, or ``None`` when it is absent."""
        check.str_param(key, "key")
        return self.logging_tags.get(key)
class PlanData(NamedTuple):
    """The data about a run that is available during both orchestration and execution.

    This object does not contain any information that requires access to user code, such as the
    pipeline definition, resources, or intermediate storage.
    """

    pipeline: IPipeline
    pipeline_run: PipelineRun
    instance: "DagsterInstance"
    execution_plan: "ExecutionPlan"
    # The two trailing fields are optional and default to "do not raise" / retries disabled.
    raise_on_error: bool = False
    retry_mode: RetryMode = RetryMode.DISABLED
class ExecutionData(NamedTuple):
    """The data that is available to the system during execution.

    This object contains information that requires access to user code, such as the pipeline
    definition, resources, and intermediate storage.
    """

    scoped_resources_builder: ScopedResourcesBuilder
    intermediate_storage: "IntermediateStorage"
    intermediate_storage_def: "IntermediateStorageDefinition"
    environment_config: EnvironmentConfig
    pipeline_def: PipelineDefinition
    mode_def: ModeDefinition
class IStepContext(IPlanContext):
    """Interface to represent data to be available during either step orchestration or execution."""

    # NOTE(review): restored as abstract properties because the concrete step contexts in this
    # module read ``self.step.solid_handle`` attribute-style; confirm against the upstream file.

    @property
    @abstractmethod
    def step(self) -> ExecutionStep:
        """The execution step this context is scoped to."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def solid_handle(self) -> "SolidHandle":
        """The solid handle associated with the step."""
        raise NotImplementedError()
class PlanOrchestrationContext(IPlanContext):
    """Context for the orchestration of a run.

    This context assumes inability to run user code directly.

    It stores the plan data, log manager, executor and output-capture mapping it is constructed
    with and exposes each through a same-named accessor.
    """

    def __init__(
        self,
        plan_data: PlanData,
        log_manager: DagsterLogManager,
        executor: Executor,
        output_capture: Optional[Dict[StepOutputHandle, Any]],
    ):
        self._plan_data = plan_data
        self._log_manager = log_manager
        self._executor = executor
        self._output_capture = output_capture

    @property
    def plan_data(self) -> PlanData:
        return self._plan_data

    @property
    def reconstructable_pipeline(self) -> ReconstructablePipeline:
        """Return the pipeline, which must be a ``ReconstructablePipeline``.

        Raises:
            DagsterInvariantViolationError: if the pipeline is of any other type.
        """
        if not isinstance(self.pipeline, ReconstructablePipeline):
            raise DagsterInvariantViolationError(
                "reconstructable_pipeline property must be a ReconstructablePipeline"
            )
        return self.pipeline

    @property
    def log(self) -> DagsterLogManager:
        return self._log_manager

    @property
    def executor(self) -> Executor:
        return self._executor

    @property
    def output_capture(self) -> Optional[Dict[StepOutputHandle, Any]]:
        return self._output_capture

    def for_step(self, step: ExecutionStep) -> "IStepContext":
        """Return a ``StepOrchestrationContext`` for ``step`` that shares this context's state."""
        # NOTE(review): the arguments of this call were absent from the reviewed listing. The
        # keyword names follow StepOrchestrationContext.__init__; tagging the log manager with
        # ``step.logging_tags`` is an assumption — confirm against the upstream file.
        return StepOrchestrationContext(
            plan_data=self.plan_data,
            log_manager=self._log_manager.with_tags(**step.logging_tags),
            executor=self.executor,
            step=step,
            output_capture=self.output_capture,
        )
class StepOrchestrationContext(PlanOrchestrationContext, IStepContext):
    """Context for the orchestration of a step.

    This context assumes inability to run user code directly. Thus, it does not include any resource
    or intermediate storage information.
    """

    def __init__(self, plan_data, log_manager, executor, step, output_capture):
        # Everything except ``step`` is handled by the plan-level orchestration context.
        super(StepOrchestrationContext, self).__init__(
            plan_data, log_manager, executor, output_capture
        )
        self._step = step

    @property
    def step(self) -> ExecutionStep:
        return self._step

    @property
    def solid_handle(self) -> "SolidHandle":
        return self.step.solid_handle
class PlanExecutionContext(IPlanContext):
    """Context for the execution of a plan.

    This context assumes that user code can be run directly, and thus includes resource and
    intermediate storage information.

    The execution-specific accessors all read from the ``ExecutionData`` passed at construction.
    """

    def __init__(
        self,
        plan_data: PlanData,
        execution_data: ExecutionData,
        log_manager: DagsterLogManager,
        output_capture: Optional[Dict[StepOutputHandle, Any]] = None,
    ):
        self._plan_data = plan_data
        self._execution_data = execution_data
        self._log_manager = log_manager
        self._output_capture = output_capture

    @property
    def plan_data(self) -> PlanData:
        return self._plan_data

    @property
    def output_capture(self) -> Optional[Dict[StepOutputHandle, Any]]:
        return self._output_capture

    def for_step(self, step: ExecutionStep) -> IStepContext:
        """Return a ``StepExecutionContext`` for ``step`` that shares this context's state."""
        # NOTE(review): the arguments of this call were absent from the reviewed listing. The
        # keyword names follow StepExecutionContext.__init__; tagging the log manager with
        # ``step.logging_tags`` is an assumption — confirm against the upstream file.
        return StepExecutionContext(
            plan_data=self.plan_data,
            execution_data=self._execution_data,
            log_manager=self._log_manager.with_tags(**step.logging_tags),
            step=step,
            output_capture=self.output_capture,
        )

    @property
    def pipeline_def(self) -> PipelineDefinition:
        return self._execution_data.pipeline_def

    @property
    def environment_config(self) -> EnvironmentConfig:
        return self._execution_data.environment_config

    @property
    def intermediate_storage_def(self) -> "IntermediateStorageDefinition":
        return self._execution_data.intermediate_storage_def

    @property
    def intermediate_storage(self) -> "IntermediateStorage":
        return self._execution_data.intermediate_storage

    @property
    def scoped_resources_builder(self) -> ScopedResourcesBuilder:
        return self._execution_data.scoped_resources_builder

    @property
    def log(self) -> DagsterLogManager:
        return self._log_manager

    def for_type(self, dagster_type: DagsterType) -> "TypeCheckContext":
        """Return a ``TypeCheckContext`` wrapping this context for ``dagster_type``."""
        return TypeCheckContext(self, dagster_type)
class StepExecutionContext(PlanExecutionContext, IStepContext):
    """Context for the execution of a step.

    This context assumes that user code can be run directly, and thus includes resource and
    intermediate storage information.

    On construction it computes the resource keys required by the step, builds those resources,
    and records the single ``StepLauncher`` resource among them, if there is one.
    """

    # NOTE(review): this listing is a lossy extract of the real module. Accessor decorators, the
    # ``self`` parameter and closing ``):`` of multi-line signatures, an ``else:`` line, and the
    # arguments / closing brackets of several calls are absent below. Each gap is marked; restore
    # the missing text from the upstream file instead of guessing at it.

    def __init__(
        plan_data: PlanData,
        execution_data: ExecutionData,
        log_manager: DagsterLogManager,
        step: ExecutionStep,
        output_capture: Optional[Dict[StepOutputHandle, Any]],
        # NOTE(review): leading ``self,`` and closing ``):`` of the signature are missing here.
        # Function-scope import — presumably to avoid an import cycle; confirm.
        from dagster.core.execution.resources_init import get_required_resource_keys_for_step
        super(StepExecutionContext, self).__init__(
        # NOTE(review): arguments and closing parenthesis of the super().__init__ call are missing.
        self._step = step
        self._required_resource_keys = get_required_resource_keys_for_step(
        # NOTE(review): arguments and closing parenthesis are missing.
        self._resources = execution_data.scoped_resources_builder.build(
        # NOTE(review): argument and closing parenthesis are missing.
        # Iterate the built resources and keep only those that are step launchers.
        resources_iter = cast(Iterable, self._resources)
        step_launcher_resources = [
            resource for resource in resources_iter if isinstance(resource, StepLauncher)
        # NOTE(review): closing ``]`` is missing.
        self._step_launcher: Optional[StepLauncher] = None
        # More than one step launcher is an error; exactly one is recorded; none leaves it as None.
        if len(step_launcher_resources) > 1:
            raise DagsterInvariantViolationError(
                "Multiple required resources for solid {solid_name} have inherit StepLauncher"
                "There should be at most one step launcher resource per solid.".format(
                # NOTE(review): the ``solid_name=...`` argument and closing parentheses are missing.
        elif len(step_launcher_resources) == 1:
            self._step_launcher = step_launcher_resources[0]

    def step(self) -> ExecutionStep:
        return self._step

    def solid_handle(self) -> "SolidHandle":
        return self.step.solid_handle

    def required_resource_keys(self) -> Set[str]:
        return self._required_resource_keys

    def resources(self) -> "Resources":
        return self._resources

    def step_launcher(self) -> Optional[StepLauncher]:
        return self._step_launcher

    def solid_def(self) -> SolidDefinition:
        return self.solid.definition

    def pipeline_def(self) -> PipelineDefinition:
        return self._execution_data.pipeline_def

    def mode_def(self) -> ModeDefinition:
        return self._execution_data.mode_def

    def solid(self) -> "Solid":
        return self.pipeline_def.get_solid(self._step.solid_handle)

    def get_io_manager(self, step_output_handle) -> IOManager:
        """Return the IO manager to use for the given step output.

        The result is checked to be an ``IOManager`` instance before it is returned.
        """
        step_output = self.execution_plan.get_step_output(step_output_handle)
        io_manager_key = (
        # NOTE(review): the expression that computes io_manager_key is missing.
        # backcompat: if intermediate storage is specified and the user hasn't overridden
        # io_manager_key on the output, use the intermediate storage.
        if io_manager_key == "io_manager" and not self.using_default_intermediate_storage():
            from dagster.core.storage.intermediate_storage import IntermediateStorageAdapter
            output_manager = IntermediateStorageAdapter(self.intermediate_storage)
        # NOTE(review): an ``else:`` line appears to be missing before the next assignment.
            output_manager = getattr(self.resources, io_manager_key)
        return check.inst(output_manager, IOManager)

    def using_default_intermediate_storage(self) -> bool:
        """Return True when no intermediate storage definition is set or it is the in-memory one."""
        from dagster.core.storage.system_storage import mem_intermediate_storage
        # pylint: disable=comparison-with-callable
        return (
            self.intermediate_storage_def is None
            or self.intermediate_storage_def == mem_intermediate_storage
        # NOTE(review): closing ``)`` is missing.

    def get_output_context(self, step_output_handle) -> OutputContext:
        return get_output_context(
        # NOTE(review): arguments and closing parenthesis are missing.

    def for_input_manager(
        # NOTE(review): leading ``self,`` is missing from this signature.
        name: str,
        config: dict,
        metadata: Any,
        dagster_type: DagsterType,
        source_handle: Optional[StepOutputHandle] = None,
        resource_config: Any = None,
        resources: Optional["Resources"] = None,
    ) -> InputContext:
        return InputContext(
            # NOTE(review): every argument except ``upstream_output`` is missing, as is the
            # closing parenthesis.
            # The upstream output context is only built when a source handle was supplied.
            upstream_output=self.get_output_context(source_handle) if source_handle else None,

    def for_hook(self, hook_def: HookDefinition) -> "HookContext":
        """Return a ``HookContext`` wrapping this step context for ``hook_def``."""
        return HookContext(self, hook_def)

    def _get_source_run_id(self, step_output_handle: StepOutputHandle) -> str:
        """Return the parent run id when the condition below holds, otherwise this run's id."""
        # determine if the step is skipped
        if (
            # this is re-execution
            # NOTE(review): the first operand of this condition is missing.
            # we are not re-executing the entire pipeline
            and self.pipeline_run.step_keys_to_execute is not None
            # this step is not being executed
            and step_output_handle.step_key not in self.pipeline_run.step_keys_to_execute
        # NOTE(review): closing ``):`` is missing.
            return self.pipeline_run.parent_run_id
        return self.pipeline_run.run_id
class TypeCheckContext:
    """The ``context`` object available to a type check function on a DagsterType.

    Attributes:
        log (DagsterLogManager): Centralized log dispatch from user code.
        resources (Any): An object whose attributes contain the resources available to this solid.
        run_id (str): The id of this pipeline run.
    """

    def __init__(
        self,
        plan_execution_context: PlanExecutionContext,
        dagster_type: DagsterType,
    ):
        self._plan_execution_context = plan_execution_context
        # NOTE(review): the argument of ``build`` was absent from the reviewed listing; passing
        # the type's ``required_resource_keys`` is an assumption — confirm against upstream.
        self._resources = plan_execution_context.scoped_resources_builder.build(
            dagster_type.required_resource_keys
        )

    @property
    def resources(self) -> "Resources":
        return self._resources

    @property
    def run_id(self) -> str:
        return self._plan_execution_context.run_id

    @property
    def log(self) -> DagsterLogManager:
        return self._plan_execution_context.log
class HookContext:
    """The ``context`` object available to a hook function on a DagsterEvent.

    Attributes:
        log (DagsterLogManager): Centralized log dispatch from user code.
        hook_def (HookDefinition): The hook that the context object belongs to.
        solid (Solid): The solid instance associated with the hook.
        step_key (str): The key for the step where this hook is being triggered.
        resources (NamedTuple): Resources available in the hook context.
        solid_config (Any): The parsed config specific to this solid.
        pipeline_name (str): The name of the pipeline where this hook is being triggered.
        run_id (str): The id of the run where this hook is being triggered.
        mode_def (ModeDefinition): The mode with which the pipeline is being run.
    """

    def __init__(
        self,
        step_execution_context: StepExecutionContext,
        hook_def: HookDefinition,
    ):
        self._step_execution_context = step_execution_context
        self._hook_def = check.inst_param(hook_def, "hook_def", HookDefinition)
        self._required_resource_keys = hook_def.required_resource_keys
        # NOTE(review): the argument of ``build`` was absent from the reviewed listing; the keys
        # stored on the previous line are the evident candidate — confirm against upstream.
        self._resources = step_execution_context.scoped_resources_builder.build(
            self._required_resource_keys
        )

    @property
    def pipeline_name(self) -> str:
        return self._step_execution_context.pipeline_name

    @property
    def run_id(self) -> str:
        return self._step_execution_context.run_id

    @property
    def hook_def(self) -> HookDefinition:
        return self._hook_def

    @property
    def solid(self) -> "Solid":
        return self._step_execution_context.solid

    @property
    def step(self) -> ExecutionStep:
        """Deprecated accessor for the underlying execution step; emits a warning when read."""
        # NOTE(review): only the message strings survived in the reviewed listing; the
        # ``warnings.warn`` call (and the absence of an explicit category) is reconstructed —
        # confirm against upstream.
        warnings.warn(
            "The step property of HookContext has been deprecated, and will be removed "
            "in a future release."
        )
        return self._step_execution_context.step

    @property
    def step_key(self) -> str:
        return self._step_execution_context.step.key

    @property
    def mode_def(self) -> ModeDefinition:
        return self._step_execution_context.mode_def

    @property
    def required_resource_keys(self) -> Set[str]:
        return self._required_resource_keys

    @property
    def resources(self) -> "Resources":
        return self._resources

    @property
    def solid_config(self) -> Any:
        """Return the ``config`` of this solid's entry in the environment config, or ``None``."""
        # NOTE(review): the lookup key was absent from the reviewed listing; keying by the
        # stringified solid handle is an assumption — confirm against upstream.
        solid_config = self._step_execution_context.environment_config.solids.get(
            str(self._step_execution_context.step.solid_handle)
        )
        return solid_config.config if solid_config else None

    # Because of the fact that we directly use the log manager of the step, if a user calls
    # hook_context.log.with_tags, then they will end up mutating the step's logging tags as well.
    # This is not problematic because the hook only runs after the step has been completed.
    @property
    def log(self) -> DagsterLogManager:
        return self._step_execution_context.log