Databricks Fundamentals
Databricks vs. plain Apache Spark
| Aspect | Plain Apache Spark | Databricks |
|---|---|---|
| Installation | Manual (bring your own cluster manager) | Managed, zero-ops |
| Notebooks | Not included | Collaborative, versioned |
| Optimizations | Community | Photon, Delta Engine, Liquid Clustering |
| Delta Lake | Optional (library) | Native, default |
| Security | Basic | Unity Catalog, fine-grained RBAC |
| MLflow | Separate library | Integrated, Model Registry |
| Cost | Your own infrastructure | DBU + cloud compute |
| Support | Community / paid | Enterprise SLA |
Lakehouse Platform
Unifies the Data Warehouse (SQL + BI) and the Data Lake (raw files, ML) on top of Delta Lake, an open format that combines Parquet data files with a transaction log.
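To make the "Parquet + transaction log" idea concrete, here is a minimal sketch of how a reader could derive the current set of data files by replaying commits in order. It is a simplified model, not the Delta Lake implementation: real commits live as JSON files under `_delta_log/` and carry more action types (metadata, protocol, checkpoints), and the file names below are made up.

```python
from typing import Iterable

# Each commit is a list of actions; only "add" and "remove" are modeled here.
# Simplified assumption: real Delta commits carry more action types and metadata.
Commit = list[dict]


def live_files(commits: Iterable[Commit]) -> set[str]:
    """Replay commits in order and return the Parquet files that make up the table."""
    files: set[str] = set()
    for commit in commits:
        for action in commit:
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files


# Hypothetical history: two appends, then a compaction that rewrites both files.
log = [
    [{"add": {"path": "part-000.parquet"}}],
    [{"add": {"path": "part-001.parquet"}}],
    [
        {"remove": {"path": "part-000.parquet"}},
        {"remove": {"path": "part-001.parquet"}},
        {"add": {"path": "part-002.parquet"}},
    ],
]

print(sorted(live_files(log)))      # table as of the latest version
print(sorted(live_files(log[:2])))  # table as of version 1 (the idea behind time travel)
```

Replaying only a prefix of the log is what makes reading an older version possible, as long as the removed files have not yet been physically deleted.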
Main components
- Workspace: web UI for notebooks, jobs, clusters, and repos
- Cluster: a set of VMs running Spark; All-Purpose or Job Cluster
- Notebook: multi-language cells (Python, SQL, Scala, R)
- DBFS: Databricks File System, an abstraction over blob storage
- Unity Catalog: centralized governance across workspaces
- Workflows: job orchestration with a DAG of tasks
- MLflow: experiment tracking, Model Registry, serving
- Delta Sharing: secure data sharing between organizations
Plans and Clouds
- Plans: Standard, Premium (Unity Catalog + RBAC), Enterprise
- Clouds: AWS (us-east-1, etc.), Azure (East US, etc.), GCP
- Separate cloud account: Databricks bills DBUs; the VMs are billed by the cloud provider
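Because the bill has two parts (DBUs charged by Databricks, VMs charged by the cloud provider), a quick back-of-the-envelope estimate can help when comparing cluster shapes. The sketch below is only illustrative: the function name and every rate in the example are placeholders, not real prices.

```python
def estimate_hourly_cost(
    num_nodes: int,
    dbu_per_node_hour: float,
    dbu_price_usd: float,
    vm_price_usd_per_hour: float,
) -> dict[str, float]:
    """Split an hourly cluster cost into the Databricks (DBU) and cloud (VM) parts."""
    databricks_cost = num_nodes * dbu_per_node_hour * dbu_price_usd
    cloud_cost = num_nodes * vm_price_usd_per_hour
    return {
        "databricks_usd": round(databricks_cost, 2),
        "cloud_usd": round(cloud_cost, 2),
        "total_usd": round(databricks_cost + cloud_cost, 2),
    }


# Placeholder numbers only: 1 driver + 4 workers, 1.0 DBU per node-hour,
# 0.15 USD per DBU, 0.312 USD per VM-hour.
print(estimate_hourly_cost(5, 1.0, 0.15, 0.312))
```

Actual DBU rates depend on plan, cloud, and workload type, so the inputs should come from the current price list.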
Clusters
All-Purpose vs Job Clusters
| Characteristic | All-Purpose | Job Cluster |
|---|---|---|
| Use | Interactive development | Running automated jobs |
| Lifecycle | Manual (start/stop) | Created and destroyed with the job |
| Cost | More expensive (DBU/h while running) | Cheaper per run |
| Concurrency | Multiple users | Isolated per job/run |
| When to use | Exploration, ad-hoc notebooks | Production, scheduled pipelines |
Databricks Runtime (DBR)
- Standard: Spark + Delta Lake
- ML: Standard + PyTorch, TensorFlow, scikit-learn, MLflow
- Photon: Standard + C++ engine for SQL/Delta; best for analytical workloads
- LTS (Long Term Support): stable version supported for 2 years; use it in production
Cluster JSON configuration with autoscaling
{
  "cluster_name": "data-engineering-prod",
  "spark_version": "15.4.x-scala2.12",
  "node_type_id": "i3.xlarge",
  "driver_node_type_id": "i3.xlarge",
  "autoscale": {
    "min_workers": 2,
    "max_workers": 10
  },
  "aws_attributes": {
    "availability": "SPOT_WITH_FALLBACK",
    "spot_bid_price_percent": 100
  },
  "spark_conf": {
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true"
  },
  "custom_tags": {
    "Team": "data-engineering",
    "CostCenter": "analytics",
    "Environment": "prod"
  },
  "init_scripts": [
    {
      "workspace": {
        "destination": "/Shared/init/install-libs.sh"
      }
    }
  ]
}
Example init script
#!/bin/bash
pip install great-expectations==0.18.0
pip install dbt-databricks==1.8.0
Cluster tags for FinOps
Tags propagate to the cloud provider's VMs. Use Team, CostCenter, Environment, and Project to attribute cost in AWS Cost Explorer / Azure Cost Management.
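Since cost attribution only works when every cluster carries the agreed tags, a small pre-deploy check can catch omissions. The sketch below assumes the cluster definition is available as a Python dict shaped like the JSON configuration in this section; the function name and the idea of running it before deployment are illustrative, not a Databricks feature.

```python
REQUIRED_TAGS = ("Team", "CostCenter", "Environment", "Project")


def missing_cost_tags(cluster_config: dict, required=REQUIRED_TAGS) -> list[str]:
    """Return the required tag keys that are absent or blank in custom_tags."""
    tags = cluster_config.get("custom_tags") or {}
    return [key for key in required if not str(tags.get(key) or "").strip()]


# Same tags as the example cluster configuration; "Project" is not set there.
config = {
    "cluster_name": "data-engineering-prod",
    "custom_tags": {
        "Team": "data-engineering",
        "CostCenter": "analytics",
        "Environment": "prod",
    },
}

print(missing_cost_tags(config))
```

A non-empty result could fail a CI step, so untagged clusters never reach production.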
Single Node vs Multi-Node
- Single Node: driver with no workers; useful for pandas, single-machine ML, and local tests
- Multi-Node: driver + N workers; required for truly distributed Spark
Notebooks
Magic commands
# Switch the language of a cell
%python   # Python (default)
%sql      # Spark SQL
%scala    # Scala
%r        # R
%sh       # Bash shell on the driver
%fs       # Shortcut for dbutils.fs
%md       # Markdown (documentation)
%pip      # Install Python packages for the notebook session
# %pip installs the package on every node of the cluster
%pip install faker==24.0.0
Widgets (dbutils.widgets)
# Create widgets
dbutils.widgets.text("env", "dev", "Environment")
dbutils.widgets.dropdown("table", "orders", ["orders", "customers", "products"])
dbutils.widgets.combobox("date", "2024-01-01", ["2024-01-01", "2024-02-01"])
dbutils.widgets.multiselect("regions", "BR", ["BR", "US", "EU"])
# Read a value
env = dbutils.widgets.get("env")
table = dbutils.widgets.get("table")
# Remove
dbutils.widgets.remove("env")
dbutils.widgets.removeAll()
Passing variables between languages via spark.conf
# Python cell
spark.conf.set("pipeline.run_date", "2024-06-01")
spark.conf.set("pipeline.env", "prod")
-- SQL cell: read the variable
SELECT *
FROM catalog_prod.bronze.orders
WHERE order_date = '${pipeline.run_date}'
dbutils.notebook: orchestration
# Run a child notebook and capture its result
result = dbutils.notebook.run(
    "/Shared/pipelines/process_orders",
    timeout_seconds=600,
    arguments={"env": "prod", "date": "2024-06-01"},
)
print(result)  # the string the child returned via dbutils.notebook.exit("ok")
# %run executes inline (the child's variables stay in the caller's scope)
# %run ../utils/helpers   (no quotes, no .py)
Git integration
Repos connect notebooks directly to Git (GitHub, GitLab, Bitbucket, Azure DevOps). You can commit, create branches, and open pull requests straight from the Workspace UI.
DBFS and dbutils
dbutils.fs
# List files
display(dbutils.fs.ls("dbfs:/mnt/datalake/raw/"))
display(dbutils.fs.ls("abfss://container@account.dfs.core.windows.net/"))
# Operations
dbutils.fs.cp("dbfs:/source/file.parquet", "dbfs:/dest/file.parquet")
dbutils.fs.mv("dbfs:/old/path/", "dbfs:/new/path/", recurse=True)
dbutils.fs.rm("dbfs:/tmp/old-data/", recurse=True)
dbutils.fs.mkdirs("dbfs:/mnt/datalake/processed/orders/")
# Read the first 1000 bytes of a text file (byte limit passed positionally)
content = dbutils.fs.head("dbfs:/mnt/configs/pipeline.json", 1000)
dbutils.secrets: Secret Scopes
# Read a secret (the value is redacted in notebook output and logs)
storage_key = dbutils.secrets.get(scope="azure-kv", key="adls-access-key")
db_password = dbutils.secrets.get(scope="databricks-secrets", key="postgres-pwd")
# List available scopes and secrets
dbutils.secrets.listScopes()
dbutils.secrets.list(scope="azure-kv")
Create a scope via the CLI:
# Databricks-backed scope
# (--scope is the legacy CLI form; the newer CLI takes the scope name as a positional argument)
databricks secrets create-scope --scope databricks-secrets
# Azure Key Vault-backed scope
databricks secrets create-scope \
  --scope azure-kv \
  --scope-backend-type AZURE_KEYVAULT \
  --resource-id "/subscriptions/.../vaults/my-vault" \
  --dns-name "https://my-vault.vault.azure.net/"
# Add a secret
databricks secrets put-secret databricks-secrets postgres-pwd --string-value "s3cr3t"
Mount ADLS Gen2 with a Service Principal (legacy)
# tenant_id (the Azure AD tenant ID) must be defined before this cell runs
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get("azure-kv", "sp-client-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get("azure-kv", "sp-client-secret"),
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}
dbutils.fs.mount(
    source="abfss://raw@mystorageaccount.dfs.core.windows.net/",
    mount_point="/mnt/datalake/raw",
    extra_configs=configs,
)
Modern approach: prefer UC External Locations + Volumes, which need no mounts and no credentials in the notebook.
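Volume paths follow the pattern `/Volumes/<catalog>/<schema>/<volume>/<path>`, so code that used to hardcode `/mnt/...` locations can build them from the three-level name instead. The helper below is a hypothetical convenience function, not part of `dbutils`; it only assembles and sanity-checks the string.

```python
import posixpath


def volume_path(catalog: str, schema: str, volume: str, *parts: str) -> str:
    """Build a /Volumes/<catalog>/<schema>/<volume>/<...> path."""
    for name in (catalog, schema, volume):
        if not name or "/" in name:
            raise ValueError(f"Invalid name component: {name!r}")
    # Strip slashes so a leading "/" in a part cannot reset the joined path.
    cleaned = [p.strip("/") for p in parts if p.strip("/")]
    return posixpath.join("/Volumes", catalog, schema, volume, *cleaned)


print(volume_path("catalog_prod", "bronze", "raw_files", "orders", "2024-06-01"))
```

The resulting string can be passed to readers such as `spark.read.json(...)` in a notebook attached to a Unity Catalog-enabled cluster.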
UC Volumes (modern replacement)
-- Create a volume and use it directly
CREATE VOLUME catalog_prod.bronze.raw_files;
-- Access it by path:
-- /Volumes/catalog_prod/bronze/raw_files/orders/2024-06-01/
# Python cell: read files from the volume
df = spark.read.json("/Volumes/catalog_prod/bronze/raw_files/events/")
Unity Catalog
Hierarchy
Metastore (1 per region)
└── Catalog
    └── Schema (= Database)
        ├── Table (managed or external)
        ├── View
        └── Volume (non-tabular files)
Essential DDL
-- Catalog
CREATE CATALOG IF NOT EXISTS catalog_prod
  COMMENT 'Production data';
-- Schema
CREATE SCHEMA IF NOT EXISTS catalog_prod.bronze
  COMMENT 'Raw/ingestion layer';
-- Managed table (UC manages the data files)
CREATE TABLE catalog_prod.silver.orders (
  order_id BIGINT NOT NULL,
  customer_id BIGINT,
  amount DECIMAL(18,2),
  status STRING,
  created_at TIMESTAMP
)
USING DELTA
COMMENT 'Normalized orders';
-- External table (data lives in external storage)
CREATE TABLE catalog_prod.bronze.raw_events
USING PARQUET
LOCATION 'abfss://raw@account.dfs.core.windows.net/events/';
Fine-grained GRANT/REVOKE
-- Catalog
GRANT USE CATALOG ON CATALOG catalog_prod TO `data-engineers`;
-- Schema
GRANT USE SCHEMA, CREATE TABLE ON SCHEMA catalog_prod.silver TO `data-engineers`;
-- Table
GRANT SELECT ON TABLE catalog_prod.gold.kpis TO `data-analysts`;
GRANT MODIFY ON TABLE catalog_prod.silver.orders TO `data-engineers`;
-- Column-level security: Unity Catalog has no column-list GRANT,
-- so expose only the allowed columns through a view (or use column masks)
CREATE VIEW catalog_prod.silver.orders_bi AS
SELECT order_id, amount, status FROM catalog_prod.silver.orders;
GRANT SELECT ON TABLE catalog_prod.silver.orders_bi TO `bi-team`;
-- Row-level security via a dynamic view
CREATE VIEW catalog_prod.silver.orders_filtered AS
SELECT * FROM catalog_prod.silver.orders
WHERE region = current_user_region(); -- custom function
REVOKE MODIFY ON TABLE catalog_prod.silver.orders FROM `analysts`;
Lineage and Audit
-- Auditing via system tables
SELECT event_time, user_identity, request_params
FROM system.access.audit
WHERE service_name = 'unityCatalog'
  AND action_name IN ('getTable', 'createTable')
ORDER BY event_time DESC
LIMIT 100;
-- Delta Sharing: share data with external parties
CREATE SHARE orders_share;
ALTER SHARE orders_share ADD TABLE catalog_prod.gold.orders_summary;
CREATE RECIPIENT partner_company;
-- Give the recipient access to the share
GRANT SELECT ON SHARE orders_share TO RECIPIENT partner_company;
Delta Lake on Databricks
Table creation and CTAS
-- CTAS with partitioning
CREATE TABLE catalog_prod.silver.orders
USING DELTA
PARTITIONED BY (order_year, order_month)
TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true',
  'delta.autoOptimize.optimizeWrite' = 'true'
)
AS SELECT
  *,
  year(created_at) AS order_year,
  month(created_at) AS order_month
FROM catalog_prod.bronze.raw_orders;
MERGE INTO (upsert)
MERGE INTO catalog_prod.silver.customers AS target
USING catalog_prod.bronze.customers_updates AS source
ON target.customer_id = source.customer_id
WHEN MATCHED AND source.updated_at > target.updated_at THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE AND target.is_active = true THEN
  UPDATE SET target.is_active = false;
# PySpark DeltaTable API (updates_df is the DataFrame holding the incoming changes)
from delta.tables import DeltaTable
delta_table = DeltaTable.forName(spark, "catalog_prod.silver.customers")
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdateAll(
    condition="source.updated_at > target.updated_at"
).whenNotMatchedInsertAll().execute()
Time Travel
-- By version
SELECT * FROM catalog_prod.silver.orders VERSION AS OF 42;
-- By timestamp
SELECT * FROM catalog_prod.silver.orders TIMESTAMP AS OF '2024-05-01 00:00:00';
-- History
DESCRIBE HISTORY catalog_prod.silver.orders;
-- Restore
RESTORE TABLE catalog_prod.silver.orders TO VERSION AS OF 40;
RESTORE TABLE catalog_prod.silver.orders TO TIMESTAMP AS OF '2024-05-01';
Maintenance
-- Compact small files + Z-Order
OPTIMIZE catalog_prod.silver.orders
ZORDER BY (customer_id, created_at);
-- Remove old files (default retention is 7 days)
VACUUM catalog_prod.silver.orders RETAIN 168 HOURS;
-- Force a vacuum (careful: this breaks time travel to earlier versions)
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM catalog_prod.silver.orders RETAIN 0 HOURS;
Change Data Feed (CDF)
# Read changes starting from version X
changes_df = (
    spark.read.format("delta")
    .option("readChangeData", "true")
    .option("startingVersion", 10)
    .table("catalog_prod.silver.orders")
)
# _change_type: insert | update_preimage | update_postimage | delete
display(changes_df.filter("_change_type = 'update_postimage'"))
Spark SQL and Functions
Date functions
SELECT
  date_trunc('month', created_at) AS month_start,
  date_format(created_at, 'yyyy-MM') AS year_month,
  datediff(current_date(), created_at) AS days_ago,
  date_add(created_at, 30) AS plus_30_days,
  last_day(created_at) AS last_day_of_month
FROM catalog_prod.silver.orders;
Regex
SELECT
  regexp_extract(email, '^([^@]+)@(.+)$', 1) AS username,
  regexp_replace(phone, '[^0-9]', '') AS clean_phone
FROM catalog_prod.silver.customers;
Arrays and Structs
from pyspark.sql import functions as F
df = spark.table("catalog_prod.silver.orders_with_items")
# `schema` (the StructType or DDL string describing metadata_json) must be defined beforehand
result = df.select(
    "order_id",
    # Higher-order functions operate on each array element without exploding
    F.transform("items", lambda x: x["price"] * x["qty"]).alias("line_totals"),
    F.filter("items", lambda x: x["qty"] > 1).alias("multi_qty_items"),
    F.aggregate("items", F.lit(0.0), lambda acc, x: acc + x["price"] * x["qty"]).alias("total"),
    # explode yields one output row per array element
    F.explode("items").alias("item"),
    F.from_json("metadata_json", schema).alias("metadata"),
    F.to_json(F.struct("order_id", "status")).alias("json_out"),
)
Window Functions and QUALIFY
SELECT
  customer_id,
  order_id,
  amount,
  SUM(amount) OVER w AS running_total,
  LAG(amount, 1) OVER w AS prev_amount,
  ROW_NUMBER() OVER w AS rn,
  RANK() OVER w AS rnk
FROM catalog_prod.silver.orders
-- Keep only each customer's most recent order
QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY created_at DESC) = 1
WINDOW w AS (PARTITION BY customer_id ORDER BY created_at);
PIVOT / UNPIVOT and CTEs
WITH monthly AS (
  SELECT customer_id, date_format(created_at, 'yyyy-MM') AS ym, amount
  FROM catalog_prod.silver.orders
)
SELECT * FROM monthly
PIVOT (SUM(amount) FOR ym IN ('2024-01', '2024-02', '2024-03'));
Delta Live Tables (DLT)
Basic structure
import dlt
from pyspark.sql import functions as F

# Bronze table: streaming ingestion
@dlt.table(
    name="raw_orders",
    comment="Raw order ingestion via Auto Loader",
    table_properties={"quality": "bronze"},
)
def raw_orders():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/Volumes/catalog_prod/bronze/checkpoints/orders_schema")
        .load("/Volumes/catalog_prod/bronze/raw_files/orders/")
    )

# Silver table: with data quality expectations
@dlt.table(
    name="clean_orders",
    comment="Validated and normalized orders",
    table_properties={"quality": "silver"},
)
@dlt.expect("valid_order_id", "order_id IS NOT NULL")            # record the violation, keep the row
@dlt.expect_or_drop("positive_amount", "amount > 0")             # drop violating rows
@dlt.expect_or_fail("valid_status", "status IN ('pending','confirmed','shipped','cancelled')")  # fail the update
def clean_orders():
    return (
        dlt.read_stream("raw_orders")
        .withColumn("amount", F.col("amount").cast("decimal(18,2)"))
        .withColumn("created_at", F.to_timestamp("created_at"))
        .select("order_id", "customer_id", "amount", "status", "created_at")
    )

# Intermediate view (clean_customers is assumed to be defined elsewhere in the pipeline)
@dlt.view
def orders_with_customers():
    return dlt.read("clean_orders").join(
        dlt.read("clean_customers"), "customer_id"
    )

# Gold table: batch
@dlt.table(name="daily_revenue", table_properties={"quality": "gold"})
def daily_revenue():
    return (
        dlt.read("clean_orders")
        .filter("status = 'confirmed'")
        .groupBy(F.date_trunc("day", F.col("created_at")).alias("day"))
        .agg(F.sum("amount").alias("revenue"), F.count("*").alias("orders"))
    )
CDC with dlt.apply_changes (SCD1 and SCD2)
# SCD Type 1: overwrite in place
dlt.create_streaming_table("customers_scd1")
dlt.apply_changes(
    target="customers_scd1",
    source="raw_customers_cdc",
    keys=["customer_id"],
    sequence_by="updated_at",
    apply_as_deletes=F.expr("op = 'DELETE'"),
    stored_as_scd_type=1,
)
# SCD Type 2: full history
dlt.create_streaming_table("customers_scd2")
dlt.apply_changes(
    target="customers_scd2",
    source="raw_customers_cdc",
    keys=["customer_id"],
    sequence_by="updated_at",
    stored_as_scd_type=2,
    track_history_column_list=["name", "email", "tier"],
)
Monitoring quality via event_log
-- event_log takes a pipeline ID or TABLE(<a table published by the pipeline>)
SELECT
  origin.flow_name,
  details:flow_progress:data_quality:dropped_records::int AS dropped,
  details:flow_progress:data_quality:expectations AS expectations
FROM event_log(TABLE(catalog_prod.bronze.raw_orders))
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;
Databricks Workflows (Jobs)
Multi-task job structure (YAML)
name: pipeline-orders-daily
schedule:
  quartz_cron_expression: "0 0 6 * * ?"
  timezone_id: "America/Sao_Paulo"
job_clusters:
  - job_cluster_key: main_cluster
    new_cluster:
      spark_version: "15.4.x-scala2.12"
      node_type_id: "i3.2xlarge"
      num_workers: 4
      spark_conf:
        spark.databricks.delta.optimizeWrite.enabled: "true"
tasks:
  - task_key: ingest_raw
    job_cluster_key: main_cluster
    notebook_task:
      notebook_path: /Repos/data-team/pipelines/01_ingest_raw
      base_parameters:
        env: "prod"
        date: "{{job.start_time.iso_date}}"
    retry_on_timeout: false
    max_retries: 2
    min_retry_interval_millis: 60000
  - task_key: transform_silver
    depends_on:
      - task_key: ingest_raw
    job_cluster_key: main_cluster
    python_wheel_task:
      package_name: orders_pipeline
      entry_point: transform_silver
      parameters: ["--env", "prod"]
  - task_key: dlt_pipeline
    depends_on:
      - task_key: transform_silver
    pipeline_task:
      pipeline_id: "abc123-pipeline-id"
  - task_key: dbt_models
    depends_on:
      - task_key: dlt_pipeline
    dbt_task:
      project_directory: /Repos/data-team/dbt_project
      commands: ["dbt run --select tag:daily", "dbt test --select tag:daily"]
Create and trigger via the CLI
# Create the job
databricks jobs create --json @job_config.json
# Trigger an immediate run with parameters
databricks jobs run-now --job-id 12345 \
  --notebook-params '{"env": "prod", "date": "2024-06-01"}'
# List runs
databricks runs list --job-id 12345 --limit 10
# Cancel a run
databricks runs cancel --run-id 67890
MLflow on Databricks
Experiment tracking
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import f1_score, roc_auc_score

# Assumes X_train, y_train, X_test, y_test already exist
mlflow.set_experiment("/Shared/experiments/churn-prediction")
with mlflow.start_run(run_name="gbm_v3") as run:
    # Parameters
    params = {"n_estimators": 200, "max_depth": 5, "learning_rate": 0.05}
    mlflow.log_params(params)
    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)
    preds = model.predict(X_test)
    # Metrics
    mlflow.log_metrics({
        "f1": f1_score(y_test, preds),
        "auc": roc_auc_score(y_test, model.predict_proba(X_test)[:, 1]),
    })
    # Artifacts
    mlflow.log_artifact("reports/feature_importance.png")
    mlflow.sklearn.log_model(model, "model", registered_model_name="churn-model-prod")
    print(f"Run ID: {run.info.run_id}")
Autolog
mlflow.sklearn.autolog(log_input_examples=True, log_model_signatures=True)
# From here on, fit() logs parameters, metrics, and the model automatically
model.fit(X_train, y_train)
UC Model Registry: champion/challenger aliases
import mlflow
from mlflow import MlflowClient
# Point the registry at Unity Catalog so three-level model names resolve
mlflow.set_registry_uri("databricks-uc")
client = MlflowClient()
# Register in UC (run_id comes from the tracking run, e.g. run.info.run_id)
model_uri = f"runs:/{run_id}/model"
mv = mlflow.register_model(model_uri, "catalog_prod.ml.churn_model")
# Set the alias for the new candidate
client.set_registered_model_alias("catalog_prod.ml.churn_model", "challenger", mv.version)
# Promote after validation
client.set_registered_model_alias("catalog_prod.ml.churn_model", "champion", mv.version)
# Load by alias
model = mlflow.sklearn.load_model("models:/catalog_prod.ml.churn_model@champion")
Model Serving
# Via UI: Models > Serving > Create Endpoint
# Via API:
import requests, json
endpoint_config = {
    "name": "churn-prediction-endpoint",
    "config": {
        "served_models": [{
            "model_name": "catalog_prod.ml.churn_model",
            "model_version": "3",
            "workload_size": "Small",
            "scale_to_zero_enabled": True,
        }]
    },
}
# POST /api/2.0/serving-endpoints
Performance and Cost
Photon: when it really helps
- Analytical SQL queries over Delta (scans, aggregations, joins)
- Large OPTIMIZE, VACUUM, and MERGE operations
- Does not help: pandas UDFs, Python RDD code, custom ML models
Delta Cache vs Spark Cache
| Characteristic | Delta Cache | Spark Cache |
|---|---|---|
| Stored in | Worker's local SSD | Memory (RAM) |
| Persistence | Across queries (while the cluster is up) | Only within the Spark session |
| Format | Optimized binary | Serialized RDD |
| Activation | Automatic | df.cache() / df.persist() |
| Best for | Delta tables read repeatedly | Intermediate DataFrames |
Z-Order vs Liquid Clustering
| Aspect | Z-Order | Liquid Clustering (DBR 13.3+) |
|---|---|---|
| Syntax | OPTIMIZE t ZORDER BY (col) | CLUSTER BY (col) at table creation |
| Rewrite | Full (requires OPTIMIZE) | Incremental on OPTIMIZE |
| Partitioning | Combined with PARTITIONED BY | Replaces physical partitioning |
| Maintenance | Manual (schedule OPTIMIZE) | Plain OPTIMIZE; automatic with predictive optimization |
| When to use | DBR < 13.3 or legacy tables | New tables on DBR 13.3+ |
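The practical difference between the two layouts shows up in the statements a maintenance job has to issue. The sketch below is a hypothetical helper that only builds SQL strings. It assumes an existing table whose clustering keys are set with `ALTER TABLE ... CLUSTER BY`, an alternative to declaring `CLUSTER BY` at creation time.

```python
def layout_statements(table: str, columns: list[str], liquid: bool) -> list[str]:
    """Return the SQL needed to (re)organize a Delta table's data layout."""
    cols = ", ".join(columns)
    if liquid:
        # Liquid Clustering: keys are declared once on the table; OPTIMIZE takes no keys.
        return [
            f"ALTER TABLE {table} CLUSTER BY ({cols})",
            f"OPTIMIZE {table}",
        ]
    # Z-Order: keys must be repeated on every OPTIMIZE run.
    return [f"OPTIMIZE {table} ZORDER BY ({cols})"]


for stmt in layout_statements(
    "catalog_prod.silver.orders", ["customer_id", "created_at"], liquid=True
):
    print(stmt)
```

In a notebook, each statement could be passed to `spark.sql(...)`.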
Sizing heuristics
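As a rough illustration of the partition-size rule of thumb used in this section (128 MB to 1 GB per shuffle partition), the sketch below turns an estimated shuffle volume into a partition count. Rounding up to a multiple of the core count is an assumption of this example, not a Databricks rule, and the function name is hypothetical.

```python
import math

MB = 1024 ** 2


def suggest_shuffle_partitions(
    shuffle_bytes: int,
    total_cores: int,
    target_partition_bytes: int = 128 * MB,
) -> int:
    """Pick a partition count near the target size, rounded up to a multiple of the cores."""
    if shuffle_bytes <= 0 or total_cores <= 0:
        raise ValueError("shuffle_bytes and total_cores must be positive")
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    # Round up to whole "waves" of tasks so the last wave does not leave cores idle.
    waves = max(1, math.ceil(by_size / total_cores))
    return waves * total_cores


# Example: about 10 GB of shuffle data on a cluster with 32 cores in total.
print(suggest_shuffle_partitions(10 * 1024 * MB, total_cores=32))
```

The result is a starting point for `spark.sql.shuffle.partitions`; actual stage metrics in the Spark UI should drive further tuning.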
# Rule of thumb: 128 MB to 1 GB per partition after a shuffle
# Tune the number of shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 200)  # 200 is the default; aim for ~2-4x the total cores
# Check the number of partitions
df.rdd.getNumPartitions()
# Repartition vs. coalesce
df_repartitioned = df.repartition(100, "customer_id")  # full shuffle, balanced
df_coalesced = df.coalesce(10)  # reduces partitions without a shuffle (coalesce cannot increase the count)
Hints and skew
-- Broadcast join for small tables (< 200 MB)
SELECT /*+ BROADCAST(dim) */ f.*, dim.name
FROM fact_orders f JOIN dim_customers dim ON f.customer_id = dim.customer_id;
-- Skew hint (Databricks-specific)
SELECT /*+ SKEW('orders', 'customer_id') */ *
FROM orders JOIN customers USING (customer_id);
Cost via system tables
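-- list_prices stores the price in pricing.default and is versioned over time,
-- so join on the price validity window as well as on SKU and cloud
SELECT
  u.sku_name,
  SUM(u.usage_quantity) AS dbus,
  SUM(u.usage_quantity * p.pricing.default) AS estimated_cost_usd
FROM system.billing.usage u
JOIN system.billing.list_prices p
  ON u.sku_name = p.sku_name
  AND u.cloud = p.cloud
  AND u.usage_start_time >= p.price_start_time
  AND (p.price_end_time IS NULL OR u.usage_start_time < p.price_end_time)
WHERE u.usage_date >= current_date() - 30
  AND u.custom_tags['Team'] = 'data-engineering'
GROUP BY u.sku_name
ORDER BY estimated_cost_usd DESC;
CI/CD and Best Practices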
Databricks Asset Bundles (DAB)
# databricks.yml (project root)
bundle:
  name: orders-pipeline
# Pull in the resource definitions kept under resources/
include:
  - resources/**/*.yml
variables:
  env:
    default: dev
  catalog:
    default: catalog_dev
targets:
  dev:
    mode: development
    default: true
    workspace:
      host: https://adb-xxx.azuredatabricks.net
    variables:
      env: dev
      catalog: catalog_dev
  prod:
    mode: production
    workspace:
      host: https://adb-yyy.azuredatabricks.net
    variables:
      env: prod
      catalog: catalog_prod
    run_as:
      service_principal_name: sp-databricks-prod
# resources/jobs/orders_daily.yml
resources:
  jobs:
    orders_daily:
      name: orders-daily-${var.env}
      schedule:
        quartz_cron_expression: "0 0 6 * * ?"
        timezone_id: "America/Sao_Paulo"
      tasks:
        - task_key: ingest
          notebook_task:
            # Local paths are resolved relative to this YAML file
            notebook_path: ../../notebooks/01_ingest.py
            base_parameters:
              catalog: ${var.catalog}
              env: ${var.env}
DAB commands
# Authenticate
databricks auth login --host https://adb-xxx.azuredatabricks.net
# Validate the bundle
databricks bundle validate
# Deploy to a target
databricks bundle deploy --target dev
databricks bundle deploy --target prod
# Run the job manually after deploying
databricks bundle run orders_daily --target prod
Recommended project structure
orders-pipeline/
├── databricks.yml
├── resources/
│   ├── jobs/
│   │   └── orders_daily.yml
│   └── pipelines/
│       └── dlt_orders.yml
├── src/
│   ├── orders/
│   │   ├── __init__.py
│   │   ├── ingest.py
│   │   ├── transform.py
│   │   └── models.py
│   └── utils/
│       ├── spark_utils.py
│       └── delta_utils.py
├── notebooks/
│   ├── 01_ingest.py
│   └── 02_transform.py
├── tests/
│   ├── unit/
│   │   ├── test_transform.py
│   │   └── test_models.py
│   └── integration/
│       └── test_pipeline_e2e.py
├── pyproject.toml
└── .github/workflows/ci.yml
Tests with local PySpark
# tests/unit/test_transform.py
import pytest
from pyspark.sql import SparkSession
from src.orders.transform import clean_orders_df

@pytest.fixture(scope="session")
def spark():
    # Local Spark session with the Delta extensions enabled
    return (
        SparkSession.builder
        .master("local[2]")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )

def test_clean_orders_drops_null_order_id(spark):
    data = [
        (1, 100, 50.0, "confirmed"),
        (None, 200, 30.0, "pending"),  # should be removed
    ]
    df = spark.createDataFrame(data, ["order_id", "customer_id", "amount", "status"])
    result = clean_orders_df(df)
    assert result.count() == 1
    assert result.filter("order_id IS NULL").count() == 0
GitHub Actions: CI pipeline
# .github/workflows/ci.yml
name: CI
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.11" }
      - run: pip install pyspark==3.5.0 delta-spark==3.2.0 pytest
      - run: pytest tests/unit/ -v
  deploy-dev:
    needs: test
    if: github.ref == 'refs/heads/develop'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: databricks/setup-cli@main
      - run: databricks bundle deploy --target dev
        env:
          DATABRICKS_HOST: ${{ secrets.DBX_HOST_DEV }}
          DATABRICKS_TOKEN: ${{ secrets.DBX_TOKEN_DEV }}
  deploy-prod:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: databricks/setup-cli@main
      - run: databricks bundle deploy --target prod
        env:
          DATABRICKS_HOST: ${{ secrets.DBX_HOST_PROD }}
          DATABRICKS_TOKEN: ${{ secrets.DBX_TOKEN_PROD }}
Production checklist
- Job uses a Job Cluster (not All-Purpose) with Spot instances + fallback
- Secrets via Secret Scopes; no hardcoded credentials
- Unity Catalog: managed tables, least-privilege GRANTs
- Delta: optimizeWrite + autoCompact enabled
- Liquid Clustering on new tables (DBR 13.3+)
- VACUUM scheduled (weekly, retain >= 7 days)
- DLT expectations documenting the data contract
- MLflow tracking active; models versioned in the UC Model Registry
- Cluster tags: Team, CostCenter, Environment
- system.billing.usage monitored with cost alerts
- DAB with dev/staging/prod targets and run_as Service Principal in prod
- Unit tests passing in CI before deploy
- Retry policy configured (max 2 retries, 60 s interval)
- Failure notifications configured (email/Slack on failure)
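Some of these checklist items can be checked mechanically against a job definition before deployment. The sketch below is a hypothetical lint over a Jobs-API-style dict. It covers only three items (job cluster, retry policy, failure notifications), and both the finding messages and the sample job are made up for illustration.

```python
def check_job(job: dict) -> list[str]:
    """Return human-readable findings for a Jobs-API-style job definition."""
    findings = []
    notified = (
        job.get("email_notifications", {}).get("on_failure")
        or job.get("webhook_notifications", {}).get("on_failure")
    )
    if not notified:
        findings.append("no failure notification configured")
    for task in job.get("tasks", []):
        key = task.get("task_key", "<unnamed>")
        # existing_cluster_id means the task reuses an all-purpose cluster.
        if "existing_cluster_id" in task:
            findings.append(f"{key}: runs on an existing (all-purpose) cluster")
        if task.get("max_retries", 0) < 1:
            findings.append(f"{key}: no retry policy")
    return findings


# Hypothetical job: one compliant task and one ad-hoc task that breaks two rules.
job = {
    "name": "pipeline-orders-daily",
    "tasks": [
        {"task_key": "ingest_raw", "job_cluster_key": "main_cluster",
         "max_retries": 2, "min_retry_interval_millis": 60000},
        {"task_key": "adhoc_fix", "existing_cluster_id": "hypothetical-cluster-id"},
    ],
}

for finding in check_job(job):
    print(finding)
```

A check like this could run in the CI test job, failing the build when the list of findings is not empty.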