Skip to content

dbt utils

dbt utilities.

DbtLsException

Bases: Exception

Raised when dbt ls fails.

Source code in src/dbt_score/dbt_utils.py
class DbtLsException(Exception):
    """Raised when dbt ls fails."""

DbtParseException

Bases: Exception

Raised when dbt parse fails.

Source code in src/dbt_score/dbt_utils.py
class DbtParseException(Exception):
    """Raised when dbt parse fails."""

dbt_ls(select)

Run dbt ls.

Source code in src/dbt_score/dbt_utils.py
def dbt_ls(select: Iterable[str] | None) -> Iterable[str]:
    """Run dbt ls."""
    cmd = ["ls", "--resource-type", "model", "--output", "name"]
    if select:
        cmd += ["--select", *select]

    with _disable_dbt_stdout():
        result: dbtRunnerResult = dbtRunner().invoke(cmd)

    if not result.success:
        raise DbtLsException("dbt ls failed.") from result.exception

    selected = cast(Iterable[str], result.result)  # mypy hint
    return selected

dbt_parse()

Parse a dbt project.

Returns:

Type Description
dbtRunnerResult

The dbt parse run result.

Raises:

Type Description
DbtParseException

dbt parse failed.

Source code in src/dbt_score/dbt_utils.py
def dbt_parse() -> dbtRunnerResult:
    """Parse a dbt project.

    Returns:
        The dbt parse run result.

    Raises:
        DbtParseException: dbt parse failed.
    """
    with _disable_dbt_stdout():
        result: dbtRunnerResult = dbtRunner().invoke(["parse"])

    if not result.success:
        raise DbtParseException("dbt parse failed.") from result.exception

    return result

get_default_manifest_path()

Get the manifest path.

Source code in src/dbt_score/dbt_utils.py
def get_default_manifest_path() -> Path:
    """Get the manifest path."""
    return (
        Path().cwd()
        / os.getenv("DBT_PROJECT_DIR", "")
        / os.getenv("DBT_TARGET_DIR", "target")
        / "manifest.json"
    )