DagsterDocs

Source code for dagster.core.execution.results

from collections import defaultdict

from dagster import check
from dagster.core.definitions import GraphDefinition, PipelineDefinition, Solid, SolidHandle
from dagster.core.definitions.utils import DEFAULT_OUTPUT
from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.events import DagsterEvent, DagsterEventType
from dagster.core.execution.plan.outputs import StepOutputHandle
from dagster.core.execution.plan.step import StepKind
from dagster.core.execution.plan.utils import build_resources_for_manager


def _construct_events_by_step_key(event_list):
    events_by_step_key = defaultdict(list)
    for event in event_list:
        events_by_step_key[event.step_key].append(event)

    return dict(events_by_step_key)


class GraphExecutionResult:
    def __init__(
        self,
        container,
        event_list,
        reconstruct_context,
        pipeline_def,
        handle=None,
        output_capture=None,
    ):
        self.container = check.inst_param(container, "container", GraphDefinition)
        self.event_list = check.list_param(event_list, "step_event_list", of_type=DagsterEvent)
        self.reconstruct_context = check.callable_param(reconstruct_context, "reconstruct_context")
        self.pipeline_def = check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
        self.handle = check.opt_inst_param(handle, "handle", SolidHandle)
        self.output_capture = check.opt_dict_param(
            output_capture, "output_capture", key_type=StepOutputHandle
        )
        self._events_by_step_key = _construct_events_by_step_key(event_list)

    @property
    def success(self):
        """bool: Whether all steps in the execution were successful."""
        return all([not event.is_failure for event in self.event_list])

    @property
    def step_event_list(self):
        """List[DagsterEvent] The full list of events generated by steps in the execution.

        Excludes events generated by the pipeline lifecycle, e.g., ``PIPELINE_START``.
        """
        return [event for event in self.event_list if event.is_step_event]

    @property
    def events_by_step_key(self):
        return self._events_by_step_key

    def result_for_solid(self, name):
        """Get the result of a top level solid.

        Args:
            name (str): The name of the top-level solid or aliased solid for which to retrieve the
                result.

        Returns:
            Union[CompositeSolidExecutionResult, SolidExecutionResult]: The result of the solid
            execution within the pipeline.
        """
        if not self.container.has_solid_named(name):
            raise DagsterInvariantViolationError(
                "Tried to get result for solid '{name}' in '{container}'. No such top level "
                "solid.".format(name=name, container=self.container.name)
            )

        return self.result_for_handle(SolidHandle(name, None))

    def output_for_solid(self, handle_str, output_name=DEFAULT_OUTPUT):
        """Get the output of a solid by its solid handle string and output name.

        Args:
            handle_str (str): The string handle for the solid.
            output_name (str): Optional. The name of the output, default to DEFAULT_OUTPUT.

        Returns:
            The output value for the handle and output_name.
        """
        check.str_param(handle_str, "handle_str")
        check.str_param(output_name, "output_name")
        return self.result_for_handle(SolidHandle.from_string(handle_str)).output_value(output_name)

    @property
    def solid_result_list(self):
        """List[Union[CompositeSolidExecutionResult, SolidExecutionResult]]: The results for each
        top level solid."""
        return [self.result_for_solid(solid.name) for solid in self.container.solids]

    def _result_for_handle(self, solid, handle):
        if not solid:
            raise DagsterInvariantViolationError(
                "Can not find solid handle {handle_str}.".format(handle_str=handle.to_string())
            )

        events_by_kind = defaultdict(list)

        if solid.is_composite:
            events = []
            for event in self.event_list:
                if event.is_step_event:
                    if event.solid_handle.is_or_descends_from(handle.with_ancestor(self.handle)):
                        events_by_kind[event.step_kind].append(event)
                        events.append(event)

            return CompositeSolidExecutionResult(
                solid,
                events,
                events_by_kind,
                self.reconstruct_context,
                self.pipeline_def,
                handle=handle.with_ancestor(self.handle),
                output_capture=self.output_capture,
            )
        else:
            for event in self.event_list:
                if event.is_step_event:
                    if event.solid_handle.is_or_descends_from(handle.with_ancestor(self.handle)):
                        events_by_kind[event.step_kind].append(event)

            return SolidExecutionResult(
                solid,
                events_by_kind,
                self.reconstruct_context,
                self.pipeline_def,
                output_capture=self.output_capture,
            )

    def result_for_handle(self, handle):
        """Get the result of a solid by its solid handle.

        This allows indexing into top-level solids to retrieve the results of children of
        composite solids.

        Args:
            handle (Union[str,SolidHandle]): The handle for the solid.

        Returns:
            Union[CompositeSolidExecutionResult, SolidExecutionResult]: The result of the given
            solid.
        """
        if isinstance(handle, str):
            handle = SolidHandle.from_string(handle)
        else:
            check.inst_param(handle, "handle", SolidHandle)

        solid = self.container.get_solid(handle)

        return self._result_for_handle(solid, handle)


[docs]class PipelineExecutionResult(GraphExecutionResult): """The result of executing a pipeline. Returned by :py:func:`execute_pipeline`. Users should not instantiate this class directly. """ def __init__( self, pipeline_def, run_id, event_list, reconstruct_context, output_capture=None, ): self.run_id = check.str_param(run_id, "run_id") check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) super(PipelineExecutionResult, self).__init__( container=pipeline_def, event_list=event_list, reconstruct_context=reconstruct_context, pipeline_def=pipeline_def, output_capture=output_capture, )
[docs]class CompositeSolidExecutionResult(GraphExecutionResult): """Execution result for a composite solid in a pipeline. Users should not instantiate this class directly. """ def __init__( self, solid, event_list, step_events_by_kind, reconstruct_context, pipeline_def, handle=None, output_capture=None, ): check.inst_param(solid, "solid", Solid) check.invariant( solid.is_composite, desc="Tried to instantiate a CompositeSolidExecutionResult with a noncomposite solid", ) self.solid = solid self.step_events_by_kind = check.dict_param( step_events_by_kind, "step_events_by_kind", key_type=StepKind, value_type=list ) self.output_capture = check.opt_dict_param( output_capture, "output_capture", key_type=StepOutputHandle ) super(CompositeSolidExecutionResult, self).__init__( container=solid.definition, event_list=event_list, reconstruct_context=reconstruct_context, pipeline_def=pipeline_def, handle=handle, output_capture=output_capture, ) def output_values_for_solid(self, name): check.str_param(name, "name") return self.result_for_solid(name).output_values def output_values_for_handle(self, handle_str): check.str_param(handle_str, "handle_str") return self.result_for_handle(handle_str).output_values def output_value_for_solid(self, name, output_name=DEFAULT_OUTPUT): check.str_param(name, "name") check.str_param(output_name, "output_name") return self.result_for_solid(name).output_value(output_name) def output_value_for_handle(self, handle_str, output_name=DEFAULT_OUTPUT): check.str_param(handle_str, "handle_str") check.str_param(output_name, "output_name") return self.result_for_handle(handle_str).output_value(output_name) @property def output_values(self): values = {} for output_name in self.solid.definition.output_dict: output_mapping = self.solid.definition.get_output_mapping(output_name) inner_solid_values = self._result_for_handle( self.solid.definition.solid_named(output_mapping.maps_from.solid_name), SolidHandle(output_mapping.maps_from.solid_name, None), ).output_values if inner_solid_values is not None: # may be None if inner solid was skipped if output_mapping.maps_from.output_name in inner_solid_values: values[output_name] = inner_solid_values[output_mapping.maps_from.output_name] return values def output_value(self, output_name=DEFAULT_OUTPUT): check.str_param(output_name, "output_name") if not self.solid.definition.has_output(output_name): raise DagsterInvariantViolationError( "Output '{output_name}' not defined in composite solid '{solid}': " "{outputs_clause}. If you were expecting this output to be present, you may " "be missing an output_mapping from an inner solid to its enclosing composite " "solid.".format( output_name=output_name, solid=self.solid.name, outputs_clause="found outputs {output_names}".format( output_names=str(list(self.solid.definition.output_dict.keys())) ) if self.solid.definition.output_dict else "no output mappings were defined", ) ) output_mapping = self.solid.definition.get_output_mapping(output_name) return self._result_for_handle( self.solid.definition.solid_named(output_mapping.maps_from.solid_name), SolidHandle(output_mapping.maps_from.solid_name, None), ).output_value(output_mapping.maps_from.output_name)
[docs]class SolidExecutionResult: """Execution result for a leaf solid in a pipeline. Users should not instantiate this class. """ def __init__( self, solid, step_events_by_kind, reconstruct_context, pipeline_def, output_capture=None ): check.inst_param(solid, "solid", Solid) check.invariant( not solid.is_composite, desc="Tried to instantiate a SolidExecutionResult with a composite solid", ) self.solid = solid self.step_events_by_kind = check.dict_param( step_events_by_kind, "step_events_by_kind", key_type=StepKind, value_type=list ) self.reconstruct_context = check.callable_param(reconstruct_context, "reconstruct_context") self.output_capture = check.opt_dict_param(output_capture, "output_capture") self.pipeline_def = check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) @property def compute_input_event_dict(self): """Dict[str, DagsterEvent]: All events of type ``STEP_INPUT``, keyed by input name.""" return {se.event_specific_data.input_name: se for se in self.input_events_during_compute} @property def input_events_during_compute(self): """List[DagsterEvent]: All events of type ``STEP_INPUT``.""" return self._compute_steps_of_type(DagsterEventType.STEP_INPUT)
[docs] def get_output_event_for_compute(self, output_name="result"): """The ``STEP_OUTPUT`` event for the given output name. Throws if not present. Args: output_name (Optional[str]): The name of the output. (default: 'result') Returns: DagsterEvent: The corresponding event. """ events = self.get_output_events_for_compute(output_name) check.invariant( len(events) == 1, "Multiple output events returned, use get_output_events_for_compute" ) return events[0]
@property def compute_output_events_dict(self): """Dict[str, List[DagsterEvent]]: All events of type ``STEP_OUTPUT``, keyed by output name""" results = defaultdict(list) for se in self.output_events_during_compute: results[se.step_output_data.output_name].append(se) return dict(results)
[docs] def get_output_events_for_compute(self, output_name="result"): """The ``STEP_OUTPUT`` event for the given output name. Throws if not present. Args: output_name (Optional[str]): The name of the output. (default: 'result') Returns: List[DagsterEvent]: The corresponding events. """ return self.compute_output_events_dict[output_name]
@property def output_events_during_compute(self): """List[DagsterEvent]: All events of type ``STEP_OUTPUT``.""" return self._compute_steps_of_type(DagsterEventType.STEP_OUTPUT) @property def compute_step_events(self): """List[DagsterEvent]: All events generated by execution of the solid compute function.""" return self.step_events_by_kind.get(StepKind.COMPUTE, []) @property def step_events(self): return self.compute_step_events @property def materializations_during_compute(self): """List[Materialization]: All materializations yielded by the solid.""" return [ mat_event.event_specific_data.materialization for mat_event in self.materialization_events_during_compute ] @property def materialization_events_during_compute(self): """List[DagsterEvent]: All events of type ``ASSET_MATERIALIZATION``.""" return self._compute_steps_of_type(DagsterEventType.ASSET_MATERIALIZATION) @property def expectation_events_during_compute(self): """List[DagsterEvent]: All events of type ``STEP_EXPECTATION_RESULT``.""" return self._compute_steps_of_type(DagsterEventType.STEP_EXPECTATION_RESULT) def _compute_steps_of_type(self, dagster_event_type): return list( filter(lambda se: se.event_type == dagster_event_type, self.compute_step_events) ) @property def expectation_results_during_compute(self): """List[ExpectationResult]: All expectation results yielded by the solid""" return [ expt_event.event_specific_data.expectation_result for expt_event in self.expectation_events_during_compute ]
[docs] def get_step_success_event(self): """DagsterEvent: The ``STEP_SUCCESS`` event, throws if not present.""" for step_event in self.compute_step_events: if step_event.event_type == DagsterEventType.STEP_SUCCESS: return step_event check.failed("Step success not found for solid {}".format(self.solid.name))
@property def compute_step_failure_event(self): """DagsterEvent: The ``STEP_FAILURE`` event, throws if it did not fail.""" if self.success: raise DagsterInvariantViolationError( "Cannot call compute_step_failure_event if successful" ) step_failure_events = self._compute_steps_of_type(DagsterEventType.STEP_FAILURE) check.invariant(len(step_failure_events) == 1) return step_failure_events[0] @property def success(self): """bool: Whether solid execution was successful.""" any_success = False for step_event in self.compute_step_events: if step_event.event_type == DagsterEventType.STEP_FAILURE: return False if step_event.event_type == DagsterEventType.STEP_SUCCESS: any_success = True return any_success @property def skipped(self): """bool: Whether solid execution was skipped.""" return all( [ step_event.event_type == DagsterEventType.STEP_SKIPPED for step_event in self.compute_step_events ] ) @property def output_values(self): """Union[None, Dict[str, Union[Any, Dict[str, Any]]]: The computed output values. Returns ``None`` if execution did not succeed. Returns a dictionary where keys are output names and the values are: * the output values in the normal case * a dictionary from mapping key to corresponding value in the mapped case Note that accessing this property will reconstruct the pipeline context (including, e.g., resources) to retrieve materialized output values. """ if not self.success or not self.compute_step_events: return None results = {} with self.reconstruct_context() as context: for compute_step_event in self.compute_step_events: if compute_step_event.is_successful_output: output = compute_step_event.step_output_data step = context.execution_plan.get_step_by_key(compute_step_event.step_key) value = self._get_value(context.for_step(step), output) check.invariant( not (output.mapping_key and step.get_mapping_key()), "Not set up to handle mapped outputs downstream of mapped steps", ) mapping_key = output.mapping_key or step.get_mapping_key() if mapping_key: if results.get(output.output_name) is None: results[output.output_name] = {mapping_key: value} else: results[output.output_name][mapping_key] = value else: results[output.output_name] = value return results
[docs] def output_value(self, output_name=DEFAULT_OUTPUT): """Get a computed output value. Note that calling this method will reconstruct the pipeline context (including, e.g., resources) to retrieve materialized output values. Args: output_name(str): The output name for which to retrieve the value. (default: 'result') Returns: Union[None, Any, Dict[str, Any]]: ``None`` if execution did not succeed, the output value in the normal case, and a dict of mapping keys to values in the mapped case. """ check.str_param(output_name, "output_name") if not self.solid.definition.has_output(output_name): raise DagsterInvariantViolationError( "Output '{output_name}' not defined in solid '{solid}': found outputs " "{output_names}".format( output_name=output_name, solid=self.solid.name, output_names=str(list(self.solid.definition.output_dict.keys())), ) ) if not self.success: return None with self.reconstruct_context() as context: found = False result = None for compute_step_event in self.compute_step_events: if ( compute_step_event.is_successful_output and compute_step_event.step_output_data.output_name == output_name ): found = True output = compute_step_event.step_output_data step = context.execution_plan.get_step_by_key(compute_step_event.step_key) value = self._get_value(context.for_step(step), output) check.invariant( not (output.mapping_key and step.get_mapping_key()), "Not set up to handle mapped outputs downstream of mapped steps", ) mapping_key = output.mapping_key or step.get_mapping_key() if mapping_key: if result is None: result = {mapping_key: value} else: result[mapping_key] = value else: result = value if found: return result raise DagsterInvariantViolationError( ( "Did not find result {output_name} in solid {self.solid.name} " "execution result" ).format(output_name=output_name, self=self) )
def _get_value(self, context, step_output_data): step_output_handle = step_output_data.step_output_handle # output capture dictionary will only have values in the in process case, but will not have # values from steps launched via step launcher. if self.output_capture and step_output_handle in self.output_capture: return self.output_capture[step_output_handle] manager = context.get_io_manager(step_output_handle) manager_key = context.execution_plan.get_manager_key(step_output_handle, self.pipeline_def) res = manager.load_input( context.for_input_manager( name=None, config=None, metadata=None, dagster_type=self.solid.output_def_named(step_output_data.output_name).dagster_type, source_handle=step_output_handle, resource_config=context.environment_config.resources[manager_key].config, resources=build_resources_for_manager(manager_key, context), ) ) return res @property def failure_data(self): """Union[None, StepFailureData]: Any data corresponding to this step's failure, if it failed.""" for step_event in self.compute_step_events: if step_event.event_type == DagsterEventType.STEP_FAILURE: return step_event.step_failure_data @property def retry_attempts(self) -> int: """Number of times this step retried""" count = 0 for step_event in self.compute_step_events: if step_event.event_type == DagsterEventType.STEP_RESTARTED: count += 1 return count