Databricks

Complete Databricks reference: clusters, notebooks, Unity Catalog, Delta Live Tables, Workflows, MLflow, and data engineering best practices

Databricks Fundamentals

Databricks vs. plain Apache Spark

Aspect | Plain Apache Spark | Databricks
Installation | Manual (own cluster manager) | Managed, zero-ops
Notebooks | Not included | Collaborative, versioned
Optimizations | Community | Photon, Delta Engine, Liquid Clustering
Delta Lake | Optional (library) | Native, default
Security | Basic | Unity Catalog, fine-grained RBAC
MLflow | Separate library | Integrated, Model Registry
Cost | Own infrastructure | DBU + cloud compute
Support | Community / paid | Enterprise SLA

Lakehouse Platform

Unifies the data warehouse (SQL + BI) and the data lake (raw files, ML) on top of Delta Lake, an open format that combines Parquet files with a transaction log.

Main components

  • Workspace: web interface, notebooks, jobs, clusters, repos
  • Cluster: set of VMs running Spark; All-Purpose or Job Cluster
  • Notebook: multi-language cells (Python, SQL, Scala, R)
  • DBFS: Databricks File System; an abstraction over blob storage
  • Unity Catalog: centralized governance across workspaces
  • Workflows: job orchestration with a DAG of tasks
  • MLflow: experiment tracking, Model Registry, serving
  • Delta Sharing: secure data sharing between organizations

Plans and clouds

  • Plans: Standard, Premium (Unity Catalog + RBAC), Enterprise
  • Clouds: AWS (us-east-1, etc.), Azure (East US, etc.), GCP
  • Separate cloud account: Databricks bills DBUs; the VMs are billed by the cloud provider
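
The two-part billing described above (DBUs to Databricks, VMs to the cloud provider) can be sketched as a small calculation. This is only an illustration: the class, the function, and every rate used here are hypothetical and not taken from any real price list.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NodeRate:
    """Hourly rates for one node; every number used below is made up."""
    dbu_per_hour: float      # DBUs the node consumes per hour
    vm_usd_per_hour: float   # what the cloud provider charges for the VM


def hourly_cost_usd(node: NodeRate, nodes: int, usd_per_dbu: float) -> dict:
    """Split the hourly bill into the Databricks part and the cloud part."""
    databricks = node.dbu_per_hour * usd_per_dbu * nodes
    cloud = node.vm_usd_per_hour * nodes
    return {"databricks": databricks, "cloud": cloud, "total": databricks + cloud}


# Hypothetical rates: 1 DBU/h per node, 0.30 USD per VM hour, 0.15 USD per DBU.
bill = hourly_cost_usd(NodeRate(1.0, 0.30), nodes=5, usd_per_dbu=0.15)
print(bill)
```

Keeping the two components separate mirrors how the charges arrive: one invoice from Databricks, one from the cloud provider.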

Clusters

All-Purpose vs Job Clusters

Characteristic | All-Purpose | Job Cluster
Use | Interactive development | Automated job execution
Lifecycle | Manual (start/stop) | Created and destroyed with the job
Cost | More expensive (DBU/h while running) | Cheaper per run
Concurrency | Multiple users | Isolated per job/run
When to use | Exploration, ad hoc notebooks | Production, scheduled pipelines

Databricks Runtime (DBR)

  • Standard: Spark + Delta Lake
  • ML: Standard + PyTorch, TensorFlow, scikit-learn, MLflow
  • Photon: Standard + a C++ engine for SQL/Delta; best for analytical workloads
  • LTS (Long Term Support): stable release with an extended support window (three years for recent LTS releases); use it in production

Cluster JSON configuration with autoscaling

{
  "cluster_name": "data-engineering-prod",
  "spark_version": "15.4.x-scala2.12",
  "node_type_id": "i3.xlarge",
  "driver_node_type_id": "i3.xlarge",
  "autoscale": {
    "min_workers": 2,
    "max_workers": 10
  },
  "aws_attributes": {
    "availability": "SPOT_WITH_FALLBACK",
    "spot_bid_price_percent": 100
  },
  "spark_conf": {
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true"
  },
  "custom_tags": {
    "Team": "data-engineering",
    "CostCenter": "analytics",
    "Environment": "prod"
  },
  "init_scripts": [
    {
      "workspace": {
        "destination": "/Shared/init/install-libs.sh"
      }
    }
  ]
}

Example init script

#!/bin/bash
pip install great-expectations==0.18.0
pip install dbt-databricks==1.8.0

Cluster tags for FinOps

Tags propagate to the cloud provider's VMs. Use Team, CostCenter, Environment, and Project to attribute cost in AWS Cost Explorer / Azure Cost Management.
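
A tagging policy like this is easy to check before a cluster is created. The sketch below assumes the cluster definition is available as a Python dict shaped like the JSON configuration shown earlier; the function name and the list of required tags are illustrative choices, not part of any Databricks API.

```python
REQUIRED_TAGS = ("Team", "CostCenter", "Environment", "Project")


def missing_cost_tags(cluster_spec: dict, required=REQUIRED_TAGS) -> list:
    """Return the required tag keys that are absent or blank in custom_tags."""
    tags = cluster_spec.get("custom_tags") or {}
    missing = []
    for key in required:
        value = tags.get(key)
        if value is None or not str(value).strip():
            missing.append(key)
    return missing


spec = {
    "cluster_name": "data-engineering-prod",
    "custom_tags": {
        "Team": "data-engineering",
        "CostCenter": "analytics",
        "Environment": "prod",
    },
}
print(missing_cost_tags(spec))  # the Project tag is not set in this spec
```

A check of this kind fits naturally into CI, so that untagged clusters never reach production.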

Single Node vs Multi-Node

  • Single Node: driver with no workers; useful for pandas, single-machine ML, and local tests
  • Multi-Node: driver + N workers; required for truly distributed Spark
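
As a rough sketch, the difference between the two modes shows up in the cluster specification itself. The helper below is hypothetical; the single-node settings (zero workers, the singleNode profile, a local Spark master, and the ResourceClass tag) follow the pattern commonly shown for the Clusters API and should be verified against the current documentation.

```python
def cluster_spec(name: str, spark_version: str, node_type_id: str, workers: int) -> dict:
    """Build a minimal cluster spec; workers == 0 yields a single-node cluster."""
    if workers < 0:
        raise ValueError("workers must be >= 0")
    spec = {
        "cluster_name": name,
        "spark_version": spark_version,
        "node_type_id": node_type_id,
        "num_workers": workers,
    }
    if workers == 0:
        # Assumed single-node settings: Spark runs locally on the driver.
        spec["spark_conf"] = {
            "spark.databricks.cluster.profile": "singleNode",
            "spark.master": "local[*]",
        }
        spec["custom_tags"] = {"ResourceClass": "SingleNode"}
    return spec


single = cluster_spec("pandas-sandbox", "15.4.x-scala2.12", "i3.xlarge", workers=0)
multi = cluster_spec("etl", "15.4.x-scala2.12", "i3.xlarge", workers=4)
print(single["num_workers"], multi["num_workers"])
```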

Notebooks

Magic commands

# Switch the language of a cell
%python   # Python (default)
%sql      # Spark SQL
%scala    # Scala
%r        # R
%sh       # Bash shell on the driver
%fs       # Shortcut for dbutils.fs
%md       # Markdown (documentation)
%pip      # Install Python packages for the notebook session

# %pip installs notebook-scoped libraries that are available on every node of the cluster
%pip install faker==24.0.0

Widgets (dbutils.widgets)

# Create widgets
dbutils.widgets.text("env", "dev", "Ambiente")
dbutils.widgets.dropdown("table", "orders", ["orders", "customers", "products"])
dbutils.widgets.combobox("date", "2024-01-01", ["2024-01-01", "2024-02-01"])
dbutils.widgets.multiselect("regions", "BR", ["BR", "US", "EU"])

# Read a value (widget values are always strings)
env = dbutils.widgets.get("env")
table = dbutils.widgets.get("table")

# Remove one widget, or all of them
dbutils.widgets.remove("env")
dbutils.widgets.removeAll()

Passing variables between languages via spark.conf

# Python cell
spark.conf.set("pipeline.run_date", "2024-06-01")
spark.conf.set("pipeline.env", "prod")

-- SQL cell: read the variable
SELECT *
FROM catalog_prod.bronze.orders
WHERE order_date = '${pipeline.run_date}'

dbutils.notebook: orchestration

# Run a child notebook and capture its result
result = dbutils.notebook.run(
    "/Shared/pipelines/process_orders",
    timeout_seconds=600,
    arguments={"env": "prod", "date": "2024-06-01"}
)
print(result)  # string the child returned via dbutils.notebook.exit("ok")

# %run executes the notebook inline (its variables stay in the caller's scope)
# %run ../utils/helpers   (no quotes, no .py extension; must be alone in its cell)

Git integration

Repos connect notebooks directly to Git (GitHub, GitLab, Bitbucket, Azure DevOps). You can commit, create branches, and open pull requests straight from the Workspace UI.


DBFS and dbutils

dbutils.fs

# List files
display(dbutils.fs.ls("dbfs:/mnt/datalake/raw/"))
display(dbutils.fs.ls("abfss://container@account.dfs.core.windows.net/"))

# File operations
dbutils.fs.cp("dbfs:/source/file.parquet", "dbfs:/dest/file.parquet")
dbutils.fs.mv("dbfs:/old/path/", "dbfs:/new/path/", recurse=True)
dbutils.fs.rm("dbfs:/tmp/old-data/", recurse=True)
dbutils.fs.mkdirs("dbfs:/mnt/datalake/processed/orders/")

# Read the beginning of a text file (here, up to 1000 bytes)
content = dbutils.fs.head("dbfs:/mnt/configs/pipeline.json", 1000)

dbutils.secrets — Secret Scopes

# Read a secret (the value is redacted in notebook output)
storage_key = dbutils.secrets.get(scope="azure-kv", key="adls-access-key")
db_password = dbutils.secrets.get(scope="databricks-secrets", key="postgres-pwd")

# List the available scopes and secrets
dbutils.secrets.listScopes()
dbutils.secrets.list(scope="azure-kv")

Creating a scope via the CLI:

# Databricks-backed scope
databricks secrets create-scope databricks-secrets

# Azure Key Vault-backed scope (the Key Vault details go in the JSON request body)
databricks secrets create-scope --json '{
  "scope": "azure-kv",
  "scope_backend_type": "AZURE_KEYVAULT",
  "backend_azure_keyvault": {
    "resource_id": "/subscriptions/.../vaults/my-vault",
    "dns_name": "https://my-vault.vault.azure.net/"
  }
}'

# Add a secret
databricks secrets put-secret databricks-secrets postgres-pwd --string-value "s3cr3t"

Mounting ADLS Gen2 with a Service Principal (legacy)

# Assumption: the tenant ID is stored under a hypothetical "tenant-id" key in the same scope
tenant_id = dbutils.secrets.get("azure-kv", "tenant-id")

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get("azure-kv", "sp-client-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get("azure-kv", "sp-client-secret"),
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

dbutils.fs.mount(
    source="abfss://raw@mystorageaccount.dfs.core.windows.net/",
    mount_point="/mnt/datalake/raw",
    extra_configs=configs
)

Modern approach: prefer UC External Locations + Volumes, which need no mounts and no credentials in the notebook.

UC Volumes (modern replacement)

-- SQL cell: create the volume
CREATE VOLUME catalog_prod.bronze.raw_files;

-- Files are then addressed by path, for example
-- /Volumes/catalog_prod/bronze/raw_files/orders/2024-06-01/

# Python cell: read straight from the volume path
df = spark.read.json("/Volumes/catalog_prod/bronze/raw_files/events/")
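
Volume paths follow the fixed pattern /Volumes/<catalog>/<schema>/<volume>/<path>, so they can be built programmatically instead of being concatenated by hand. The helper below is a hypothetical convenience function, not part of dbutils or any Databricks library.

```python
import posixpath


def volume_path(catalog: str, schema: str, volume: str, *parts: str) -> str:
    """Build a /Volumes/<catalog>/<schema>/<volume>/... path from its components."""
    for name in (catalog, schema, volume):
        if not name or "/" in name:
            raise ValueError(f"invalid name component: {name!r}")
    # Strip slashes so an absolute-looking part cannot reset the joined path.
    cleaned = [p.strip("/") for p in parts if p.strip("/")]
    return posixpath.join("/Volumes", catalog, schema, volume, *cleaned)


path = volume_path("catalog_prod", "bronze", "raw_files", "orders", "2024-06-01")
print(path)
```

The resulting string can be passed to spark.read or dbutils.fs exactly like the literal paths used above.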

Unity Catalog

Hierarchy

Metastore (1 per region)
  └── Catalog
        └── Schema (= Database)
              ├── Table (managed or external)
              ├── View
              └── Volume (non-tabular files)
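
The hierarchy above is what gives every table a three-level name, catalog.schema.table. A minimal sketch of how shorter names could be resolved against a current catalog and schema follows; the class and function are hypothetical, and the parser deliberately ignores backtick-quoted identifiers that contain dots.

```python
from typing import NamedTuple, Optional


class TableName(NamedTuple):
    catalog: str
    schema: str
    table: str

    def qualified(self) -> str:
        """Return the full catalog.schema.table form."""
        return ".".join(self)


def resolve_table_name(
    name: str,
    current_catalog: Optional[str] = None,
    current_schema: Optional[str] = None,
) -> TableName:
    """Expand a 1-, 2-, or 3-part name into a full three-level name."""
    parts = name.split(".")
    if len(parts) > 3 or any(not p for p in parts):
        raise ValueError(f"not a valid table name: {name!r}")
    if len(parts) == 3:
        return TableName(*parts)
    if current_catalog is None:
        raise ValueError("a current catalog is required for short names")
    if len(parts) == 2:
        return TableName(current_catalog, parts[0], parts[1])
    if current_schema is None:
        raise ValueError("a current schema is required for one-part names")
    return TableName(current_catalog, current_schema, parts[0])


print(resolve_table_name("silver.orders", current_catalog="catalog_prod").qualified())
```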

Essential DDL

-- Catalog
CREATE CATALOG IF NOT EXISTS catalog_prod
  COMMENT 'Production data';

-- Schema
CREATE SCHEMA IF NOT EXISTS catalog_prod.bronze
  COMMENT 'Raw/ingestion layer';

-- Managed table (UC manages the data files)
CREATE TABLE catalog_prod.silver.orders (
  order_id BIGINT NOT NULL,
  customer_id BIGINT,
  amount DECIMAL(18,2),
  status STRING,
  created_at TIMESTAMP
)
USING DELTA
COMMENT 'Normalized orders';

-- External table (data lives in external storage)
CREATE TABLE catalog_prod.bronze.raw_events
USING PARQUET
LOCATION 'abfss://raw@account.dfs.core.windows.net/events/';

Fine-grained GRANT/REVOKE

-- Catalog
GRANT USE CATALOG ON CATALOG catalog_prod TO `data-engineers`;

-- Schema
GRANT USE SCHEMA, CREATE TABLE ON SCHEMA catalog_prod.silver TO `data-engineers`;

-- Table
GRANT SELECT ON TABLE catalog_prod.gold.kpis TO `data-analysts`;
GRANT MODIFY ON TABLE catalog_prod.silver.orders TO `data-engineers`;

-- Column-level security: Unity Catalog has no column-list GRANT, so expose only
-- the allowed columns through a view (orders_bi is a hypothetical name) or use column masks
CREATE VIEW catalog_prod.silver.orders_bi AS
SELECT order_id, amount, status FROM catalog_prod.silver.orders;
GRANT SELECT ON VIEW catalog_prod.silver.orders_bi TO `bi-team`;

-- Row-level security via a dynamic view
CREATE VIEW catalog_prod.silver.orders_filtered AS
SELECT * FROM catalog_prod.silver.orders
WHERE region = current_user_region();  -- custom (user-defined) function

REVOKE MODIFY ON TABLE catalog_prod.silver.orders FROM `analysts`;

Lineage and audit

-- Auditing via system tables
SELECT event_time, user_identity, request_params
FROM system.access.audit
WHERE service_name = 'unityCatalog'
  AND action_name IN ('getTable', 'createTable')
ORDER BY event_time DESC
LIMIT 100;

-- Delta Sharing: share data with external parties
CREATE SHARE orders_share;
ALTER SHARE orders_share ADD TABLE catalog_prod.gold.orders_summary;
CREATE RECIPIENT partner_company;

Delta Lake on Databricks

Table creation and CTAS

-- CTAS with partitioning
CREATE TABLE catalog_prod.silver.orders
USING DELTA
PARTITIONED BY (order_year, order_month)
TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true',
  'delta.autoOptimize.optimizeWrite' = 'true'
)
AS SELECT
  *,
  year(created_at) AS order_year,
  month(created_at) AS order_month
FROM catalog_prod.bronze.raw_orders;

MERGE INTO (upsert)

MERGE INTO catalog_prod.silver.customers AS target
USING catalog_prod.bronze.customers_updates AS source
ON target.customer_id = source.customer_id
WHEN MATCHED AND source.updated_at > target.updated_at THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE AND target.is_active = true THEN
  UPDATE SET target.is_active = false;

# PySpark DeltaTable API (updates_df is assumed to be a DataFrame holding the incoming rows)
from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, "catalog_prod.silver.customers")
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdateAll(
    condition="source.updated_at > target.updated_at"
).whenNotMatchedInsertAll().execute()

Time Travel

-- By version
SELECT * FROM catalog_prod.silver.orders VERSION AS OF 42;

-- By timestamp
SELECT * FROM catalog_prod.silver.orders TIMESTAMP AS OF '2024-05-01 00:00:00';

-- History
DESCRIBE HISTORY catalog_prod.silver.orders;

-- Restore
RESTORE TABLE catalog_prod.silver.orders TO VERSION AS OF 40;
RESTORE TABLE catalog_prod.silver.orders TO TIMESTAMP AS OF '2024-05-01';

Maintenance

-- Compact small files + Z-Order
OPTIMIZE catalog_prod.silver.orders
ZORDER BY (customer_id, created_at);

-- Remove old files (default retention is 7 days)
VACUUM catalog_prod.silver.orders RETAIN 168 HOURS;

-- Force a vacuum (careful: this breaks time travel to earlier versions)
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM catalog_prod.silver.orders RETAIN 0 HOURS;

Change Data Feed (CDF)

# Read the changes starting at version 10
changes_df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 10)
    .table("catalog_prod.silver.orders")
)
# _change_type: insert | update_preimage | update_postimage | delete
display(changes_df.filter("_change_type = 'update_postimage'"))
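
To make the four _change_type values concrete, the sketch below replays a small change feed onto an in-memory snapshot. It is plain Python with hypothetical data and is not how Delta applies changes internally; it only illustrates what each change type means for a downstream consumer.

```python
def replay_changes(snapshot: dict, changes: list, key: str = "order_id") -> dict:
    """Apply change-feed rows, in commit order, to a {key: row} snapshot."""
    result = dict(snapshot)
    # sorted() is stable, so rows within one commit keep their original order.
    for row in sorted(changes, key=lambda r: r["_commit_version"]):
        kind = row["_change_type"]
        if kind in ("insert", "update_postimage"):
            # Keep only data columns; metadata columns start with an underscore.
            result[row[key]] = {k: v for k, v in row.items() if not k.startswith("_")}
        elif kind == "delete":
            result.pop(row[key], None)
        # update_preimage holds the old values and is not needed for replay.
    return result


changes = [
    {"order_id": 1, "status": "pending", "_change_type": "insert", "_commit_version": 10},
    {"order_id": 2, "status": "pending", "_change_type": "insert", "_commit_version": 10},
    {"order_id": 1, "status": "pending", "_change_type": "update_preimage", "_commit_version": 11},
    {"order_id": 1, "status": "shipped", "_change_type": "update_postimage", "_commit_version": 11},
    {"order_id": 2, "status": "pending", "_change_type": "delete", "_commit_version": 12},
]
print(replay_changes({}, changes))
```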

Spark SQL and functions

Date functions

SELECT
  date_trunc('month', created_at)           AS month_start,
  date_format(created_at, 'yyyy-MM')        AS year_month,
  datediff(current_date(), created_at)      AS days_ago,
  date_add(created_at, 30)                  AS plus_30_days,
  last_day(created_at)                      AS last_day_of_month
FROM catalog_prod.silver.orders;

Regex

SELECT
  regexp_extract(email, '^([^@]+)@(.+)$', 1)  AS username,
  regexp_replace(phone, '[^0-9]', '')           AS clean_phone
FROM catalog_prod.silver.customers;

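Arrays and structs

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

df = spark.table("catalog_prod.silver.orders_with_items")

# Assumed shape of the metadata_json column; adjust it to the real payload
schema = StructType([StructField("source", StringType())])

result = df.select(
    "order_id",
    # Higher-order functions operate on the array without exploding it
    F.transform("items", lambda x: x["price"] * x["qty"]).alias("line_totals"),
    F.filter("items", lambda x: x["qty"] > 1).alias("multi_qty_items"),
    F.aggregate("items", F.lit(0.0), lambda acc, x: acc + x["price"] * x["qty"]).alias("total"),
    # explode emits one output row per array element
    F.explode("items").alias("item"),
    F.from_json("metadata_json", schema).alias("metadata"),
    F.to_json(F.struct("order_id", "status")).alias("json_out")
)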

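Window functions and QUALIFY

-- Named window reused by several functions
SELECT
  customer_id,
  order_id,
  amount,
  SUM(amount)    OVER w AS running_total,
  LAG(amount, 1) OVER w AS prev_amount,
  ROW_NUMBER()   OVER w AS rn,
  RANK()         OVER w AS rnk
FROM catalog_prod.silver.orders
WINDOW w AS (PARTITION BY customer_id ORDER BY created_at);

-- QUALIFY filters on a window function: keep only each customer's latest order
SELECT customer_id, order_id, amount
FROM catalog_prod.silver.orders
QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY created_at DESC) = 1;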

PIVOT / UNPIVOT and CTEs

WITH monthly AS (
  SELECT customer_id, date_format(created_at, 'yyyy-MM') AS ym, amount
  FROM catalog_prod.silver.orders
)
SELECT * FROM monthly
PIVOT (SUM(amount) FOR ym IN ('2024-01', '2024-02', '2024-03'));

Delta Live Tables (DLT)

Basic structure

import dlt
from pyspark.sql import functions as F

# Bronze table: streaming ingestion
@dlt.table(
    name="raw_orders",
    comment="Raw ingestion of orders via Auto Loader",
    table_properties={"quality": "bronze"}
)
def raw_orders():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        # Optional in DLT, which manages schema and checkpoint locations itself
        .option("cloudFiles.schemaLocation", "/Volumes/catalog_prod/bronze/checkpoints/orders_schema")
        .load("/Volumes/catalog_prod/bronze/raw_files/orders/")
    )

# Silver table: with data quality expectations
@dlt.table(
    name="clean_orders",
    comment="Validated and normalized orders",
    table_properties={"quality": "silver"}
)
@dlt.expect("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("positive_amount", "amount > 0")
@dlt.expect_or_fail("valid_status", "status IN ('pending','confirmed','shipped','cancelled')")
def clean_orders():
    return (
        dlt.read_stream("raw_orders")
        .withColumn("amount", F.col("amount").cast("decimal(18,2)"))
        .withColumn("created_at", F.to_timestamp("created_at"))
        .select("order_id", "customer_id", "amount", "status", "created_at")
    )

# Intermediate view (clean_customers is assumed to be defined elsewhere in the pipeline)
@dlt.view
def orders_with_customers():
    return dlt.read("clean_orders").join(
        dlt.read("clean_customers"), "customer_id"
    )

# Gold table (batch)
@dlt.table(name="daily_revenue", table_properties={"quality": "gold"})
def daily_revenue():
    return (
        dlt.read("clean_orders")
        .filter("status = 'confirmed'")
        .groupBy(F.date_trunc("day", F.col("created_at")).alias("day"))
        .agg(F.sum("amount").alias("revenue"), F.count("*").alias("orders"))
    )
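
The three expectation decorators used on clean_orders differ only in what happens to a violating row: expect keeps it and records the violation, expect_or_drop removes it, and expect_or_fail aborts the update. The plain-Python sketch below imitates those semantics on a list of dicts; the function and exception names are hypothetical, and this is not the DLT implementation.

```python
class ExpectationFailed(Exception):
    """Raised when a fail-level expectation is violated."""


def apply_expectations(rows, warn=None, drop=None, fail=None):
    """Return (kept_rows, violation_counts) for name -> predicate mappings."""
    warn, drop, fail = warn or {}, drop or {}, fail or {}
    counts = {name: 0 for name in list(warn) + list(drop)}
    kept = []
    for row in rows:
        for name, check in fail.items():
            if not check(row):
                raise ExpectationFailed(name)      # like expect_or_fail
        for name, check in warn.items():
            if not check(row):
                counts[name] += 1                  # like expect: row is kept
        violated = [name for name, check in drop.items() if not check(row)]
        for name in violated:
            counts[name] += 1
        if not violated:                           # like expect_or_drop
            kept.append(row)
    return kept, counts


rows = [
    {"order_id": 1, "amount": 10.0, "status": "confirmed"},
    {"order_id": None, "amount": 5.0, "status": "pending"},
    {"order_id": 3, "amount": -1.0, "status": "shipped"},
]
kept, counts = apply_expectations(
    rows,
    warn={"valid_order_id": lambda r: r["order_id"] is not None},
    drop={"positive_amount": lambda r: r["amount"] > 0},
    fail={"valid_status": lambda r: r["status"] in ("pending", "confirmed", "shipped", "cancelled")},
)
print(len(kept), counts)
```

In this run the row with a null order_id is kept (warn only), while the row with a negative amount is dropped.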

CDC with dlt.apply_changes (SCD1 and SCD2)

# SCD Type 1: overwrites the current row
dlt.create_streaming_table("customers_scd1")
dlt.apply_changes(
    target="customers_scd1",
    source="raw_customers_cdc",
    keys=["customer_id"],
    sequence_by="updated_at",
    apply_as_deletes=F.expr("op = 'DELETE'"),
    stored_as_scd_type=1
)

# SCD Type 2: keeps the full history
dlt.create_streaming_table("customers_scd2")
dlt.apply_changes(
    target="customers_scd2",
    source="raw_customers_cdc",
    keys=["customer_id"],
    sequence_by="updated_at",
    stored_as_scd_type=2,
    track_history_column_list=["name", "email", "tier"]
)
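
What SCD Type 2 with a tracked column list produces can be illustrated without Spark. The sketch below builds history rows with __START_AT and __END_AT markers from an ordered stream of events, opening a new version only when a tracked column changes. It is a simplified illustration under assumed data (no deletes, no out-of-order handling beyond sorting), not the algorithm DLT runs.

```python
def scd2_history(events, key, sequence_by, tracked):
    """Build SCD Type 2 rows; a new version opens only when a tracked column changes."""
    history = []
    open_row = {}  # key value -> index of the currently open row in history
    for event in sorted(events, key=lambda e: e[sequence_by]):
        k = event[key]
        idx = open_row.get(k)
        if idx is not None:
            current = history[idx]
            if all(current[c] == event[c] for c in tracked):
                continue  # nothing tracked changed, keep the open version
            current["__END_AT"] = event[sequence_by]  # close the old version
        row = {key: k}
        row.update({c: event[c] for c in tracked})
        row["__START_AT"] = event[sequence_by]
        row["__END_AT"] = None  # None marks the current version
        history.append(row)
        open_row[k] = len(history) - 1
    return history


events = [
    {"customer_id": 1, "name": "Ana", "email": "ana@example.com", "tier": "free", "updated_at": "2024-01-01"},
    {"customer_id": 1, "name": "Ana", "email": "ana@example.com", "tier": "gold", "updated_at": "2024-03-01"},
    {"customer_id": 1, "name": "Ana", "email": "ana@example.com", "tier": "gold", "updated_at": "2024-04-01"},
]
for row in scd2_history(events, "customer_id", "updated_at", ["name", "email", "tier"]):
    print(row)
```

The third event changes no tracked column, so it does not open a new version. SCD Type 1 would instead keep a single row per key, overwritten by the latest event.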

Monitoring quality via event_log

SELECT
  origin.flow_name,
  details:flow_progress:data_quality:dropped_records::int AS dropped,
  details:flow_progress:data_quality:expectations AS expectations
FROM event_log(TABLE(catalog_prod.bronze.raw_orders))
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;

Databricks Workflows (Jobs)

Multi-task job structure (YAML)

name: pipeline-orders-daily
schedule:
  quartz_cron_expression: "0 0 6 * * ?"
  timezone_id: "America/Sao_Paulo"

job_clusters:
  - job_cluster_key: main_cluster
    new_cluster:
      spark_version: "15.4.x-scala2.12"
      node_type_id: "i3.2xlarge"
      num_workers: 4
      spark_conf:
        spark.databricks.delta.optimizeWrite.enabled: "true"

tasks:
  - task_key: ingest_raw
    job_cluster_key: main_cluster
    notebook_task:
      notebook_path: /Repos/data-team/pipelines/01_ingest_raw
      base_parameters:
        env: "prod"
        date: "{{job.start_time.iso_date}}"
    retry_on_timeout: false
    max_retries: 2
    min_retry_interval_millis: 60000

  - task_key: transform_silver
    depends_on:
      - task_key: ingest_raw
    job_cluster_key: main_cluster
    python_wheel_task:
      package_name: orders_pipeline
      entry_point: transform_silver
      parameters: ["--env", "prod"]

  - task_key: dlt_pipeline
    depends_on:
      - task_key: transform_silver
    pipeline_task:
      pipeline_id: "abc123-pipeline-id"

  - task_key: dbt_models
    depends_on:
      - task_key: dlt_pipeline
    dbt_task:
      project_directory: /Repos/data-team/dbt_project
      commands: ["dbt run --select tag:daily", "dbt test --select tag:daily"]
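
The depends_on entries above define a DAG, and a valid execution order can be derived with a topological sort. The sketch below mirrors the task keys of the job definition in a Python list and uses the standard library's graphlib (Python 3.9+); the function name is hypothetical and Workflows performs this scheduling itself.

```python
from graphlib import TopologicalSorter

tasks = [
    {"task_key": "ingest_raw"},
    {"task_key": "transform_silver", "depends_on": [{"task_key": "ingest_raw"}]},
    {"task_key": "dlt_pipeline", "depends_on": [{"task_key": "transform_silver"}]},
    {"task_key": "dbt_models", "depends_on": [{"task_key": "dlt_pipeline"}]},
]


def execution_order(tasks: list) -> list:
    """Return task keys in an order that respects every depends_on edge."""
    graph = {
        t["task_key"]: {d["task_key"] for d in t.get("depends_on", [])}
        for t in tasks
    }
    unknown = {dep for deps in graph.values() for dep in deps} - set(graph)
    if unknown:
        raise ValueError(f"depends_on references unknown tasks: {sorted(unknown)}")
    # static_order() raises graphlib.CycleError if the dependencies form a cycle.
    return list(TopologicalSorter(graph).static_order())


print(execution_order(tasks))
```

Running a check like this in CI catches typos in task keys and accidental cycles before the job is deployed.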

Creating and triggering via the CLI

# Create the job (the CLI expects the job definition as JSON)
databricks jobs create --json @job_config.json

# Trigger an immediate run with parameters
databricks jobs run-now --json '{"job_id": 12345, "notebook_params": {"env": "prod", "date": "2024-06-01"}}'

# List runs
databricks jobs list-runs --job-id 12345 --limit 10

# Cancel a run
databricks jobs cancel-run 67890

MLflow on Databricks

Experiment tracking

import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import f1_score, roc_auc_score

mlflow.set_experiment("/Shared/experiments/churn-prediction")

with mlflow.start_run(run_name="gbm_v3") as run:
    # Parameters
    params = {"n_estimators": 200, "max_depth": 5, "learning_rate": 0.05}
    mlflow.log_params(params)

    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)
    preds = model.predict(X_test)

    # Metrics
    mlflow.log_metrics({
        "f1": f1_score(y_test, preds),
        "auc": roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])
    })

    # Artifacts
    mlflow.log_artifact("reports/feature_importance.png")
    mlflow.sklearn.log_model(model, "model", registered_model_name="churn-model-prod")

    print(f"Run ID: {run.info.run_id}")

Autolog

mlflow.sklearn.autolog(log_input_examples=True, log_model_signatures=True)
# From here on, fit() logs parameters, metrics, and the model automatically
model.fit(X_train, y_train)

UC Model Registry: champion/challenger aliases

import mlflow
from mlflow import MlflowClient

mlflow.set_registry_uri("databricks-uc")  # target the Unity Catalog registry
client = MlflowClient()

# Register in UC (run_id comes from a tracked run, e.g. run.info.run_id)
model_uri = f"runs:/{run_id}/model"
mv = mlflow.register_model(model_uri, "catalog_prod.ml.churn_model")

# Set aliases
client.set_registered_model_alias("catalog_prod.ml.churn_model", "challenger", mv.version)
# Promote after validation
client.set_registered_model_alias("catalog_prod.ml.churn_model", "champion", mv.version)

# Load by alias
model = mlflow.sklearn.load_model("models:/catalog_prod.ml.churn_model@champion")

Model Serving

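# Via the UI: Models > Serving > Create Endpoint
# Via the API:
import os
import requests

endpoint_config = {
    "name": "churn-prediction-endpoint",
    "config": {
        "served_models": [{
            "model_name": "catalog_prod.ml.churn_model",
            "model_version": "3",
            "workload_size": "Small",
            "scale_to_zero_enabled": True
        }]
    }
}

# POST /api/2.0/serving-endpoints
# Assumes DATABRICKS_HOST and DATABRICKS_TOKEN are set in the environment
response = requests.post(
    f"{os.environ['DATABRICKS_HOST']}/api/2.0/serving-endpoints",
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    json=endpoint_config,
)
response.raise_for_status()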

Performance and cost

Photon: when it actually helps

  • Analytical SQL queries over Delta (scans, aggregations, joins)
  • Large OPTIMIZE, VACUUM, and MERGE operations
  • Does not help: pandas UDFs, Python RDD code, custom ML models

Delta Cache vs Spark Cache

Characteristic | Delta Cache | Spark Cache
Stored in | Worker's local SSD | Memory (RAM)
Persistence | Across queries (while the cluster is running) | Only within the Spark session
Format | Optimized binary | Serialized RDD
Activation | Automatic | df.cache() / df.persist()
Ideal use | Delta tables read repeatedly | Intermediate DataFrames

Z-Order vs Liquid Clustering

Aspect | Z-Order | Liquid Clustering (DBR 13.3+)
Syntax | OPTIMIZE t ZORDER BY (col) | CLUSTER BY (col) at table creation
Rewrite | Full (OPTIMIZE required) | Incremental (applied when OPTIMIZE runs)
Partitioning | Combined with PARTITIONED BY | Replaces physical partitioning
Maintenance | Manual (schedule OPTIMIZE) | OPTIMIZE, or automatic where predictive optimization is enabled
When to use | DBR < 13.3 or legacy tables | New tables on DBR 13.3+

Sizing heuristics

# Rule of thumb: 128 MB to 1 GB per partition after a shuffle
# Tune the number of shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 200)  # 200 is the default; aim for roughly 2-4x the total cores

# Check the number of partitions
df.rdd.getNumPartitions()

# Repartition vs. coalesce
df_repartitioned = df.repartition(100, "customer_id")  # full shuffle, balanced
df_coalesced = df.coalesce(10)  # reduces partitions without a shuffle (coalesce cannot increase them)
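
The two rules of thumb above (a target partition size and a multiple of the core count) can be combined into one small calculation. This is a sketch of one possible heuristic, not a Databricks formula; the function name, the 128 MB default, and the choice to round up to a multiple of the core count are all assumptions.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def suggest_shuffle_partitions(
    shuffle_bytes: int,
    total_cores: int,
    target_partition_bytes: int = 128 * MB,
) -> int:
    """Suggest a value for spark.sql.shuffle.partitions."""
    if total_cores <= 0 or target_partition_bytes <= 0:
        raise ValueError("total_cores and target_partition_bytes must be positive")
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)  # keep partitions near the target size
    by_cores = 2 * total_cores                                   # lower end of the 2-4x cores rule
    needed = max(by_size, by_cores)
    # Round up to a multiple of the core count so every wave of tasks is full.
    return math.ceil(needed / total_cores) * total_cores


print(suggest_shuffle_partitions(50 * GB, total_cores=32))
print(suggest_shuffle_partitions(1 * GB, total_cores=32))
```

The result would be passed to spark.conf.set("spark.sql.shuffle.partitions", ...) before the shuffle-heavy stage.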

Hints and skew

-- Broadcast join for small tables (< 200 MB)
SELECT /*+ BROADCAST(dim) */ f.*, dim.name
FROM fact_orders f JOIN dim_customers dim ON f.customer_id = dim.customer_id;

-- Skew hint (Databricks-specific)
SELECT /*+ SKEW('orders', 'customer_id') */ *
FROM orders JOIN customers USING (customer_id);

Cost via system tables

SELECT
  u.sku_name,
  SUM(u.usage_quantity) AS dbus,
  SUM(u.usage_quantity * p.pricing.default) AS estimated_cost_usd
FROM system.billing.usage AS u
JOIN system.billing.list_prices AS p
  ON u.sku_name = p.sku_name
 AND u.cloud = p.cloud
 AND u.usage_start_time >= p.price_start_time
 AND (p.price_end_time IS NULL OR u.usage_start_time < p.price_end_time)
WHERE u.usage_date >= current_date() - 30
  AND u.custom_tags['Team'] = 'data-engineering'
GROUP BY u.sku_name
ORDER BY estimated_cost_usd DESC;

CI/CD and best practices

Databricks Asset Bundles (DAB)

# databricks.yml (project root)
bundle:
  name: orders-pipeline

# Load the resource definitions kept in separate files
include:
  - resources/**/*.yml

variables:
  env:
    default: dev
  catalog:
    default: catalog_dev

targets:
  dev:
    mode: development
    default: true
    workspace:
      host: https://adb-xxx.azuredatabricks.net
    variables:
      env: dev
      catalog: catalog_dev

  prod:
    mode: production
    workspace:
      host: https://adb-yyy.azuredatabricks.net
    variables:
      env: prod
      catalog: catalog_prod
    run_as:
      service_principal_name: sp-databricks-prod

# resources/jobs/orders_daily.yml
resources:
  jobs:
    orders_daily:
      name: orders-daily-${var.env}
      schedule:
        quartz_cron_expression: "0 0 6 * * ?"
        timezone_id: "America/Sao_Paulo"
      tasks:
        - task_key: ingest
          notebook_task:
            notebook_path: ../../notebooks/01_ingest.py  # relative to this YAML file
            base_parameters:
              catalog: ${var.catalog}
              env: ${var.env}

DAB commands

# Authenticate
databricks auth login --host https://adb-xxx.azuredatabricks.net

# Validate the bundle
databricks bundle validate

# Deploy to a target
databricks bundle deploy --target dev
databricks bundle deploy --target prod

# Run the job manually after deployment
databricks bundle run orders_daily --target prod

Recommended project structure

orders-pipeline/
├── databricks.yml
├── resources/
│   ├── jobs/
│   │   └── orders_daily.yml
│   └── pipelines/
│       └── dlt_orders.yml
├── src/
│   ├── orders/
│   │   ├── __init__.py
│   │   ├── ingest.py
│   │   ├── transform.py
│   │   └── models.py
│   └── utils/
│       ├── spark_utils.py
│       └── delta_utils.py
├── notebooks/
│   ├── 01_ingest.py
│   └── 02_transform.py
├── tests/
│   ├── unit/
│   │   ├── test_transform.py
│   │   └── test_models.py
│   └── integration/
│       └── test_pipeline_e2e.py
├── pyproject.toml
└── .github/workflows/ci.yml

Testing with local PySpark

# tests/unit/test_transform.py
import pytest
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from src.orders.transform import clean_orders_df

@pytest.fixture(scope="session")
def spark():
    builder = (
        SparkSession.builder
        .master("local[2]")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    )
    # Adds the Delta Lake JARs that match the installed delta-spark package
    return configure_spark_with_delta_pip(builder).getOrCreate()

def test_clean_orders_drops_null_order_id(spark):
    data = [
        (1, 100, 50.0, "confirmed"),
        (None, 200, 30.0, "pending"),  # should be dropped
    ]
    df = spark.createDataFrame(data, ["order_id", "customer_id", "amount", "status"])
    result = clean_orders_df(df)
    assert result.count() == 1
    assert result.filter("order_id IS NULL").count() == 0

GitHub Actions — CI pipeline

# .github/workflows/ci.yml
name: CI

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.11" }
      - run: pip install pyspark==3.5.0 delta-spark==3.2.0 pytest
      - run: pytest tests/unit/ -v

  deploy-dev:
    needs: test
    if: github.ref == 'refs/heads/develop'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: databricks/setup-cli@main
      - run: databricks bundle deploy --target dev
        env:
          DATABRICKS_HOST: ${{ secrets.DBX_HOST_DEV }}
          DATABRICKS_TOKEN: ${{ secrets.DBX_TOKEN_DEV }}

  deploy-prod:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: databricks/setup-cli@main
      - run: databricks bundle deploy --target prod
        env:
          DATABRICKS_HOST: ${{ secrets.DBX_HOST_PROD }}
          DATABRICKS_TOKEN: ${{ secrets.DBX_TOKEN_PROD }}

Production checklist

  • Jobs run on a Job Cluster (not All-Purpose) with Spot instances + fallback
  • Secrets come from Secret Scopes; no hardcoded credentials
  • Unity Catalog: managed tables, minimum necessary GRANTs
  • Delta: optimizeWrite + autoCompact enabled
  • Liquid Clustering on new tables (DBR 13.3+)
  • VACUUM scheduled (weekly, retention >= 7 days)
  • DLT expectations document the data contract
  • MLflow tracking active; models versioned in the UC Model Registry
  • Cluster tags: Team, CostCenter, Environment
  • system.billing.usage monitored with cost alerts
  • DAB with dev/staging/prod targets and run_as Service Principal in prod
  • Unit tests pass in CI before deployment
  • Retry policy configured (max 2 retries, 60 s interval)
  • Failure notifications configured (email/Slack on failure)