DagsterDocs

Source code for dagster_dbt.types

from collections import namedtuple
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union

from dagster import check
from dateutil import parser


[docs]class StepTiming(namedtuple("_StepTiming", "name started_at completed_at")): """The timing information of an executed step for a dbt node (model). Note that users should not construct instances of this class directly. This class is intended to be constructed from the JSON output of dbt commands. Attributes: name (str): The name of the executed step. started_at (datetime.datetime): An ISO string timestamp of when the step started executing. completed_at (datetime.datetime): An ISO string timestamp of when the step completed execution. """ def __new__(cls, name: str, started_at: datetime, completed_at: datetime): return super().__new__( cls, check.str_param(name, "name"), check.inst_param(started_at, "started_at", datetime), check.inst_param(completed_at, "completed_at", datetime), ) @property def duration(self) -> timedelta: """datetime.timedelta: The execution duration of the step.""" return self.completed_at - self.started_at
[docs]class NodeResult( namedtuple( "_NodeResult", "node error status execution_time thread_id step_timings table fail warn skip", ), ): """The result of executing a dbt node (model). Note that users should not construct instances of this class directly. This class is intended to be constructed from the JSON output of dbt commands. Attributes: node (Dict[str, Any]): Details about the executed dbt node (model). error (Optional[str]): An error message if an error occurred. fail (Optional[Any]): The ``fail`` field from the results of the executed dbt node. warn (Optional[Any]): The ``warn`` field from the results of the executed dbt node. skip (Optional[Any]): The ``skip`` field from the results of the executed dbt node. status (Optional[Union[str,int]]): The status of the executed dbt node (model). execution_time (float): The execution duration (in seconds) of the dbt node (model). thread_id (str): The dbt thread identifier that executed the dbt node (model). step_timings (List[StepTiming]): The timings for each step in the executed dbt node (model). table (Optional[Dict]): Details about the table/view that is created from executing a `run_sql <https://docs.getdbt.com/reference/commands/rpc#executing-a-query>`_ command on an dbt RPC server. """ def __new__( cls, node: Optional[Dict[str, Any]] = None, error: Optional[str] = None, status: Optional[Union[str, int]] = None, execution_time: Optional[float] = None, thread_id: Optional[str] = None, step_timings: List[StepTiming] = None, table: Optional[Dict[str, Any]] = None, fail: Optional[Any] = None, warn: Optional[Any] = None, skip: Optional[Any] = None, ): step_timings = check.list_param(step_timings, "step_timings", of_type=StepTiming) return super().__new__( cls, check.opt_dict_param(node, "node", key_type=str), check.opt_str_param(error, "error"), status, check.opt_float_param(execution_time, "execution_time"), check.opt_str_param(thread_id, "thread_id"), step_timings, check.opt_dict_param(table, "table"), fail, warn, skip, )
[docs] @classmethod def from_dict(cls, d: Dict[str, Any]) -> "NodeResult": """Constructs an instance of :class:`NodeResult <dagster_dbt.NodeResult>` from a dictionary. Args: d (Dict[str, Any]): A dictionary with key-values to construct a :class:`NodeResult <dagster_dbt.NodeResult>`. Returns: NodeResult: An instance of :class:`NodeResult <dagster_dbt.NodeResult>`. """ node = check.opt_dict_elem(d, "node") error = check.opt_str_elem(d, "error") execution_time = check.float_elem(d, "execution_time") thread_id = check.opt_str_elem(d, "thread_id") check.list_elem(d, "timing") step_timings = [ StepTiming( name=d["name"], started_at=parser.isoparse(d["started_at"]), completed_at=parser.isoparse(d["completed_at"]), ) for d in check.is_list(d["timing"], of_type=Dict) ] table = check.opt_dict_elem(d, "table") return cls( step_timings=step_timings, node=node, error=error, execution_time=execution_time, thread_id=thread_id, table=table, )
[docs]class DbtResult(namedtuple("_DbtResult", "logs results generated_at elapsed_time")): """The results of executing a dbt command. Note that users should not construct instances of this class directly. This class is intended to be constructed from the JSON output of dbt commands. Attributes: logs (List[Dict[str, Any]]): JSON log output from the dbt process. results (List[NodeResult]]): Details about each executed dbt node (model) in the run. generated_at (str): An ISO string timestamp of when the run result was generated by dbt. elapsed_time (float): The execution duration (in seconds) of the run. """ def __new__( cls, logs: List[Dict[str, Any]], results: List[NodeResult], generated_at: Optional[str] = None, elapsed_time: Optional[float] = None, ): return super().__new__( cls, check.list_param(logs, "logs", of_type=Dict), results, check.opt_str_param(generated_at, "generated_at"), check.opt_float_param(elapsed_time, "elapsed_time"), )
[docs] @classmethod def from_dict(cls, d: Dict[str, Any]) -> "DbtResult": """Constructs an instance of :class:`DbtResult <dagster_dbt.DbtResult>` from a dictionary. Args: d (Dict[str, Any]): A dictionary with key-values to construct a :class:`DbtResult <dagster_dbt.DbtResult>`. Returns: DbtResult: An instance of :class:`DbtResult <dagster_dbt.DbtResult>`. """ check.list_elem(d, "logs") logs = check.is_list(d["logs"], of_type=Dict) check.list_elem(d, "results") results = [NodeResult.from_dict(d) for d in check.is_list(d["results"], of_type=Dict)] generated_at = check.opt_str_elem(d, "generated_at") elapsed_time = check.float_elem(d, "elapsed_time") return cls( logs=logs, results=results, generated_at=generated_at, elapsed_time=elapsed_time, )
def __len__(self) -> int: return len(self.results)