DagsterDocs

Source code for dagster_dbt.cli.solids

from typing import Dict

from dagster import (
    Array,
    AssetMaterialization,
    Bool,
    EventMetadataEntry,
    InputDefinition,
    Noneable,
    Nothing,
    Output,
    OutputDefinition,
    Permissive,
    StringSource,
    solid,
)
from dagster.config.field import Field
from dagster.utils.backcompat import experimental

from .types import DbtCliOutput
from .utils import execute_cli, parse_run_results

# Executable used by every solid in this module unless the ``dbt_executable`` config option
# overrides it (see CLI_COMMON_OPTIONS_CONFIG_SCHEMA). A bare name like this is presumably
# resolved against the PATH of the process running the solid.
DEFAULT_DBT_EXECUTABLE = "dbt"

# The following config fields correspond to flags that apply to all dbt CLI commands. For details
# on dbt CLI flags, see
# https://github.com/fishtown-analytics/dbt/blob/1f8e29276e910c697588c43f08bc881379fff178/core/dbt/main.py#L260-L329
#
# Every key below is also collected into CLI_COMMON_FLAGS, so passthrough_flags_only forwards it
# to execute_cli whenever its resolved value is not None. Keys are spelled like the dbt flag minus
# the leading "--"; presumably execute_cli derives the flag name from the key.
CLI_COMMON_FLAGS_CONFIG_SCHEMA = {
    # NOTE(review): no default_value, so this key is presumably absent from the resolved solid
    # config when the user omits it -- read it with .get(...) rather than indexing.
    "project-dir": Field(
        config=StringSource,
        is_required=False,
        description=(
            "Which directory to look in for the dbt_project.yml file. Default is the current "
            "working directory and its parents."
        ),
    ),
    "profiles-dir": Field(
        config=StringSource,
        is_required=False,
        description=(
            "Which directory to look in for the profiles.yml file. Default = $DBT_PROFILES_DIR or "
            "$HOME/.dbt"
        ),
    ),
    "profile": Field(
        config=StringSource,
        is_required=False,
        description="Which profile to load. Overrides setting in dbt_project.yml.",
    ),
    "target": Field(
        config=StringSource,
        is_required=False,
        description="Which target to load for the given profile.",
    ),
    # Arbitrary mapping; how it is serialized onto the command line is up to execute_cli.
    "vars": Field(
        config=Permissive({}),
        is_required=False,
        description=(
            "Supply variables to the project. This argument overrides variables defined in your "
            "dbt_project.yml file. This argument should be a dictionary, eg. "
            "{'my_variable': 'my_value'}"
        ),
    ),
    # Defaults to False (not None), so passthrough_flags_only always includes this key; how a
    # False value is rendered (or skipped) is decided by execute_cli.
    "bypass-cache": Field(
        config=bool,
        is_required=False,
        description="If set, bypass the adapter-level cache of database state",
        default_value=False,
    ),
}

# The following config fields correspond to options that apply to all CLI solids, but should not be
# formatted as CLI flags.
#
# These keys are not part of CLI_COMMON_FLAGS. Instead, every solid in this module reads them
# directly from ``context.solid_config``:
# - "dbt_executable" is passed as the first positional argument to execute_cli.
# - "warn-error" is passed as execute_cli's ``warn_error`` keyword.
# - "ignore_handled_error" is passed as execute_cli's ``ignore_handled_error`` keyword.
# All three declare a default_value, which is why the solids can index them directly.
CLI_COMMON_OPTIONS_CONFIG_SCHEMA = {
    # Spelled like a dbt flag, but deliberately handed to execute_cli as a keyword argument
    # instead of being passed through by passthrough_flags_only.
    "warn-error": Field(
        config=bool,
        is_required=False,
        description=(
            "If dbt would normally warn, instead raise an exception. Examples include --models "
            "that selects nothing, deprecations, configurations with no associated models, "
            "invalid test configurations, and missing sources/refs in tests."
        ),
        default_value=False,
    ),
    "dbt_executable": Field(
        config=StringSource,
        is_required=False,
        description="Path to the dbt executable. Default is {}".format(DEFAULT_DBT_EXECUTABLE),
        default_value=DEFAULT_DBT_EXECUTABLE,
    ),
    "ignore_handled_error": Field(
        config=bool,
        is_required=False,
        description=(
            "When True, will not raise an exception when the dbt CLI returns error code 1. "
            "Default is False."
        ),
        default_value=False,
    ),
}

# Config schema shared by every dbt CLI solid: the passthrough flags first, then the solid-level
# options. ``update`` lets an option replace a same-named flag, mirroring a later ``**`` expansion.
CLI_CONFIG_SCHEMA = dict(CLI_COMMON_FLAGS_CONFIG_SCHEMA)
CLI_CONFIG_SCHEMA.update(CLI_COMMON_OPTIONS_CONFIG_SCHEMA)

# Config keys that passthrough_flags_only always considers forwarding to dbt as CLI flags.
CLI_COMMON_FLAGS = set(CLI_COMMON_FLAGS_CONFIG_SCHEMA)


def passthrough_flags_only(solid_config, additional_flags):
    """Select the solid config entries that should be forwarded to dbt as CLI flags.

    Args:
        solid_config (dict): The resolved solid config.
        additional_flags (Iterable[str]): Command-specific config keys to forward in addition
            to the flags shared by every dbt command (``CLI_COMMON_FLAGS``).

    Returns:
        dict: ``{flag: value}`` for every candidate key whose value in ``solid_config`` is
        present and not ``None``. The common flags come first (sorted by name), followed by
        ``additional_flags`` in the order given, so the result has a stable order from one
        process to the next.
    """
    # Iterating a set of strings follows hash-randomized order (PYTHONHASHSEED), which made the
    # order of the returned flags differ between processes. dict.fromkeys de-duplicates while
    # preserving first-seen order.
    candidate_flags = dict.fromkeys([*sorted(CLI_COMMON_FLAGS), *additional_flags])
    return {
        flag: solid_config[flag]
        for flag in candidate_flags
        if solid_config.get(flag) is not None
    }


@solid(
    description="A solid to invoke dbt run via CLI.",
    input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)],
    output_defs=[OutputDefinition(name="dbt_output", dagster_type=DbtCliOutput)],
    config_schema={
        **CLI_CONFIG_SCHEMA,
        "threads": Field(
            config=Noneable(int),
            default_value=None,
            is_required=False,
            description=(
                "Specify number of threads to use while executing models. Overrides settings "
                "in profiles.yml."
            ),
        ),
        "models": Field(
            config=Noneable([str]),
            default_value=None,
            is_required=False,
            description="The dbt models to run.",
        ),
        "exclude": Field(
            config=Noneable([str]),
            default_value=None,
            is_required=False,
            description="The dbt models to exclude.",
        ),
        "full-refresh": Field(
            config=bool,
            description=(
                "If specified, DBT will drop incremental models and fully-recalculate the "
                "incremental table from the model definition. (--full-refresh)"
            ),
            is_required=False,
            default_value=False,
        ),
        "fail-fast": Field(
            config=bool,
            description="Stop execution upon a first failure. (--fail-fast)",
            is_required=False,
            default_value=False,
        ),
        "yield_materializations": Field(
            config=Bool,
            is_required=False,
            default_value=True,
            description=(
                "If True, materializations corresponding to the results of the dbt operation will "
                "be yielded when the solid executes. Default: True"
            ),
        ),
        "asset_key_prefix": Field(
            config=Array(str),
            is_required=False,
            default_value=[],
            description=(
                "If provided and yield_materializations is True, these components will be used to "
                "prefix the generated asset keys."
            ),
        ),
    },
    tags={"kind": "dbt"},
)
@experimental
def dbt_cli_run(context) -> DbtCliOutput:
    """This solid executes ``dbt run`` via the dbt CLI.

    The options shared by all dbt CLI solids, plus ``threads``, ``models``, ``exclude``,
    ``full-refresh`` and ``fail-fast``, are forwarded as CLI flags when set. The dict returned
    by ``execute_cli`` is merged over the run results that ``parse_run_results`` reads for the
    project directory (CLI keys win on collision) and converted into a ``DbtCliOutput``.

    Yields:
        AssetMaterialization: the materializations built by ``generate_materializations``
            (only when ``yield_materializations`` is True), then, unconditionally, a
            ``dbt_run_cli_output`` materialization carrying the merged output as JSON metadata.
        Output: the ``DbtCliOutput``, on the ``dbt_output`` output.
    """
    from ..utils import generate_materializations

    cli_output = execute_cli(
        context.solid_config["dbt_executable"],
        command=("run",),
        flags_dict=passthrough_flags_only(
            context.solid_config, ("threads", "models", "exclude", "full-refresh", "fail-fast")
        ),
        log=context.log,
        warn_error=context.solid_config["warn-error"],
        ignore_handled_error=context.solid_config["ignore_handled_error"],
    )

    # "project-dir" is optional and has no default, so it is missing from solid_config when the
    # user omits it; indexing it directly raised KeyError after dbt had already run. Fall back to
    # the current working directory, which is where dbt starts looking in that case.
    # NOTE(review): assumes parse_run_results resolves its artifacts relative to this path.
    run_results = parse_run_results(context.solid_config.get("project-dir", "."))
    cli_output_dict = {**run_results, **cli_output}
    dbt_output = DbtCliOutput.from_dict(cli_output_dict)

    if context.solid_config["yield_materializations"]:
        for materialization in generate_materializations(
            dbt_output, asset_key_prefix=context.solid_config["asset_key_prefix"]
        ):
            yield materialization

    yield AssetMaterialization(
        asset_key="dbt_run_cli_output",
        description="Output from the CLI execution of `dbt run`.",
        metadata_entries=[EventMetadataEntry.json(cli_output_dict, label="CLI Output")],
    )

    yield Output(dbt_output, output_name="dbt_output")
@solid(
    description="A solid to invoke dbt test via CLI.",
    input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)],
    output_defs=[OutputDefinition(name="dbt_output", dagster_type=DbtCliOutput)],
    config_schema={
        **CLI_CONFIG_SCHEMA,
        "data": Field(
            config=bool,
            description='Run data tests defined in "tests" directory.',
            is_required=False,
            default_value=False,
        ),
        "schema": Field(
            config=bool,
            description="Run constraint validations from schema.yml files.",
            is_required=False,
            default_value=False,
        ),
        "fail-fast": Field(
            config=bool,
            description="Stop execution upon a first test failure.",
            is_required=False,
            default_value=False,
        ),
        "threads": Field(
            config=Noneable(int),
            default_value=None,
            is_required=False,
            description=(
                "Specify number of threads to use while executing models. Overrides settings "
                "in profiles.yml."
            ),
        ),
        "models": Field(
            config=Noneable([str]),
            default_value=None,
            is_required=False,
            description="The dbt models to run.",
        ),
        "exclude": Field(
            config=Noneable([str]),
            default_value=None,
            is_required=False,
            description="The dbt models to exclude.",
        ),
        "yield_materializations": Field(
            config=Bool,
            is_required=False,
            default_value=True,
            description=(
                "If True, materializations corresponding to the results of the dbt operation will "
                "be yielded when the solid executes. Default: True"
            ),
        ),
    },
    tags={"kind": "dbt"},
)
@experimental
def dbt_cli_test(context) -> DbtCliOutput:
    """This solid executes ``dbt test`` via the dbt CLI.

    The options shared by all dbt CLI solids, plus ``data``, ``schema``, ``fail-fast``,
    ``threads``, ``models`` and ``exclude``, are forwarded as CLI flags when set. The dict
    returned by ``execute_cli`` is merged over the run results that ``parse_run_results`` reads
    for the project directory (CLI keys win on collision).

    Yields:
        AssetMaterialization: a ``dbt_test_cli_output`` materialization carrying the merged
            output as JSON metadata, only when ``yield_materializations`` is True.
        Output: the merged output converted to a ``DbtCliOutput``, on the ``dbt_output`` output.
    """
    cli_output = execute_cli(
        context.solid_config["dbt_executable"],
        command=("test",),
        flags_dict=passthrough_flags_only(
            context.solid_config, ("data", "schema", "fail-fast", "threads", "models", "exclude")
        ),
        log=context.log,
        warn_error=context.solid_config["warn-error"],
        ignore_handled_error=context.solid_config["ignore_handled_error"],
    )

    # "project-dir" is optional and has no default, so it is missing from solid_config when the
    # user omits it; indexing it directly raised KeyError after dbt had already run. Fall back to
    # the current working directory, which is where dbt starts looking in that case.
    # NOTE(review): assumes parse_run_results resolves its artifacts relative to this path.
    run_results = parse_run_results(context.solid_config.get("project-dir", "."))
    cli_output_dict = {**run_results, **cli_output}

    if context.solid_config["yield_materializations"]:
        yield AssetMaterialization(
            asset_key="dbt_test_cli_output",
            description="Output from the CLI execution of `dbt test`.",
            metadata_entries=[EventMetadataEntry.json(cli_output_dict, label="CLI Output")],
        )

    yield Output(DbtCliOutput.from_dict(cli_output_dict), output_name="dbt_output")
# Command-specific config keys forwarded to `dbt snapshot` on top of the common flags.
_DBT_CLI_SNAPSHOT_FLAGS = ("threads", "select", "exclude")

# Config schema for dbt_cli_snapshot: the shared CLI schema plus snapshot-specific fields.
_DBT_CLI_SNAPSHOT_CONFIG_SCHEMA = {
    **CLI_CONFIG_SCHEMA,
    "threads": Field(
        config=Noneable(int),
        default_value=None,
        is_required=False,
        description=(
            "Specify number of threads to use while executing models. Overrides settings in "
            "profiles.yml."
        ),
    ),
    "select": Field(
        config=Noneable([str]),
        default_value=None,
        is_required=False,
        description="The dbt models to include.",
    ),
    "exclude": Field(
        config=Noneable([str]),
        default_value=None,
        is_required=False,
        description="The dbt models to exclude.",
    ),
    "yield_materializations": Field(
        config=Bool,
        is_required=False,
        default_value=True,
        description=(
            "If True, materializations corresponding to the results of the dbt operation will "
            "be yielded when the solid executes. Default: True"
        ),
    ),
}


@solid(
    description="A solid to invoke dbt snapshot via CLI.",
    input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)],
    output_defs=[OutputDefinition(dagster_type=Dict)],
    config_schema=_DBT_CLI_SNAPSHOT_CONFIG_SCHEMA,
    tags={"kind": "dbt"},
)
@experimental
def dbt_cli_snapshot(context) -> Dict:
    """Run ``dbt snapshot`` through the dbt CLI.

    ``threads``, ``select`` and ``exclude`` are forwarded as flags alongside the common ones.

    Yields:
        AssetMaterialization: ``dbt_snapshot_cli_output`` with the raw CLI result as JSON
            metadata, when ``yield_materializations`` is enabled.
        Output: the dict returned by ``execute_cli``, on the default output.
    """
    config = context.solid_config
    snapshot_result = execute_cli(
        config["dbt_executable"],
        command=("snapshot",),
        flags_dict=passthrough_flags_only(config, _DBT_CLI_SNAPSHOT_FLAGS),
        log=context.log,
        warn_error=config["warn-error"],
        ignore_handled_error=config["ignore_handled_error"],
    )

    if config["yield_materializations"]:
        yield AssetMaterialization(
            asset_key="dbt_snapshot_cli_output",
            description="Output from the CLI execution of `dbt snapshot`.",
            metadata_entries=[EventMetadataEntry.json(snapshot_result, label="CLI Output")],
        )

    yield Output(snapshot_result)
# Config schema for dbt_cli_run_operation: the shared CLI schema plus the macro to call and its
# arguments. "macro" declares no is_required/default, so Dagster treats it as required.
_DBT_CLI_RUN_OPERATION_CONFIG_SCHEMA = {
    **CLI_CONFIG_SCHEMA,
    "macro": Field(
        config=StringSource,
        description=(
            "Specify the macro to invoke. dbt will call this macro with the supplied "
            "arguments and then exit."
        ),
    ),
    "args": Field(
        config=Permissive({}),
        is_required=False,
        description=(
            "Supply arguments to the macro. This dictionary will be mapped to the keyword "
            "arguments defined in the selected macro. This argument should be a dictionary, "
            "eg. {'my_variable': 'my_value'}"
        ),
    ),
    "yield_materializations": Field(
        config=Bool,
        is_required=False,
        default_value=True,
        description=(
            "If True, materializations corresponding to the results of the dbt operation will "
            "be yielded when the solid executes. Default: True"
        ),
    ),
}


@solid(
    description="A solid to invoke dbt run-operation via CLI.",
    input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)],
    output_defs=[OutputDefinition(name="result", dagster_type=Dict)],
    config_schema=_DBT_CLI_RUN_OPERATION_CONFIG_SCHEMA,
    tags={"kind": "dbt"},
)
@experimental
def dbt_cli_run_operation(context) -> Dict:
    """Run ``dbt run-operation <macro>`` through the dbt CLI.

    The macro name is appended to the command itself; its ``args`` mapping is forwarded as a
    flag together with the common ones.

    Yields:
        AssetMaterialization: ``dbt_run_operation_cli_output`` with the raw CLI result as JSON
            metadata, when ``yield_materializations`` is enabled.
        Output: the dict returned by ``execute_cli``, on the ``result`` output.
    """
    config = context.solid_config
    executable = config["dbt_executable"]
    macro_name = config["macro"]
    operation_result = execute_cli(
        executable,
        command=("run-operation", macro_name),
        flags_dict=passthrough_flags_only(config, ("args",)),
        log=context.log,
        warn_error=config["warn-error"],
        ignore_handled_error=config["ignore_handled_error"],
    )

    if config["yield_materializations"]:
        yield AssetMaterialization(
            asset_key="dbt_run_operation_cli_output",
            description="Output from the CLI execution of `dbt run-operation`.",
            metadata_entries=[EventMetadataEntry.json(operation_result, label="CLI Output")],
        )

    yield Output(operation_result)
# Command-specific config keys forwarded to `dbt source snapshot-freshness`.
_DBT_CLI_SNAPSHOT_FRESHNESS_FLAGS = ("select", "output", "threads")

# Config schema for dbt_cli_snapshot_freshness: the shared CLI schema plus freshness options.
_DBT_CLI_SNAPSHOT_FRESHNESS_CONFIG_SCHEMA = {
    **CLI_CONFIG_SCHEMA,
    "select": Field(
        config=Noneable([str]),
        default_value=None,
        is_required=False,
        description="Specify the sources to snapshot freshness.",
    ),
    "output": Field(
        config=StringSource,
        is_required=False,
        description=(
            "Specify the output path for the json report. By default, outputs to "
            "target/sources.json"
        ),
    ),
    "threads": Field(
        config=Noneable(int),
        default_value=None,
        is_required=False,
        description=(
            "Specify number of threads to use while executing models. Overrides "
            "settings in profiles.yml."
        ),
    ),
    "yield_materializations": Field(
        config=Bool,
        is_required=False,
        default_value=True,
        description=(
            "If True, materializations corresponding to the results of the dbt operation will "
            "be yielded when the solid executes. Default: True"
        ),
    ),
}


@solid(
    description="A solid to invoke dbt source snapshot-freshness via CLI.",
    input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)],
    output_defs=[OutputDefinition(name="result", dagster_type=Dict)],
    config_schema=_DBT_CLI_SNAPSHOT_FRESHNESS_CONFIG_SCHEMA,
    tags={"kind": "dbt"},
)
@experimental
def dbt_cli_snapshot_freshness(context) -> Dict:
    """Run ``dbt source snapshot-freshness`` through the dbt CLI.

    ``select``, ``output`` and ``threads`` are forwarded as flags alongside the common ones.

    Yields:
        AssetMaterialization: ``dbt_source_snapshot-freshness_cli_output`` with the raw CLI
            result as JSON metadata, when ``yield_materializations`` is enabled.
        Output: the dict returned by ``execute_cli``, on the ``result`` output.
    """
    config = context.solid_config
    freshness_result = execute_cli(
        config["dbt_executable"],
        command=("source", "snapshot-freshness"),
        flags_dict=passthrough_flags_only(config, _DBT_CLI_SNAPSHOT_FRESHNESS_FLAGS),
        log=context.log,
        warn_error=config["warn-error"],
        ignore_handled_error=config["ignore_handled_error"],
    )

    if config["yield_materializations"]:
        yield AssetMaterialization(
            asset_key="dbt_source_snapshot-freshness_cli_output",
            description="Output from the CLI execution of `dbt source snapshot-freshness`.",
            metadata_entries=[EventMetadataEntry.json(freshness_result, label="CLI Output")],
        )

    yield Output(freshness_result)
@solid(
    description="A solid to invoke dbt compile via CLI.",
    input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)],
    output_defs=[OutputDefinition(name="result", dagster_type=Dict)],
    config_schema={
        **CLI_CONFIG_SCHEMA,
        "parse-only": Field(
            config=bool,
            is_required=False,
            default_value=False,
            description="If specified, pass the --parse-only flag to dbt compile. (--parse-only)",
        ),
        "threads": Field(
            config=Noneable(int),
            default_value=None,
            is_required=False,
            description=(
                "Specify number of threads to use while executing models. Overrides settings "
                "in profiles.yml."
            ),
        ),
        "no-version-check": Field(
            config=bool,
            description=(
                "Skip the check that dbt's version matches the one specified in the "
                "dbt_project.yml file ('require-dbt-version')"
            ),
            is_required=False,
            default_value=False,
        ),
        "models": Field(
            config=Noneable([str]),
            default_value=None,
            is_required=False,
            description="The dbt models to run.",
        ),
        "exclude": Field(
            config=Noneable([str]),
            default_value=None,
            is_required=False,
            description="The dbt models to exclude.",
        ),
        # NOTE(review): dbt's --selector and --state each take a single value, but these fields
        # are typed as lists; confirm how execute_cli renders a one-element list here.
        "selector": Field(
            config=Noneable([str]),
            default_value=None,
            is_required=False,
            description="The selector name to use, as defined in your selectors.yml",
        ),
        "state": Field(
            config=Noneable([str]),
            default_value=None,
            is_required=False,
            description=(
                "If set, use the given directory as the source for json files to compare with "
                "this project."
            ),
        ),
        "full-refresh": Field(
            config=bool,
            description=(
                "If specified, DBT will drop incremental models and fully-recalculate "
                "the incremental table from the model definition. (--full-refresh)"
            ),
            is_required=False,
            default_value=False,
        ),
        "yield_materializations": Field(
            config=Bool,
            is_required=False,
            default_value=True,
            description=(
                "If True, materializations corresponding to the results of the dbt operation will "
                "be yielded when the solid executes. Default: True"
            ),
        ),
    },
    tags={"kind": "dbt"},
)
@experimental
def dbt_cli_compile(context) -> Dict:
    """This solid executes ``dbt compile`` via the dbt CLI.

    The options shared by all dbt CLI solids, plus ``parse-only``, ``threads``,
    ``no-version-check``, ``models``, ``exclude``, ``selector``, ``state`` and ``full-refresh``,
    are forwarded as CLI flags when set.

    Yields:
        AssetMaterialization: a ``dbt_compile_cli_output`` materialization carrying the raw CLI
            result as JSON metadata, only when ``yield_materializations`` is True.
        Output: the dict returned by ``execute_cli``, on the ``result`` output.
    """
    cli_output = execute_cli(
        context.solid_config["dbt_executable"],
        command=("compile",),
        flags_dict=passthrough_flags_only(
            context.solid_config,
            (
                "parse-only",
                "threads",
                "no-version-check",
                "models",
                "exclude",
                "selector",
                "state",
                "full-refresh",
            ),
        ),
        log=context.log,
        warn_error=context.solid_config["warn-error"],
        ignore_handled_error=context.solid_config["ignore_handled_error"],
    )

    if context.solid_config["yield_materializations"]:
        yield AssetMaterialization(
            asset_key="dbt_compile_cli_output",
            description="Output from the CLI execution of `dbt compile`.",
            metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")],
        )

    yield Output(cli_output)
# Command-specific config keys forwarded to `dbt docs generate`.
_DBT_CLI_DOCS_GENERATE_FLAGS = (
    "threads",
    "no-version-check",
    "models",
    "exclude",
    "selector",
    "state",
)

# Config schema for dbt_cli_docs_generate: the shared CLI schema plus docs-generate options.
_DBT_CLI_DOCS_GENERATE_CONFIG_SCHEMA = {
    **CLI_CONFIG_SCHEMA,
    "threads": Field(
        config=Noneable(int),
        default_value=None,
        is_required=False,
        description=(
            "Specify number of threads to use while executing models. Overrides settings "
            "in profiles.yml."
        ),
    ),
    "no-version-check": Field(
        config=bool,
        description=(
            "Skip the check that dbt's version matches the one specified in the "
            "dbt_project.yml file ('require-dbt-version')"
        ),
        is_required=False,
        default_value=False,
    ),
    "models": Field(
        config=Noneable([str]),
        default_value=None,
        is_required=False,
        description="The dbt models to run.",
    ),
    "exclude": Field(
        config=Noneable([str]),
        default_value=None,
        is_required=False,
        description="The dbt models to exclude.",
    ),
    "selector": Field(
        config=Noneable([str]),
        default_value=None,
        is_required=False,
        description="The selector name to use, as defined in your selectors.yml",
    ),
    "state": Field(
        config=Noneable([str]),
        default_value=None,
        is_required=False,
        description=(
            "If set, use the given directory as the source for json files to compare with "
            "this project."
        ),
    ),
    "yield_materializations": Field(
        config=Bool,
        is_required=False,
        default_value=True,
        description=(
            "If True, materializations corresponding to the results of the dbt operation will "
            "be yielded when the solid executes. Default: True"
        ),
    ),
}


@solid(
    description="A solid to invoke dbt docs generate via CLI.",
    input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)],
    output_defs=[OutputDefinition(name="result", dagster_type=Dict)],
    config_schema=_DBT_CLI_DOCS_GENERATE_CONFIG_SCHEMA,
    tags={"kind": "dbt"},
)
@experimental
def dbt_cli_docs_generate(context) -> Dict:
    """Run ``dbt docs generate`` through the dbt CLI.

    The keys in ``_DBT_CLI_DOCS_GENERATE_FLAGS`` are forwarded as flags alongside the common ones.

    Yields:
        AssetMaterialization: ``dbt_docs_generate_cli_output`` with the raw CLI result as JSON
            metadata, when ``yield_materializations`` is enabled.
        Output: the dict returned by ``execute_cli``, on the ``result`` output.
    """
    config = context.solid_config
    docs_result = execute_cli(
        config["dbt_executable"],
        command=("docs", "generate"),
        flags_dict=passthrough_flags_only(config, _DBT_CLI_DOCS_GENERATE_FLAGS),
        log=context.log,
        warn_error=config["warn-error"],
        ignore_handled_error=config["ignore_handled_error"],
    )

    if config["yield_materializations"]:
        yield AssetMaterialization(
            asset_key="dbt_docs_generate_cli_output",
            description="Output from the CLI execution of `dbt docs generate`.",
            metadata_entries=[EventMetadataEntry.json(docs_result, label="CLI Output")],
        )

    yield Output(docs_result)


# Command-specific config keys forwarded to `dbt seed`.
_DBT_CLI_SEED_FLAGS = (
    "full-refresh",
    "show",
    "threads",
    "no-version-check",
    "select",
    "exclude",
    "selector",
    "state",
)

# Config schema for dbt_cli_seed: the shared CLI schema plus seed-specific options.
_DBT_CLI_SEED_CONFIG_SCHEMA = {
    **CLI_CONFIG_SCHEMA,
    "full-refresh": Field(
        config=bool,
        default_value=False,
        is_required=False,
        description=("Drop existing seed tables and recreate them."),
    ),
    "show": Field(
        config=bool,
        default_value=False,
        is_required=False,
        description=("Show a sample of the loaded data in the terminal."),
    ),
    "threads": Field(
        config=Noneable(int),
        default_value=None,
        is_required=False,
        description=(
            "Specify number of threads to use while executing models. Overrides settings "
            "in profiles.yml."
        ),
    ),
    "no-version-check": Field(
        config=bool,
        description=(
            "Skip the check that dbt's version matches the one specified in the "
            "dbt_project.yml file ('require-dbt-version')"
        ),
        is_required=False,
        default_value=False,
    ),
    "select": Field(
        config=Noneable([str]),
        default_value=None,
        is_required=False,
        description="Specify the nodes to include.",
    ),
    "exclude": Field(
        config=Noneable([str]),
        default_value=None,
        is_required=False,
        description="The dbt models to exclude.",
    ),
    "selector": Field(
        config=Noneable([str]),
        default_value=None,
        is_required=False,
        description="The selector name to use, as defined in your selectors.yml",
    ),
    "state": Field(
        config=Noneable([str]),
        default_value=None,
        is_required=False,
        description=(
            "If set, use the given directory as the source for json files to compare with "
            "this project."
        ),
    ),
    "yield_materializations": Field(
        config=Bool,
        is_required=False,
        default_value=True,
        description=(
            "If True, materializations corresponding to the results of the dbt operation will "
            "be yielded when the solid executes. Default: True"
        ),
    ),
}


@solid(
    description="A solid to invoke dbt seed via CLI.",
    input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)],
    output_defs=[OutputDefinition(name="result", dagster_type=Dict)],
    config_schema=_DBT_CLI_SEED_CONFIG_SCHEMA,
    tags={"kind": "dbt"},
)
@experimental
def dbt_cli_seed(context) -> Dict:
    """Run ``dbt seed`` through the dbt CLI.

    The keys in ``_DBT_CLI_SEED_FLAGS`` are forwarded as flags alongside the common ones.

    Yields:
        AssetMaterialization: ``dbt_seed_cli_output`` with the raw CLI result as JSON metadata,
            when ``yield_materializations`` is enabled.
        Output: the dict returned by ``execute_cli``, on the ``result`` output.
    """
    config = context.solid_config
    seed_result = execute_cli(
        config["dbt_executable"],
        command=("seed",),
        flags_dict=passthrough_flags_only(config, _DBT_CLI_SEED_FLAGS),
        log=context.log,
        warn_error=config["warn-error"],
        ignore_handled_error=config["ignore_handled_error"],
    )

    if config["yield_materializations"]:
        yield AssetMaterialization(
            asset_key="dbt_seed_cli_output",
            description="Output from the CLI execution of `dbt seed`.",
            metadata_entries=[EventMetadataEntry.json(seed_result, label="CLI Output")],
        )

    yield Output(seed_result)