DagsterDocs

Source code for dagster.core.events

"""Structured representations of system events."""
import logging
import os
from collections import namedtuple
from enum import Enum
from typing import NamedTuple, Optional

from dagster import check
from dagster.core.definitions import (
    AssetKey,
    AssetMaterialization,
    EventMetadataEntry,
    ExpectationResult,
    HookDefinition,
    Materialization,
    SolidHandle,
)
from dagster.core.definitions.events import AssetLineageInfo, ObjectStoreOperationType
from dagster.core.errors import HookExecutionError
from dagster.core.execution.context.system import (
    HookContext,
    IStepContext,
    PlanExecutionContext,
    PlanOrchestrationContext,
    StepExecutionContext,
)
from dagster.core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle
from dagster.core.execution.plan.outputs import StepOutputData
from dagster.core.log_manager import DagsterLogManager
from dagster.serdes import register_serdes_tuple_fallbacks, whitelist_for_serdes
from dagster.utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info
from dagster.utils.timing import format_duration


class DagsterEventType(Enum):
    """The types of events that may be yielded by solid and pipeline execution."""

    # Step-scoped events
    STEP_OUTPUT = "STEP_OUTPUT"
    STEP_INPUT = "STEP_INPUT"
    STEP_FAILURE = "STEP_FAILURE"
    STEP_START = "STEP_START"
    STEP_SUCCESS = "STEP_SUCCESS"
    STEP_SKIPPED = "STEP_SKIPPED"

    STEP_UP_FOR_RETRY = "STEP_UP_FOR_RETRY"  # "failed" but want to retry
    STEP_RESTARTED = "STEP_RESTARTED"

    ASSET_MATERIALIZATION = "ASSET_MATERIALIZATION"
    STEP_EXPECTATION_RESULT = "STEP_EXPECTATION_RESULT"

    # Pipeline-scoped events
    PIPELINE_INIT_FAILURE = "PIPELINE_INIT_FAILURE"

    PIPELINE_ENQUEUED = "PIPELINE_ENQUEUED"
    PIPELINE_DEQUEUED = "PIPELINE_DEQUEUED"
    PIPELINE_STARTING = "PIPELINE_STARTING"  # Launch is happening, execution hasn't started yet

    PIPELINE_START = "PIPELINE_START"  # Execution has started
    PIPELINE_SUCCESS = "PIPELINE_SUCCESS"
    PIPELINE_FAILURE = "PIPELINE_FAILURE"

    PIPELINE_CANCELING = "PIPELINE_CANCELING"
    PIPELINE_CANCELED = "PIPELINE_CANCELED"

    # Storage / IO-manager events
    OBJECT_STORE_OPERATION = "OBJECT_STORE_OPERATION"
    ASSET_STORE_OPERATION = "ASSET_STORE_OPERATION"
    LOADED_INPUT = "LOADED_INPUT"
    HANDLED_OUTPUT = "HANDLED_OUTPUT"

    ENGINE_EVENT = "ENGINE_EVENT"

    # Hook events
    HOOK_COMPLETED = "HOOK_COMPLETED"
    HOOK_ERRORED = "HOOK_ERRORED"
    HOOK_SKIPPED = "HOOK_SKIPPED"
# Groupings of event types used by the DagsterEvent predicate properties below.
STEP_EVENTS = {
    DagsterEventType.STEP_INPUT,
    DagsterEventType.STEP_START,
    DagsterEventType.STEP_OUTPUT,
    DagsterEventType.STEP_FAILURE,
    DagsterEventType.STEP_SUCCESS,
    DagsterEventType.STEP_SKIPPED,
    DagsterEventType.ASSET_MATERIALIZATION,
    DagsterEventType.STEP_EXPECTATION_RESULT,
    DagsterEventType.OBJECT_STORE_OPERATION,
    DagsterEventType.HANDLED_OUTPUT,
    DagsterEventType.LOADED_INPUT,
    DagsterEventType.STEP_RESTARTED,
    DagsterEventType.STEP_UP_FOR_RETRY,
}

FAILURE_EVENTS = {
    DagsterEventType.PIPELINE_INIT_FAILURE,
    DagsterEventType.PIPELINE_FAILURE,
    DagsterEventType.STEP_FAILURE,
    DagsterEventType.PIPELINE_CANCELED,
}

PIPELINE_EVENTS = {
    DagsterEventType.PIPELINE_ENQUEUED,
    DagsterEventType.PIPELINE_DEQUEUED,
    DagsterEventType.PIPELINE_STARTING,
    DagsterEventType.PIPELINE_START,
    DagsterEventType.PIPELINE_SUCCESS,
    DagsterEventType.PIPELINE_INIT_FAILURE,
    DagsterEventType.PIPELINE_FAILURE,
    DagsterEventType.PIPELINE_CANCELING,
    DagsterEventType.PIPELINE_CANCELED,
}

HOOK_EVENTS = {
    DagsterEventType.HOOK_COMPLETED,
    DagsterEventType.HOOK_ERRORED,
    DagsterEventType.HOOK_SKIPPED,
}


def _assert_type(method, expected_type, actual_type):
    """Invariant guard for typed accessors: fail loudly when an event-specific
    accessor is called on an event of the wrong type."""
    check.invariant(
        expected_type == actual_type,
        "{method} only callable when event_type is {expected_type}, called on {actual_type}".format(
            method=method, expected_type=expected_type, actual_type=actual_type
        ),
    )


def _validate_event_specific_data(event_type, event_specific_data):
    """Check that ``event_specific_data`` is an instance of the payload class
    matching ``event_type``; event types with no registered payload pass through
    unchecked. Returns the payload unchanged."""
    from dagster.core.execution.plan.inputs import StepInputData
    from dagster.core.execution.plan.objects import StepFailureData, StepSuccessData

    # Dispatch table from event type to its expected payload class.
    expected_payload_cls = {
        DagsterEventType.STEP_OUTPUT: StepOutputData,
        DagsterEventType.STEP_FAILURE: StepFailureData,
        DagsterEventType.STEP_SUCCESS: StepSuccessData,
        DagsterEventType.ASSET_MATERIALIZATION: StepMaterializationData,
        DagsterEventType.STEP_EXPECTATION_RESULT: StepExpectationResultData,
        DagsterEventType.STEP_INPUT: StepInputData,
        DagsterEventType.ENGINE_EVENT: EngineEventData,
        DagsterEventType.HOOK_ERRORED: HookErroredData,
    }.get(event_type)

    if expected_payload_cls is not None:
        check.inst_param(event_specific_data, "event_specific_data", expected_payload_cls)

    return event_specific_data


def log_step_event(step_context: IStepContext, event: "DagsterEvent") -> None:
    """Emit a step-scoped event to the step's log manager (error level for failures)."""
    check.inst_param(step_context, "step_context", IStepContext)
    check.inst_param(event, "event", DagsterEvent)

    event_type = DagsterEventType(event.event_type_value)
    log_fn = step_context.log.error if event_type in FAILURE_EVENTS else step_context.log.debug

    fallback_message = "{event_type} for step {step_key}".format(
        event_type=event_type, step_key=step_context.step.key
    )
    log_fn(
        event.message or fallback_message,
        dagster_event=event,
        pipeline_name=step_context.pipeline_name,
    )


def log_pipeline_event(pipeline_context, event, step_key):
    """Emit a pipeline-scoped event to the pipeline's log manager (error level for failures)."""
    event_type = DagsterEventType(event.event_type_value)
    log_fn = (
        pipeline_context.log.error if event_type in FAILURE_EVENTS else pipeline_context.log.debug
    )

    fallback_message = "{event_type} for pipeline {pipeline_name}".format(
        event_type=event_type, pipeline_name=pipeline_context.pipeline_name
    )
    log_fn(
        event.message or fallback_message,
        dagster_event=event,
        pipeline_name=pipeline_context.pipeline_name,
        step_key=step_key,
    )


def log_resource_event(log_manager, pipeline_name, event):
    """Emit a resource (engine) event; errors are logged at error level."""
    check.inst_param(log_manager, "log_manager", DagsterLogManager)
    check.inst_param(event, "event", DagsterEvent)
    check.inst(event.event_specific_data, EngineEventData)

    log_fn = log_manager.error if event.event_specific_data.error else log_manager.debug
    log_fn(
        event.message,
        dagster_event=event,
        pipeline_name=pipeline_name,
        step_key=event.step_key,
    )
@whitelist_for_serdes
class DagsterEvent(
    namedtuple(
        "_DagsterEvent",
        "event_type_value pipeline_name step_handle solid_handle step_kind_value "
        "logging_tags event_specific_data message pid step_key",
    )
):
    """Events yielded by solid and pipeline execution.

    Users should not instantiate this class.

    Attributes:
        event_type_value (str): Value for a DagsterEventType.
        pipeline_name (str)
        step_key (str)
        solid_handle (SolidHandle)
        step_kind_value (str): Value for a StepKind.
        logging_tags (Dict[str, str])
        event_specific_data (Any): Type must correspond to event_type_value.
        message (str)
        pid (int)
        step_key (Optional[str]): DEPRECATED
    """

    # ------------------------------------------------------------------
    # Constructors: each builds the event AND logs it before returning.
    # ------------------------------------------------------------------

    @staticmethod
    def from_step(
        event_type: "DagsterEventType",
        step_context: IStepContext,
        event_specific_data: Optional[NamedTuple] = None,
        message: Optional[str] = None,
    ) -> "DagsterEvent":
        """Build a step-scoped event from a step context and log it."""
        event = DagsterEvent(
            event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value,
            pipeline_name=step_context.pipeline_name,
            step_handle=step_context.step.handle,
            solid_handle=step_context.step.solid_handle,
            step_kind_value=step_context.step.kind.value,
            logging_tags=step_context.logging_tags,
            event_specific_data=_validate_event_specific_data(event_type, event_specific_data),
            message=check.opt_str_param(message, "message"),
            pid=os.getpid(),
        )

        log_step_event(step_context, event)

        return event

    @staticmethod
    def from_pipeline(
        event_type, pipeline_context, message=None, event_specific_data=None, step_handle=None
    ):
        """Build a pipeline-scoped event (optionally tied to a step handle) and log it."""
        check.opt_inst_param(
            step_handle, "step_handle", (StepHandle, ResolvedFromDynamicStepHandle)
        )

        event = DagsterEvent(
            event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value,
            pipeline_name=pipeline_context.pipeline_name,
            message=check.opt_str_param(message, "message"),
            event_specific_data=_validate_event_specific_data(event_type, event_specific_data),
            step_handle=step_handle,
            pid=os.getpid(),
        )
        step_key = step_handle.to_key() if step_handle else None
        log_pipeline_event(pipeline_context, event, step_key)

        return event

    @staticmethod
    def from_resource(
        pipeline_name, execution_plan, log_manager, message=None, event_specific_data=None
    ):
        """Build an ENGINE_EVENT for resource initialization/teardown and log it."""
        from dagster.core.execution.plan.plan import ExecutionPlan

        check.inst_param(execution_plan, "execution_plan", ExecutionPlan)

        event = DagsterEvent(
            DagsterEventType.ENGINE_EVENT.value,
            pipeline_name=pipeline_name,
            message=check.opt_str_param(message, "message"),
            event_specific_data=_validate_event_specific_data(
                DagsterEventType.ENGINE_EVENT, event_specific_data
            ),
            step_handle=execution_plan.step_handle_for_single_step_plans(),
            pid=os.getpid(),
        )
        log_resource_event(log_manager, pipeline_name, event)
        return event

    def __new__(
        cls,
        event_type_value,
        pipeline_name,
        step_handle=None,
        solid_handle=None,
        step_kind_value=None,
        logging_tags=None,
        event_specific_data=None,
        message=None,
        pid=None,
        # legacy
        step_key=None,
    ):
        event_type_value, event_specific_data = _handle_back_compat(
            event_type_value, event_specific_data
        )

        # old events may contain solid_handle but not step_handle
        if solid_handle is not None and step_handle is None:
            step_handle = StepHandle(solid_handle)

        # Legacy events may have step_key set directly, preserve those to stay in sync
        # with legacy execution plan snapshots.
        if step_handle is not None and step_key is None:
            step_key = step_handle.to_key()

        return super(DagsterEvent, cls).__new__(
            cls,
            check.str_param(event_type_value, "event_type_value"),
            check.str_param(pipeline_name, "pipeline_name"),
            check.opt_inst_param(
                step_handle, "step_handle", (StepHandle, ResolvedFromDynamicStepHandle)
            ),
            check.opt_inst_param(solid_handle, "solid_handle", SolidHandle),
            check.opt_str_param(step_kind_value, "step_kind_value"),
            check.opt_dict_param(logging_tags, "logging_tags"),
            _validate_event_specific_data(DagsterEventType(event_type_value), event_specific_data),
            check.opt_str_param(message, "message"),
            check.opt_int_param(pid, "pid"),
            check.opt_str_param(step_key, "step_key"),
        )

    # ------------------------------------------------------------------
    # Predicates and typed accessors
    # ------------------------------------------------------------------

    @property
    def solid_name(self):
        return self.solid_handle.name

    @property
    def event_type(self):
        """DagsterEventType: The type of this event."""
        return DagsterEventType(self.event_type_value)

    @property
    def is_step_event(self):
        return self.event_type in STEP_EVENTS

    @property
    def is_hook_event(self):
        return self.event_type in HOOK_EVENTS

    @property
    def step_kind(self):
        from dagster.core.execution.plan.step import StepKind

        return StepKind(self.step_kind_value)

    @property
    def is_step_success(self):
        return self.event_type == DagsterEventType.STEP_SUCCESS

    @property
    def is_successful_output(self):
        return self.event_type == DagsterEventType.STEP_OUTPUT

    @property
    def is_step_start(self):
        return self.event_type == DagsterEventType.STEP_START

    @property
    def is_step_failure(self):
        return self.event_type == DagsterEventType.STEP_FAILURE

    @property
    def is_step_skipped(self):
        return self.event_type == DagsterEventType.STEP_SKIPPED

    @property
    def is_step_up_for_retry(self):
        return self.event_type == DagsterEventType.STEP_UP_FOR_RETRY

    @property
    def is_step_restarted(self):
        return self.event_type == DagsterEventType.STEP_RESTARTED

    @property
    def is_pipeline_success(self):
        return self.event_type == DagsterEventType.PIPELINE_SUCCESS

    @property
    def is_pipeline_failure(self):
        return self.event_type == DagsterEventType.PIPELINE_FAILURE

    @property
    def is_pipeline_init_failure(self):
        return self.event_type == DagsterEventType.PIPELINE_INIT_FAILURE

    @property
    def is_failure(self):
        return self.event_type in FAILURE_EVENTS

    @property
    def is_pipeline_event(self):
        return self.event_type in PIPELINE_EVENTS

    @property
    def is_engine_event(self):
        return self.event_type == DagsterEventType.ENGINE_EVENT

    @property
    def is_handled_output(self):
        return self.event_type == DagsterEventType.HANDLED_OUTPUT

    @property
    def is_loaded_input(self):
        return self.event_type == DagsterEventType.LOADED_INPUT

    @property
    def is_step_materialization(self):
        return self.event_type == DagsterEventType.ASSET_MATERIALIZATION

    @property
    def asset_key(self):
        # Only materialization events carry an asset key.
        if self.event_type != DagsterEventType.ASSET_MATERIALIZATION:
            return None
        return self.step_materialization_data.materialization.asset_key

    @property
    def partition(self):
        # Only materialization events carry a partition.
        if self.event_type != DagsterEventType.ASSET_MATERIALIZATION:
            return None
        return self.step_materialization_data.materialization.partition

    @property
    def step_input_data(self):
        _assert_type("step_input_data", DagsterEventType.STEP_INPUT, self.event_type)
        return self.event_specific_data

    @property
    def step_output_data(self):
        _assert_type("step_output_data", DagsterEventType.STEP_OUTPUT, self.event_type)
        return self.event_specific_data

    @property
    def step_success_data(self):
        _assert_type("step_success_data", DagsterEventType.STEP_SUCCESS, self.event_type)
        return self.event_specific_data

    @property
    def step_failure_data(self):
        _assert_type("step_failure_data", DagsterEventType.STEP_FAILURE, self.event_type)
        return self.event_specific_data

    @property
    def step_retry_data(self):
        _assert_type("step_retry_data", DagsterEventType.STEP_UP_FOR_RETRY, self.event_type)
        return self.event_specific_data

    @property
    def step_materialization_data(self):
        _assert_type(
            "step_materialization_data", DagsterEventType.ASSET_MATERIALIZATION, self.event_type
        )
        return self.event_specific_data

    @property
    def step_expectation_result_data(self):
        _assert_type(
            "step_expectation_result_data",
            DagsterEventType.STEP_EXPECTATION_RESULT,
            self.event_type,
        )
        return self.event_specific_data

    @property
    def pipeline_init_failure_data(self):
        _assert_type(
            "pipeline_init_failure_data", DagsterEventType.PIPELINE_INIT_FAILURE, self.event_type
        )
        return self.event_specific_data

    @property
    def pipeline_failure_data(self):
        _assert_type("pipeline_failure_data", DagsterEventType.PIPELINE_FAILURE, self.event_type)
        return self.event_specific_data

    @property
    def engine_event_data(self):
        _assert_type("engine_event_data", DagsterEventType.ENGINE_EVENT, self.event_type)
        return self.event_specific_data

    @property
    def hook_completed_data(self):
        _assert_type("hook_completed_data", DagsterEventType.HOOK_COMPLETED, self.event_type)
        return self.event_specific_data

    @property
    def hook_errored_data(self):
        _assert_type("hook_errored_data", DagsterEventType.HOOK_ERRORED, self.event_type)
        return self.event_specific_data

    @property
    def hook_skipped_data(self):
        _assert_type("hook_skipped_data", DagsterEventType.HOOK_SKIPPED, self.event_type)
        return self.event_specific_data

    # ------------------------------------------------------------------
    # Named factories for each event type
    # ------------------------------------------------------------------

    @staticmethod
    def step_output_event(step_context, step_output_data):
        check.inst_param(step_output_data, "step_output_data", StepOutputData)

        output_def = step_context.solid.output_def_named(
            step_output_data.step_output_handle.output_name
        )
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_OUTPUT,
            step_context=step_context,
            event_specific_data=step_output_data,
            message='Yielded output "{output_name}"{mapping_clause} of type "{output_type}".{type_check_clause}'.format(
                output_name=step_output_data.step_output_handle.output_name,
                output_type=output_def.dagster_type.display_name,
                type_check_clause=(
                    " Warning! Type check failed."
                    if not step_output_data.type_check_data.success
                    else " (Type check passed)."
                )
                if step_output_data.type_check_data
                else " (No type check).",
                mapping_clause=f' mapping key "{step_output_data.step_output_handle.mapping_key}"'
                if step_output_data.step_output_handle.mapping_key
                else "",
            ),
        )

    @staticmethod
    def step_failure_event(step_context, step_failure_data):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_FAILURE,
            step_context=step_context,
            event_specific_data=step_failure_data,
            message='Execution of step "{step_key}" failed.'.format(step_key=step_context.step.key),
        )

    @staticmethod
    def step_retry_event(step_context, step_retry_data):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_UP_FOR_RETRY,
            step_context=step_context,
            event_specific_data=step_retry_data,
            message='Execution of step "{step_key}" failed and has requested a retry{wait_str}.'.format(
                step_key=step_context.step.key,
                wait_str=" in {n} seconds".format(n=step_retry_data.seconds_to_wait)
                if step_retry_data.seconds_to_wait
                else "",
            ),
        )

    @staticmethod
    def step_input_event(step_context, step_input_data):
        step_input = step_context.step.step_input_named(step_input_data.input_name)
        input_def = step_input.source.get_input_def(step_context.pipeline_def)
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_INPUT,
            step_context=step_context,
            event_specific_data=step_input_data,
            message='Got input "{input_name}" of type "{input_type}".{type_check_clause}'.format(
                input_name=step_input_data.input_name,
                input_type=input_def.dagster_type.display_name,
                type_check_clause=(
                    " Warning! Type check failed."
                    if not step_input_data.type_check_data.success
                    else " (Type check passed)."
                )
                if step_input_data.type_check_data
                else " (No type check).",
            ),
        )

    @staticmethod
    def step_start_event(step_context):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_START,
            step_context=step_context,
            message='Started execution of step "{step_key}".'.format(
                step_key=step_context.step.key
            ),
        )

    @staticmethod
    def step_restarted_event(step_context, previous_attempts):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_RESTARTED,
            step_context=step_context,
            message='Started re-execution (attempt # {n}) of step "{step_key}".'.format(
                step_key=step_context.step.key, n=previous_attempts + 1
            ),
        )

    @staticmethod
    def step_success_event(step_context, success):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_SUCCESS,
            step_context=step_context,
            event_specific_data=success,
            message='Finished execution of step "{step_key}" in {duration}.'.format(
                step_key=step_context.step.key,
                duration=format_duration(success.duration_ms),
            ),
        )

    @staticmethod
    def step_skipped_event(step_context):
        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_SKIPPED,
            step_context=step_context,
            message='Skipped execution of step "{step_key}".'.format(
                step_key=step_context.step.key
            ),
        )

    @staticmethod
    def asset_materialization(step_context, materialization, asset_lineage=None):
        check.inst_param(
            materialization, "materialization", (AssetMaterialization, Materialization)
        )
        check.opt_list_param(asset_lineage, "asset_lineage", AssetLineageInfo)
        return DagsterEvent.from_step(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            step_context=step_context,
            event_specific_data=StepMaterializationData(materialization, asset_lineage),
            message=materialization.description
            if materialization.description
            else "Materialized value{label_clause}.".format(
                label_clause=" {label}".format(label=materialization.label)
                if materialization.label
                else ""
            ),
        )

    @staticmethod
    def step_expectation_result(step_context, expectation_result):
        check.inst_param(expectation_result, "expectation_result", ExpectationResult)

        def _msg():
            if expectation_result.description:
                return expectation_result.description

            return "Expectation{label_clause} {result_verb}".format(
                label_clause=" " + expectation_result.label if expectation_result.label else "",
                result_verb="passed" if expectation_result.success else "failed",
            )

        return DagsterEvent.from_step(
            event_type=DagsterEventType.STEP_EXPECTATION_RESULT,
            step_context=step_context,
            event_specific_data=StepExpectationResultData(expectation_result),
            message=_msg(),
        )

    @staticmethod
    def pipeline_start(pipeline_context):
        return DagsterEvent.from_pipeline(
            DagsterEventType.PIPELINE_START,
            pipeline_context,
            message='Started execution of pipeline "{pipeline_name}".'.format(
                pipeline_name=pipeline_context.pipeline_name
            ),
        )

    @staticmethod
    def pipeline_success(pipeline_context):
        return DagsterEvent.from_pipeline(
            DagsterEventType.PIPELINE_SUCCESS,
            pipeline_context,
            message='Finished execution of pipeline "{pipeline_name}".'.format(
                pipeline_name=pipeline_context.pipeline_name
            ),
        )

    @staticmethod
    def pipeline_failure(pipeline_context, context_msg, error_info=None):
        return DagsterEvent.from_pipeline(
            DagsterEventType.PIPELINE_FAILURE,
            pipeline_context,
            message='Execution of pipeline "{pipeline_name}" failed. {context_msg}'.format(
                pipeline_name=pipeline_context.pipeline_name,
                context_msg=check.str_param(context_msg, "context_msg"),
            ),
            event_specific_data=PipelineFailureData(
                check.opt_inst_param(error_info, "error_info", SerializableErrorInfo)
            ),
        )

    @staticmethod
    def pipeline_canceled(pipeline_context, error_info=None):
        return DagsterEvent.from_pipeline(
            DagsterEventType.PIPELINE_CANCELED,
            pipeline_context,
            message='Execution of pipeline "{pipeline_name}" canceled.'.format(
                pipeline_name=pipeline_context.pipeline_name
            ),
            event_specific_data=PipelineCanceledData(
                check.opt_inst_param(error_info, "error_info", SerializableErrorInfo)
            ),
        )

    @staticmethod
    def resource_init_start(pipeline_name, execution_plan, log_manager, resource_keys):
        from dagster.core.execution.plan.plan import ExecutionPlan

        return DagsterEvent.from_resource(
            pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
            execution_plan=check.inst_param(execution_plan, "execution_plan", ExecutionPlan),
            log_manager=check.inst_param(log_manager, "log_manager", DagsterLogManager),
            message="Starting initialization of resources [{}].".format(
                ", ".join(sorted(resource_keys))
            ),
            event_specific_data=EngineEventData(metadata_entries=[], marker_start="resources"),
        )

    @staticmethod
    def resource_init_success(
        pipeline_name, execution_plan, log_manager, resource_instances, resource_init_times
    ):
        from dagster.core.execution.plan.plan import ExecutionPlan

        metadata_entries = []
        for resource_key in resource_instances.keys():
            resource_obj = resource_instances[resource_key]
            resource_time = resource_init_times[resource_key]
            metadata_entries.append(
                EventMetadataEntry.python_artifact(
                    resource_obj.__class__, resource_key, "Initialized in {}".format(resource_time)
                )
            )

        return DagsterEvent.from_resource(
            pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
            execution_plan=check.inst_param(execution_plan, "execution_plan", ExecutionPlan),
            log_manager=check.inst_param(log_manager, "log_manager", DagsterLogManager),
            message="Finished initialization of resources [{}].".format(
                ", ".join(sorted(resource_init_times.keys()))
            ),
            event_specific_data=EngineEventData(
                metadata_entries=metadata_entries,
                marker_end="resources",
            ),
        )

    @staticmethod
    def resource_init_failure(pipeline_name, execution_plan, log_manager, resource_keys, error):
        from dagster.core.execution.plan.plan import ExecutionPlan

        return DagsterEvent.from_resource(
            pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
            execution_plan=check.inst_param(execution_plan, "execution_plan", ExecutionPlan),
            log_manager=check.inst_param(log_manager, "log_manager", DagsterLogManager),
            message="Initialization of resources [{}] failed.".format(", ".join(resource_keys)),
            event_specific_data=EngineEventData(
                metadata_entries=[],
                marker_end="resources",
                error=error,
            ),
        )

    @staticmethod
    def resource_teardown_failure(pipeline_name, execution_plan, log_manager, resource_keys, error):
        from dagster.core.execution.plan.plan import ExecutionPlan

        return DagsterEvent.from_resource(
            pipeline_name=check.str_param(pipeline_name, "pipeline_name"),
            execution_plan=check.inst_param(execution_plan, "execution_plan", ExecutionPlan),
            log_manager=check.inst_param(log_manager, "log_manager", DagsterLogManager),
            message="Teardown of resources [{}] failed.".format(", ".join(resource_keys)),
            event_specific_data=EngineEventData(
                metadata_entries=[],
                marker_start=None,
                marker_end=None,
                error=error,
            ),
        )

    @staticmethod
    def pipeline_init_failure(pipeline_name, failure_data, log_manager):
        check.inst_param(failure_data, "failure_data", PipelineInitFailureData)
        check.inst_param(log_manager, "log_manager", DagsterLogManager)
        # this failure happens trying to bring up context so can't use from_pipeline

        event = DagsterEvent(
            event_type_value=DagsterEventType.PIPELINE_INIT_FAILURE.value,
            pipeline_name=pipeline_name,
            event_specific_data=failure_data,
            message=(
                'Pipeline failure during initialization for pipeline "{pipeline_name}". '
                "This may be due to a failure in initializing the executor or one of the loggers."
            ).format(pipeline_name=pipeline_name),
            pid=os.getpid(),
        )
        log_manager.error(
            event.message
            or "{event_type} for pipeline {pipeline_name}".format(
                event_type=DagsterEventType.PIPELINE_INIT_FAILURE, pipeline_name=pipeline_name
            ),
            dagster_event=event,
            pipeline_name=pipeline_name,
        )
        return event

    @staticmethod
    def engine_event(pipeline_context, message, event_specific_data=None, step_handle=None):
        return DagsterEvent.from_pipeline(
            DagsterEventType.ENGINE_EVENT,
            pipeline_context,
            message,
            event_specific_data=event_specific_data,
            step_handle=step_handle,
        )

    @staticmethod
    def object_store_operation(step_context, object_store_operation_result):
        from dagster.core.definitions.events import ObjectStoreOperation

        check.inst_param(
            object_store_operation_result, "object_store_operation_result", ObjectStoreOperation
        )

        object_store_name = (
            "{object_store_name} ".format(
                object_store_name=object_store_operation_result.object_store_name
            )
            if object_store_operation_result.object_store_name
            else ""
        )

        serialization_strategy_modifier = (
            " using {serialization_strategy_name}".format(
                serialization_strategy_name=object_store_operation_result.serialization_strategy_name
            )
            if object_store_operation_result.serialization_strategy_name
            else ""
        )

        value_name = object_store_operation_result.value_name

        if (
            ObjectStoreOperationType(object_store_operation_result.op)
            == ObjectStoreOperationType.SET_OBJECT
        ):
            message = (
                "Stored intermediate object for output {value_name} in "
                "{object_store_name}object store{serialization_strategy_modifier}."
            ).format(
                value_name=value_name,
                object_store_name=object_store_name,
                serialization_strategy_modifier=serialization_strategy_modifier,
            )
        elif (
            ObjectStoreOperationType(object_store_operation_result.op)
            == ObjectStoreOperationType.GET_OBJECT
        ):
            message = (
                "Retrieved intermediate object for input {value_name} in "
                "{object_store_name}object store{serialization_strategy_modifier}."
            ).format(
                value_name=value_name,
                object_store_name=object_store_name,
                serialization_strategy_modifier=serialization_strategy_modifier,
            )
        elif (
            ObjectStoreOperationType(object_store_operation_result.op)
            == ObjectStoreOperationType.CP_OBJECT
        ):
            message = (
                "Copied intermediate object for input {value_name} from {key} to {dest_key}"
            ).format(
                value_name=value_name,
                key=object_store_operation_result.key,
                dest_key=object_store_operation_result.dest_key,
            )
        else:
            message = ""

        return DagsterEvent.from_step(
            DagsterEventType.OBJECT_STORE_OPERATION,
            step_context,
            event_specific_data=ObjectStoreOperationResultData(
                op=object_store_operation_result.op,
                value_name=value_name,
                address=object_store_operation_result.key,
                metadata_entries=[
                    EventMetadataEntry.path(object_store_operation_result.key, label="key")
                ],
                version=object_store_operation_result.version,
                mapping_key=object_store_operation_result.mapping_key,
            ),
            message=message,
        )

    @staticmethod
    def handled_output(
        step_context,
        output_name,
        manager_key,
        message_override=None,
        metadata_entries=None,
    ):
        check.str_param(output_name, "output_name")
        check.str_param(manager_key, "manager_key")
        message = f'Handled output "{output_name}" using IO manager "{manager_key}"'
        return DagsterEvent.from_step(
            event_type=DagsterEventType.HANDLED_OUTPUT,
            step_context=step_context,
            event_specific_data=HandledOutputData(
                output_name=output_name,
                manager_key=manager_key,
                metadata_entries=metadata_entries if metadata_entries else [],
            ),
            message=message_override or message,
        )

    @staticmethod
    def loaded_input(
        step_context,
        input_name,
        manager_key,
        upstream_output_name=None,
        upstream_step_key=None,
        message_override=None,
    ):
        check.str_param(input_name, "input_name")
        check.str_param(manager_key, "manager_key")
        check.opt_str_param(upstream_output_name, "upstream_output_name")
        check.opt_str_param(upstream_step_key, "upstream_step_key")

        message = f'Loaded input "{input_name}" using input manager "{manager_key}"'
        if upstream_output_name:
            message += f', from output "{upstream_output_name}" of step ' f'"{upstream_step_key}"'

        return DagsterEvent.from_step(
            event_type=DagsterEventType.LOADED_INPUT,
            step_context=step_context,
            event_specific_data=LoadedInputData(
                input_name=input_name,
                manager_key=manager_key,
                upstream_output_name=upstream_output_name,
                upstream_step_key=upstream_step_key,
            ),
            message=message_override or message,
        )

    @staticmethod
    def hook_completed(
        step_context: StepExecutionContext, hook_def: HookDefinition
    ) -> "DagsterEvent":
        event_type = DagsterEventType.HOOK_COMPLETED
        event = DagsterEvent(
            event_type_value=event_type.value,
            pipeline_name=step_context.pipeline_name,
            step_handle=step_context.step.handle,
            solid_handle=step_context.step.solid_handle,
            step_kind_value=step_context.step.kind.value,
            logging_tags=step_context.logging_tags,
            message=(
                'Finished the execution of hook "{hook_name}" triggered for solid "{solid_name}".'
            ).format(hook_name=hook_def.name, solid_name=step_context.solid.name),
        )

        step_context.log.debug(
            event.message,
            dagster_event=event,
            pipeline_name=step_context.pipeline_name,
        )

        return event

    @staticmethod
    def hook_errored(
        step_context: StepExecutionContext, error: HookExecutionError
    ) -> "DagsterEvent":
        event_type = DagsterEventType.HOOK_ERRORED
        event = DagsterEvent(
            event_type_value=event_type.value,
            pipeline_name=step_context.pipeline_name,
            step_handle=step_context.step.handle,
            solid_handle=step_context.step.solid_handle,
            step_kind_value=step_context.step.kind.value,
            logging_tags=step_context.logging_tags,
            event_specific_data=_validate_event_specific_data(
                event_type,
                HookErroredData(
                    error=serializable_error_info_from_exc_info(error.original_exc_info)
                ),
            ),
        )

        step_context.log.error(
            str(error),
            dagster_event=event,
            pipeline_name=step_context.pipeline_name,
        )

        return event

    @staticmethod
    def hook_skipped(
        step_context: StepExecutionContext, hook_def: HookDefinition
    ) -> "DagsterEvent":
        event_type = DagsterEventType.HOOK_SKIPPED
        event = DagsterEvent(
            event_type_value=event_type.value,
            pipeline_name=step_context.pipeline_name,
            step_handle=step_context.step.handle,
            solid_handle=step_context.step.solid_handle,
            step_kind_value=step_context.step.kind.value,
            logging_tags=step_context.logging_tags,
            message=(
                'Skipped the execution of hook "{hook_name}". It did not meet its triggering '
                'condition during the execution of solid "{solid_name}".'
            ).format(hook_name=hook_def.name, solid_name=step_context.solid.name),
        )

        step_context.log.debug(
            event.message,
            dagster_event=event,
            pipeline_name=step_context.pipeline_name,
        )

        return event
def get_step_output_event(events, step_key, output_name="result"):
    """Return the first STEP_OUTPUT event in ``events`` matching ``step_key`` and
    ``output_name``, or ``None`` if no such event is present."""
    check.list_param(events, "events", of_type=DagsterEvent)
    check.str_param(step_key, "step_key")
    check.str_param(output_name, "output_name")
    for event in events:
        if (
            event.event_type == DagsterEventType.STEP_OUTPUT
            and event.step_key == step_key
            and event.step_output_data.output_name == output_name
        ):
            return event
    return None


@whitelist_for_serdes
class StepMaterializationData(
    namedtuple("_StepMaterializationData", "materialization asset_lineage")
):
    """Payload for a step materialization: the materialization itself plus an
    optional list of :class:`AssetLineageInfo` entries (defaults to ``[]``)."""

    def __new__(cls, materialization, asset_lineage=None):
        return super(StepMaterializationData, cls).__new__(
            cls,
            materialization=materialization,
            asset_lineage=check.opt_list_param(asset_lineage, "asset_lineage", AssetLineageInfo),
        )


@whitelist_for_serdes
class StepExpectationResultData(namedtuple("_StepExpectationResultData", "expectation_result")):
    # Plain namedtuple wrapper around an expectation result; no construction-time
    # validation is performed.
    pass


@whitelist_for_serdes
class ObjectStoreOperationResultData(
    namedtuple(
        "_ObjectStoreOperationResultData",
        "op value_name metadata_entries address version mapping_key",
    )
):
    """Payload describing the result of an object-store operation.

    All string fields are optional; ``metadata_entries`` defaults to ``[]``.
    """

    def __new__(
        cls, op, value_name, metadata_entries, address=None, version=None, mapping_key=None
    ):
        return super(ObjectStoreOperationResultData, cls).__new__(
            cls,
            op=check.opt_str_param(op, "op"),
            value_name=check.opt_str_param(value_name, "value_name"),
            metadata_entries=check.opt_list_param(metadata_entries, "metadata_entries"),
            address=check.opt_str_param(address, "address"),
            version=check.opt_str_param(version, "version"),
            mapping_key=check.opt_str_param(mapping_key, "mapping_key"),
        )


@whitelist_for_serdes
class EngineEventData(
    namedtuple("_EngineEventData", "metadata_entries error marker_start marker_end")
):
    """Payload for engine events: metadata entries, an optional serializable
    error, and optional start/end marker labels."""

    # serdes log
    # * added optional error
    # * added marker_start / marker_end
    #
    def __new__(cls, metadata_entries=None, error=None, marker_start=None, marker_end=None):
        return super(EngineEventData, cls).__new__(
            cls,
            metadata_entries=check.opt_list_param(
                metadata_entries, "metadata_entries", EventMetadataEntry
            ),
            error=check.opt_inst_param(error, "error", SerializableErrorInfo),
            marker_start=check.opt_str_param(marker_start, "marker_start"),
            marker_end=check.opt_str_param(marker_end, "marker_end"),
        )

    @staticmethod
    def in_process(pid, step_keys_to_execute=None, marker_end=None):
        """EngineEventData for in-process execution: records the pid and, when
        given, the step keys, as text metadata entries."""
        check.int_param(pid, "pid")
        check.opt_list_param(step_keys_to_execute, "step_keys_to_execute")

        return EngineEventData(
            metadata_entries=[EventMetadataEntry.text(str(pid), "pid")]
            + (
                [EventMetadataEntry.text(str(step_keys_to_execute), "step_keys")]
                if step_keys_to_execute
                else []
            ),
            marker_end=marker_end,
        )

    @staticmethod
    def multiprocess(pid, step_keys_to_execute=None):
        """EngineEventData for multiprocess execution; same pid/step-keys
        metadata as ``in_process`` but with no end marker."""
        check.int_param(pid, "pid")
        check.opt_list_param(step_keys_to_execute, "step_keys_to_execute")

        return EngineEventData(
            metadata_entries=[EventMetadataEntry.text(str(pid), "pid")]
            + (
                [EventMetadataEntry.text(str(step_keys_to_execute), "step_keys")]
                if step_keys_to_execute
                else []
            )
        )

    @staticmethod
    def interrupted(steps_interrupted):
        """EngineEventData recording which step keys were interrupted."""
        check.list_param(steps_interrupted, "steps_interrupted", str)

        return EngineEventData(
            metadata_entries=[EventMetadataEntry.text(str(steps_interrupted), "steps_interrupted")]
        )

    @staticmethod
    def engine_error(error):
        """EngineEventData carrying a (required) serializable engine error."""
        check.inst_param(error, "error", SerializableErrorInfo)

        return EngineEventData(metadata_entries=[], error=error)


@whitelist_for_serdes
class PipelineInitFailureData(namedtuple("_PipelineInitFailureData", "error")):
    """Payload for PIPELINE_INIT_FAILURE; the error is required."""

    def __new__(cls, error):
        return super(PipelineInitFailureData, cls).__new__(
            cls, error=check.inst_param(error, "error", SerializableErrorInfo)
        )


@whitelist_for_serdes
class PipelineFailureData(namedtuple("_PipelineFailureData", "error")):
    """Payload for PIPELINE_FAILURE; the error is optional."""

    def __new__(cls, error):
        return super(PipelineFailureData, cls).__new__(
            cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo)
        )


@whitelist_for_serdes
class PipelineCanceledData(namedtuple("_PipelineCanceledData", "error")):
    """Payload for PIPELINE_CANCELED; the error is optional."""

    def __new__(cls, error):
        return super(PipelineCanceledData, cls).__new__(
            cls,
            error=check.opt_inst_param(error, "error", SerializableErrorInfo)
        )


@whitelist_for_serdes
class HookErroredData(namedtuple("_HookErroredData", "error")):
    """Payload for HOOK_ERRORED; the serialized hook error is required."""

    def __new__(cls, error):
        return super(HookErroredData, cls).__new__(
            cls,
            error=check.inst_param(error, "error", SerializableErrorInfo),
        )


@whitelist_for_serdes
class HandledOutputData(
    namedtuple("_HandledOutputData", "output_name manager_key metadata_entries")
):
    """Payload for HANDLED_OUTPUT: the output name, the IO manager key that
    handled it, and optional metadata entries (defaults to ``[]``)."""

    def __new__(cls, output_name, manager_key, metadata_entries=None):
        return super(HandledOutputData, cls).__new__(
            cls,
            output_name=check.str_param(output_name, "output_name"),
            manager_key=check.str_param(manager_key, "manager_key"),
            metadata_entries=check.opt_list_param(
                metadata_entries, "metadata_entries", EventMetadataEntry
            ),
        )


@whitelist_for_serdes
class LoadedInputData(
    namedtuple("_LoadedInputData", "input_name manager_key upstream_output_name upstream_step_key")
):
    """Payload for LOADED_INPUT: the input name, the input manager key, and the
    optional upstream output/step the value was loaded from."""

    def __new__(cls, input_name, manager_key, upstream_output_name=None, upstream_step_key=None):
        return super(LoadedInputData, cls).__new__(
            cls,
            input_name=check.str_param(input_name, "input_name"),
            manager_key=check.str_param(manager_key, "manager_key"),
            upstream_output_name=check.opt_str_param(upstream_output_name, "upstream_output_name"),
            upstream_step_key=check.opt_str_param(upstream_step_key, "upstream_step_key"),
        )


###################################################################################################
# THE GRAVEYARD
#
#  -|-                  -|-
#   |                    |
#  _-'~~~~~`-_   .   _-'~~~~~`-_
# .'           '.   .'           '.
# |    R  I  P    |   |    R  I  P    |
# |               |   |               |
# |   Synthetic   |   |     Asset     |
# |    Process    |   |     Store     |
# |    Events     |   |  Operations   |
# |               |   |               |
###################################################################################################
# Retired event shapes. These serdes-registered types must remain importable so
# that previously persisted event logs still deserialize; see
# https://github.com/dagster-io/dagster/issues/3533.


@whitelist_for_serdes
class AssetStoreOperationData(
    namedtuple("_AssetStoreOperationData", "op step_key output_name asset_store_key")
):
    """Legacy payload for the retired ASSET_STORE_OPERATION event."""


@whitelist_for_serdes
class AssetStoreOperationType(Enum):
    """Legacy op discriminator carried by ASSET_STORE_OPERATION events."""

    SET_ASSET = "SET_ASSET"
    GET_ASSET = "GET_ASSET"


def _handle_back_compat(event_type_value, event_specific_data):
    """Map a legacy (event type value, payload) pair onto its modern equivalent.

    Unrecognized combinations pass through unchanged.
    """
    # The synthetic per-process lifecycle events were folded into generic engine events.
    if event_type_value in (
        "PIPELINE_PROCESS_START",
        "PIPELINE_PROCESS_STARTED",
        "PIPELINE_PROCESS_EXITED",
    ):
        return DagsterEventType.ENGINE_EVENT.value, EngineEventData([])

    # ASSET_STORE_OPERATION was split into LOADED_INPUT (get) and HANDLED_OUTPUT (set).
    if event_type_value == "ASSET_STORE_OPERATION":
        legacy_op = event_specific_data.op
        if legacy_op in ("GET_ASSET", AssetStoreOperationType.GET_ASSET):
            return (
                DagsterEventType.LOADED_INPUT.value,
                LoadedInputData(
                    event_specific_data.output_name, event_specific_data.asset_store_key
                ),
            )
        if legacy_op in ("SET_ASSET", AssetStoreOperationType.SET_ASSET):
            return (
                DagsterEventType.HANDLED_OUTPUT.value,
                HandledOutputData(
                    event_specific_data.output_name, event_specific_data.asset_store_key, []
                ),
            )

    # ASSET_MATERIALIZATION was previously named STEP_MATERIALIZATION.
    if event_type_value == "STEP_MATERIALIZATION":
        return DagsterEventType.ASSET_MATERIALIZATION.value, event_specific_data

    return event_type_value, event_specific_data


# The tuple classes for the synthetic process events no longer exist; register
# explicit fallbacks so old serialized records remain loadable.
register_serdes_tuple_fallbacks(
    {
        "PipelineProcessStartedData": None,
        "PipelineProcessExitedData": None,
        "PipelineProcessStartData": None,
    }
)