Apache Airflow
O que é
Airflow é uma plataforma de orquestração de workflows onde os pipelines são definidos como código Python. Em vez de disparar scripts com cron e torcer para que funcionem, você escreve um DAG (Directed Acyclic Graph) que descreve o quê, quando e em que ordem cada etapa executa — com retry automático, alertas de falha e histórico visual na UI.
A diferença central para um cron job: Airflow gerencia dependências entre tarefas, rastreia estado, re-executa tarefas com falha e permite inspecionar cada execução histórica. Um cron job dispara cego; o Airflow orquestra com consciência.
Casos de uso típicos:
- ETL/ELT — extrair dados de APIs, transformar e carregar no data warehouse
- ML pipelines — retreinar modelos periodicamente
- Relatórios agendados
- Processos de negócio com dependências complexas (ex: só enviar relatório se a ingestão terminou)
Arquitetura
┌─────────────────────────────────────────────────────────────┐
│ Airflow Cluster │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Webserver │ │ Scheduler │ │ DAG Processor │ │
│ │ (UI/API) │ │ (agenda + │ │ (parseia DAGs │ │
│ │ port 8080 │ │ submete) │ │ → metadata DB) │ │
│ └──────────────┘ └──────────────┘ └──────────────────┘ │
│ │ │
│ ┌──────────────┐ │
│ │ Executor │ (dentro do scheduler) │
│ │ Local/Celery │ │
│ │ /Kubernetes │ │
│ └──────────────┘ │
│ │ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Triggerer │ │ Worker(s) │ │ Metadata DB │ │
│ │ (deferrable │ │ (executam │ │ (PostgreSQL / │ │
│ │ operators) │ │ as tasks) │ │ MySQL) │ │
│ └──────────────┘ └──────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────┘| Componente | Função |
|---|---|
| Scheduler | Lê DAGs, decide quando criar runs, submete tasks ao executor |
| DAG Processor | Parseia arquivos Python do dags/ e serializa no metadata DB |
| Executor | Config do scheduler — define como tasks são executadas (local, celery, k8s) |
| Webserver | UI e REST API — inspecionar, disparar, monitorar DAGs e runs |
| Triggerer | Processo separado para operadores deferrable (async I/O sem bloquear worker) |
| Worker | Processo que efetivamente executa o código de uma task (Celery/K8s) |
| Metadata DB | PostgreSQL ou MySQL — armazena DAGs, runs, task instances, variáveis, conexões |
DAGs
Um DAG é um grafo de tarefas sem ciclos. Cada arquivo Python dentro da pasta dags/ é inspecionado pelo DAG Processor.
Forma clássica (with DAG(...))
from airflow.sdk import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
"owner": "rafael",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["ops@empresa.com"],
}
with DAG(
dag_id="meu_pipeline",
description="Exemplo de DAG completo",
start_date=datetime(2024, 1, 1),
schedule="0 6 * * *", # todo dia às 06:00 UTC
catchup=False, # não executa runs passadas ao ativar
max_active_runs=1, # apenas 1 run simultânea
default_args=default_args,
tags=["ingestao", "prod"],
) as dag:
t1 = BashOperator(task_id="extrai_dados", bash_command="python extract.py")
t2 = BashOperator(task_id="transforma", bash_command="python transform.py")
t3 = BashOperator(task_id="carrega", bash_command="python load.py")
t1 >> t2 >> t3Forma com decorator @dag
from airflow.sdk import dag, task
from datetime import datetime
@dag(
dag_id="pipeline_decorado",
start_date=datetime(2024, 1, 1),
schedule="@daily",
catchup=False,
)
def pipeline():
@task
def extrai():
return {"registros": 1000}
@task
def transforma(dados: dict):
return [r * 2 for r in range(dados["registros"])]
@task
def carrega(dados: list):
print(f"Carregando {len(dados)} registros")
dados = extrai()
transformados = transforma(dados)
carrega(transformados)
pipeline() # instancia o DAG — essa linha é obrigatóriaParâmetros importantes do DAG
| Parâmetro | Tipo | Descrição |
|---|---|---|
dag_id | str | Identificador único — usado em URLs e CLI |
start_date | datetime | Data da primeira run (não necessariamente hoje) |
schedule | str | timedelta | None | Cron, @daily, timedelta(hours=1) ou None |
catchup | bool | Se True, cria runs para datas passadas entre start_date e hoje |
max_active_runs | int | Limita concorrência de runs do mesmo DAG |
max_active_tasks | int | Limita tasks simultâneas dentro de uma run |
default_args | dict | Valores padrão para todas as tasks do DAG |
tags | list[str] | Filtros na UI |
on_failure_callback | callable | Callback de falha no nível do DAG |
Schedules comuns
schedule="0 6 * * *" # cron: todo dia às 06:00
schedule="@daily" # alias para "0 0 * * *"
schedule="@hourly" # alias para "0 * * * *"
schedule="@weekly" # alias para "0 0 * * 0"
schedule=timedelta(hours=6) # a cada 6 horas
schedule=None # só disparo manual
schedule="@once" # executa uma vez ao ser ativadoTasks e Dependências
Operadores de dependência
# Sequencial
t1 >> t2 >> t3
# Paralelo depois de t1
t1 >> [t2, t3] >> t4
# Equivalente com set_upstream/set_downstream
t2.set_upstream(t1)
t2.set_downstream(t3)
# Múltiplas dependências
[t1, t2] >> t3 # t3 espera t1 E t2trigger_rule — quando uma task é disparada
Por padrão, uma task só roda quando todos os pais terminam com sucesso. Isso pode ser alterado:
from airflow.utils.trigger_rule import TriggerRule
task_final = PythonOperator(
task_id="finaliza",
python_callable=minha_funcao,
trigger_rule=TriggerRule.ALL_DONE, # roda independente de sucesso/falha
)| Regra | Dispara quando |
|---|---|
ALL_SUCCESS (padrão) | Todos os pais sucesso |
ALL_FAILED | Todos os pais falharam |
ALL_DONE | Todos os pais terminaram (sucesso, falha ou skip) |
ONE_SUCCESS | Pelo menos um pai com sucesso |
ONE_FAILED | Pelo menos um pai falhou |
NONE_FAILED | Nenhum pai falhou (sucesso ou skip aceitos) |
NONE_SKIPPED | Nenhum pai foi pulado |
TaskFlow API
A TaskFlow API usa decoradores Python para definir tasks. Ela elimina o boilerplate de XCom e calcula dependências automaticamente.
Básico
from airflow.sdk import dag, task
from datetime import datetime
@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def etl_pipeline():
@task
def extract() -> dict:
return {"users": [{"id": 1, "name": "Ana"}, {"id": 2, "name": "Bruno"}]}
@task
def transform(raw: dict) -> list:
return [u["name"].upper() for u in raw["users"]]
@task
def load(names: list) -> None:
for name in names:
print(f"INSERT INTO users VALUES ('{name}')")
raw = extract()
names = transform(raw)
load(names)
etl_pipeline()multiple_outputs=True — retornar várias chaves como XComs separados
@task(multiple_outputs=True)
def fetch_stats() -> dict:
return {"total": 500, "errors": 12, "skipped": 3}
# stats["total"], stats["errors"], stats["skipped"] são XComs individuais
stats = fetch_stats()
@task
def alert(errors: int):
if errors > 10:
send_alert(f"Muitos erros: {errors}")
alert(stats["errors"])Misturando TaskFlow com operadores tradicionais
from airflow.operators.email import EmailOperator
@dag(...)
def misto():
@task
def gera_relatorio() -> str:
return "Relatório gerado em /tmp/report.csv"
caminho = gera_relatorio()
# Operador tradicional consumindo XCom de task
EmailOperator(
task_id="envia_email",
to="gestor@empresa.com",
subject="Relatório diário",
html_content="{{ task_instance.xcom_pull('gera_relatorio') }}",
)
misto()Operadores Built-in
BashOperator
from airflow.operators.bash import BashOperator
run_script = BashOperator(
task_id="run_etl",
bash_command="python /opt/etl/extract.py --date {{ ds }}", # ds = YYYY-MM-DD da run
env={"DB_URL": "postgresql://localhost/analytics"},
cwd="/opt/etl",
)PythonOperator
from airflow.operators.python import PythonOperator
def processa_dados(execution_date, **context):
print(f"Processando data: {execution_date}")
ti = context["ti"]
upstream_result = ti.xcom_pull(task_ids="task_anterior")
return upstream_result * 2
processa = PythonOperator(
task_id="processa",
python_callable=processa_dados,
op_kwargs={"extra_param": "valor"}, # parâmetros adicionais
)BranchPythonOperator — desvio condicional
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
def escolhe_branch(**context):
hora = context["logical_date"].hour
return "processa_dia" if hora < 18 else "processa_noite"
branch = BranchPythonOperator(
task_id="decide_branch",
python_callable=escolhe_branch,
)
dia = EmptyOperator(task_id="processa_dia")
noite = EmptyOperator(task_id="processa_noite")
fim = EmptyOperator(task_id="fim", trigger_rule="ONE_SUCCESS")
branch >> [dia, noite] >> fimEmptyOperator — placeholder / join
from airflow.operators.empty import EmptyOperator
inicio = EmptyOperator(task_id="inicio")
fim = EmptyOperator(task_id="fim")EmailOperator
from airflow.operators.email import EmailOperator
alerta = EmailOperator(
task_id="alerta_falha",
to=["devops@empresa.com"],
subject="Pipeline falhou em {{ ds }}",
html_content="<p>Verifique o Airflow: {{ run_id }}</p>",
)Operadores de datas com timedelta
from airflow.operators.python import PythonOperator
from datetime import timedelta
task = PythonOperator(
task_id="minha_task",
python_callable=minha_funcao,
execution_timeout=timedelta(hours=2), # mata a task se passar de 2h
retries=3,
retry_delay=timedelta(minutes=10),
retry_exponential_backoff=True,
max_retry_delay=timedelta(hours=1),
)Sensores
Sensores são operadores especiais que aguardam uma condição se tornar verdadeira. Têm dois modos de operação:
| Modo | Comportamento | Quando usar |
|---|---|---|
poke | Mantém worker slot ocupado, dorme entre verificações | Esperas curtas (< 5 min) |
reschedule | Libera o worker, reagenda a task periodicamente | Esperas longas (horas), evita esgotar workers |
Sensor de arquivo (exemplo)
from airflow.sensors.filesystem import FileSensor
aguarda_arquivo = FileSensor(
task_id="aguarda_csv",
filepath="/data/input/{{ ds }}/dados.csv",
poke_interval=60, # verifica a cada 60s
timeout=3600, # falha após 1h sem encontrar
mode="reschedule", # libera worker entre pokes
soft_fail=True, # skipa a task ao invés de falhar
)Sensor HTTP
from airflow.providers.http.sensors.http import HttpSensor
api_pronta = HttpSensor(
task_id="api_health_check",
http_conn_id="minha_api",
endpoint="/health",
request_params={},
response_check=lambda response: response.json()["status"] == "ok",
poke_interval=30,
timeout=300,
)Sensor S3
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
arquivo_s3 = S3KeySensor(
task_id="aguarda_s3",
bucket_name="meu-bucket-dados",
bucket_key="raw/{{ ds }}/vendas.parquet",
aws_conn_id="aws_default",
mode="reschedule",
poke_interval=120,
)Sensor customizado
from airflow.sdk import BaseSensorOperator
from airflow.sdk.bases.sensor import PokeReturnValue
class RegistrosDisponivelSensor(BaseSensorOperator):
template_fields = ("table_name",)
def __init__(self, table_name: str, min_rows: int = 1, **kwargs):
super().__init__(**kwargs)
self.table_name = table_name
self.min_rows = min_rows
def poke(self, context) -> bool | PokeReturnValue:
hook = PostgresHook(postgres_conn_id="postgres_default")
count = hook.get_first(f"SELECT COUNT(*) FROM {self.table_name}")[0]
self.log.info(f"Contagem atual: {count}")
if count >= self.min_rows:
return PokeReturnValue(is_done=True, xcom_value=count)
return FalseXCom — Troca de dados entre tasks
XCom (Cross-Communication) é o mecanismo de passagem de dados entre tasks. Por padrão, armazenado no metadata DB — adequado para valores pequenos (strings, dicts). Para dados grandes, use S3/GCS.
Push / Pull manual
# Push explícito
def pusha(**context):
context["ti"].xcom_push(key="resultado", value={"total": 42})
# Pull explícito
def puxa(**context):
resultado = context["ti"].xcom_pull(
task_ids="task_que_pushei",
key="resultado"
)
print(resultado["total"])
# Pull de run anterior
def puxa_anterior(**context):
return context["ti"].xcom_pull(
task_ids="task_x",
dag_id="meu_dag",
include_prior_dates=True,
)XCom em templates Jinja
BashOperator(
task_id="usa_xcom",
bash_command='echo "Total: {{ ti.xcom_pull(task_ids=\"conta_registros\") }}"',
)TaskFlow API — XCom automático
Com @task, o valor de return é automaticamente pusheado; parâmetros de outra @task são automaticamente pulados:
@task
def conta() -> int:
return 42
@task
def imprime(n: int):
print(f"Recebi: {n}") # n == 42
imprime(conta()) # XCom gerenciado automaticamenteDynamic Task Mapping
Permite criar um número variável de tasks em tempo de execução, baseado em dados.
expand() — expandir sobre uma lista
@task
def lista_arquivos() -> list[str]:
return ["jan.csv", "fev.csv", "mar.csv"]
@task
def processa(arquivo: str) -> dict:
return {"arquivo": arquivo, "linhas": 1000}
@task
def agrega(resultados: list[dict]) -> int:
return sum(r["linhas"] for r in resultados)
arquivos = lista_arquivos()
processados = processa.expand(arquivo=arquivos) # cria 3 tasks em runtime
agrega(processados)partial() + expand() — argumento fixo + argumento variável
@task
def processa_env(arquivo: str, ambiente: str) -> str:
return f"{ambiente}/{arquivo}"
# ambiente fixo, arquivo variável
resultados = processa_env.partial(ambiente="prod").expand(arquivo=["a.csv", "b.csv"])expand_kwargs() — múltiplos argumentos de um dict
@task
def get_configs() -> list[dict]:
return [
{"source": "s3", "bucket": "raw"},
{"source": "gcs", "bucket": "analytics"},
]
@task
def ingest(source: str, bucket: str):
print(f"Ingestando {source}://{bucket}")
ingest.expand_kwargs(get_configs()) # cria 2 tasksConnections
Conexões armazenam credenciais e endpoints de sistemas externos. Gerenciadas pela UI (Admin > Connections) ou via env var.
Via variável de ambiente (recomendado para produção)
# Formato: AIRFLOW_CONN_<CONN_ID_EM_MAIÚSCULAS>
export AIRFLOW_CONN_POSTGRES_DEFAULT='postgresql://user:pass@localhost:5432/mydb'
export AIRFLOW_CONN_AWS_DEFAULT='{"conn_type":"aws","login":"AKID","password":"SECRET","extra":{"region_name":"us-east-1"}}'Via CLI
airflow connections add 'postgres_analytics' \
--conn-type 'postgres' \
--conn-host 'localhost' \
--conn-login 'airflow' \
--conn-password 'airflow' \
--conn-port 5432 \
--conn-schema 'analytics'Usando em código
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection("postgres_analytics")
print(conn.host, conn.login, conn.password)Variables
Variáveis são pares chave-valor globais. Úteis para configurações que mudam sem alterar o código.
from airflow.models import Variable
# Leitura
bucket = Variable.get("s3_bucket_analytics")
config = Variable.get("pipeline_config", deserialize_json=True) # dict automático
# Com valor padrão
env = Variable.get("environment", default_var="dev")
# Escrita (programático)
Variable.set("ultima_execucao", "2024-07-15")
Variable.set("config", {"batch_size": 1000}, serialize_json=True)Via env var (sem acesso ao metadata DB no startup)
export AIRFLOW_VAR_S3_BUCKET='meu-bucket-producao'
# acessada como Variable.get("s3_bucket")Hooks
Hooks são wrappers de conexão com sistemas externos. Encapsulam a lógica de autenticação e fornecem métodos de alto nível. Operadores usam hooks internamente.
PostgresHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id="postgres_analytics")
# Executar query
hook.run("INSERT INTO logs VALUES (NOW(), 'pipeline iniciado')")
# Retornar resultados
rows = hook.get_records("SELECT id, name FROM users WHERE active = true")
first = hook.get_first("SELECT COUNT(*) FROM orders")
# Pandas DataFrame
df = hook.get_pandas_df("SELECT * FROM vendas WHERE data = '{{ ds }}'")
# Inserção em lote
hook.insert_rows(
table="staging.orders",
rows=[("A001", 150.0), ("A002", 230.0)],
target_fields=["order_id", "value"],
)S3Hook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
hook = S3Hook(aws_conn_id="aws_default")
# Upload
hook.load_file(
filename="/tmp/dados.csv",
key="raw/2024-07-15/dados.csv",
bucket_name="meu-bucket",
replace=True,
)
# Download
conteudo = hook.read_key(key="raw/2024-07-15/dados.csv", bucket_name="meu-bucket")
# Listar arquivos
chaves = hook.list_keys(bucket_name="meu-bucket", prefix="raw/2024-07-15/")
# Verificar existência
existe = hook.check_for_key(key="raw/dados.csv", bucket_name="meu-bucket")HttpHook
from airflow.providers.http.hooks.http import HttpHook
hook = HttpHook(method="GET", http_conn_id="minha_api")
response = hook.run(
endpoint="/v1/data",
headers={"Authorization": "Bearer {{ var.value.api_token }}"},
)
dados = response.json()Executores
O Executor define como as tasks são executadas. É uma configuração do Scheduler, não um processo separado (exceto CeleryExecutor que usa workers externos).
| Executor | Workers | Quando usar |
|---|---|---|
SequentialExecutor | 1 task por vez (mesmo processo) | Testes locais, SQLite |
LocalExecutor | Subprocessos locais, paralelo | Dev/staging com PostgreSQL |
CeleryExecutor | Workers externos via Celery | Produção escalável horizontalmente |
KubernetesExecutor | 1 Pod por task | Produção com isolamento, k8s disponível |
CeleryKubernetesExecutor | Celery para tasks leves, K8s para pesadas | Híbrido |
Configurar executor
# airflow.cfg
[core]
executor = LocalExecutor
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow# via env var
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CELERY__BROKER_URL=redis://localhost:6379/0
export AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@localhost/airflowPools
Pools limitam a concorrência de tasks para proteger recursos (ex: conexões de banco, chamadas de API).
# Criar pool via CLI
airflow pools set "db_connections" 10 "Limita conexões simultâneas ao PostgreSQL"
# Atribuir task ao pool
task = PythonOperator(
task_id="query_pesada",
python_callable=run_query,
pool="db_connections", # ocupa 1 slot do pool
pool_slots=2, # pode ocupar mais slots se necessário
)Variáveis de Contexto (Templates Jinja)
Dentro de parâmetros com suporte a templates (bash_command, sql, email_content, etc.):
| Variável | Valor | Exemplo |
|---|---|---|
{{ ds }} | Data da run (YYYY-MM-DD) | 2024-07-15 |
{{ ds_nodash }} | Data sem hífens | 20240715 |
{{ ts }} | Timestamp ISO 8601 | 2024-07-15T06:00:00+00:00 |
{{ logical_date }} | datetime da run | objeto datetime |
{{ prev_ds }} | Data da run anterior | 2024-07-14 |
{{ next_ds }} | Data da próxima run | 2024-07-16 |
{{ run_id }} | ID único da run | scheduled__2024-07-15T06:00:00+00:00 |
{{ dag_run.conf }} | Config passada no trigger manual | {"param": "valor"} |
{{ var.value.nome }} | Valor de Variable | valor da variável nome |
{{ var.json.nome }} | Variable como JSON | dict/list da variável nome |
{{ conn.id.host }} | Host de uma Connection | valor do campo host |
{{ ti }} | TaskInstance atual | objeto com .xcom_pull() |
Providers
Providers são pacotes que estendem o Airflow com operadores, hooks e sensores para serviços externos.
# Instalar providers
pip install apache-airflow-providers-amazon # AWS
pip install apache-airflow-providers-google # GCP
pip install apache-airflow-providers-apache-spark # Spark
pip install apache-airflow-providers-postgres # PostgreSQL
pip install apache-airflow-providers-slack # Slack notifications
pip install apache-airflow-providers-http # HTTP
pip install apache-airflow-providers-ssh # SSH/SFTP
pip install apache-airflow-providers-databricks # Databricks
pip install apache-airflow-providers-apache-kafka # KafkaOperadores AWS (provider amazon)
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
glue_job = GlueJobOperator(
task_id="processa_spark",
job_name="meu-glue-job",
aws_conn_id="aws_default",
region_name="us-east-1",
)Operadores GCP (provider google)
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_bq = GCSToBigQueryOperator(
task_id="gcs_to_bq",
bucket="meu-bucket",
source_objects=["raw/{{ ds }}/*.parquet"],
destination_project_dataset_table="projeto.dataset.tabela",
gcp_conn_id="google_cloud_default",
write_disposition="WRITE_TRUNCATE",
)Operadores Spark
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
spark_job = SparkSubmitOperator(
task_id="spark_transform",
application="/opt/spark-jobs/transform.py",
conn_id="spark_default",
conf={"spark.executor.memory": "4g"},
application_args=["--date", "{{ ds }}"],
)Setup Local (Docker Compose)
Modo mais simples — airflow standalone
pip install apache-airflow
export AIRFLOW_HOME=~/airflow
airflow standalone
# UI em http://localhost:8080 — usuário: admin, senha: impressa no terminalDocker Compose oficial
# Baixar o docker-compose.yaml oficial
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# Criar diretórios necessários
mkdir -p ./dags ./logs ./plugins ./config
# Ajustar UID (Linux)
echo -e "AIRFLOW_UID=$(id -u)" > .env
# Inicializar banco e criar usuário admin
docker compose up airflow-init
# Subir todos os serviços
docker compose up -d
# UI em http://localhost:8080 — airflow/airflowServiços incluídos no docker-compose padrão
| Serviço | Porta | Finalidade |
|---|---|---|
airflow-webserver | 8080 | UI e REST API |
airflow-scheduler | — | Scheduler + Executor |
airflow-triggerer | — | Operadores deferrable |
airflow-init | — | Inicialização e migração do DB |
postgres | 5432 | Metadata DB |
redis | 6379 | Broker do Celery (nessa imagem usa Celery) |
CLI — Comandos Essenciais
# DAGs
airflow dags list # listar todos os DAGs
airflow dags trigger meu_dag # disparar manualmente
airflow dags trigger meu_dag --conf '{"param": "valor"}' # com config
airflow dags pause meu_dag # pausar
airflow dags unpause meu_dag # reativar
airflow dags backfill meu_dag \
--start-date 2024-01-01 \
--end-date 2024-01-31 # reprocessar período (Airflow 2.x)
airflow dags show meu_dag # exibir grafo em ASCII
# Tasks
airflow tasks list meu_dag # listar tasks
airflow tasks test meu_dag task_id 2024-07-15 # testar task sem gravar no DB
airflow tasks run meu_dag task_id run_id # executar task específica
# Runs
airflow dags list-runs -d meu_dag # listar runs do DAG
airflow dags state meu_dag 2024-07-15 # estado da run de uma data
# Connections & Variables
airflow connections list
airflow connections add ...
airflow variables list
airflow variables get s3_bucket
airflow variables set s3_bucket "meu-novo-bucket"
# Usuários
airflow users create \
--username admin --firstname Admin --lastname User \
--role Admin --email admin@empresa.com --password admin
# Banco
airflow db init # inicializar banco (primeira vez)
airflow db migrate # aplicar migrações (após upgrade)
airflow db check # verificar conectividadeREST API
A partir do Airflow 2.0, há uma REST API estável:
BASE="http://localhost:8080/api/v1"
AUTH="-u airflow:airflow"
# Listar DAGs
curl $AUTH "$BASE/dags"
# Trigger manual
curl $AUTH -X POST "$BASE/dags/meu_dag/dagRuns" \
-H "Content-Type: application/json" \
-d '{"conf": {"param": "valor"}}'
# Status de uma run
curl $AUTH "$BASE/dags/meu_dag/dagRuns"
# Limpar e re-executar task
curl $AUTH -X POST "$BASE/dags/meu_dag/clearTaskInstances" \
-H "Content-Type: application/json" \
-d '{"task_ids": ["task_falhou"], "start_date": "2024-07-15T00:00:00Z"}'Airflow 3.0 — Principais Mudanças
Airflow 3.0 (lançado em 2025) traz mudanças arquiteturais significativas:
| Mudança | Detalhes |
|---|---|
| Task SDK separado | airflow.sdk — SDK leve para tasks sem dependência pesada do core |
| Backfill gerenciado pelo Scheduler (AIP-78) | Não é mais um processo CLI separado; backfills seguem o mesmo fluxo de scheduling regular |
| DAG versioning | Múltiplas versões do mesmo DAG são rastreadas — runs antigas referenciam a versão do DAG que as criou |
| Asset-based scheduling | DAGs podem ser disparados por atualização de “Assets” (dados) além de schedule por tempo |
| Novo modelo de execução | Separação clara entre dag-processor, scheduler e api-server |
| API Server | Substitui o webserver para a REST API (WebServer foca na UI) |
logical_date vs execution_date | execution_date deprecado; usar logical_date |
| Remoção de SubDAGs | Use TaskGroup em vez de SubDAG (já recomendado desde 2.x) |
Asset-based scheduling (Airflow 3.0)
from airflow.sdk import Asset, dag, task
# Definir um asset
vendas_raw = Asset("s3://bucket/raw/vendas/")
@dag(schedule=vendas_raw) # dispara quando o asset é atualizado
def processa_vendas():
@task
def transforma():
pass
transforma()
@dag(schedule="@daily")
def ingere_vendas():
@task(outlets=[vendas_raw]) # marca o asset como atualizado
def extrai():
return "dados"
extrai()TaskGroups — Organização Visual
from airflow.utils.task_group import TaskGroup
with DAG("pipeline_agrupado", ...) as dag:
with TaskGroup("ingestao") as ingestao:
extrai_api = PythonOperator(task_id="extrai_api", ...)
extrai_db = PythonOperator(task_id="extrai_db", ...)
[extrai_api, extrai_db]
with TaskGroup("transformacao") as transformacao:
limpa = PythonOperator(task_id="limpa", ...)
enriquece = PythonOperator(task_id="enriquece", ...)
limpa >> enriquece
with TaskGroup("carga") as carga:
carrega_dw = PythonOperator(task_id="carrega_dw", ...)
ingestao >> transformacao >> cargaCallbacks e Alertas
from airflow.utils.email import send_email
def on_failure(context):
ti = context["task_instance"]
send_email(
to=["ops@empresa.com"],
subject=f"[FALHA] {ti.dag_id}.{ti.task_id}",
html_content=f"<p>Task {ti.task_id} falhou em {ti.execution_date}</p>"
f"<p>Log: {ti.log_url}</p>",
)
def on_retry(context):
ti = context["task_instance"]
print(f"Retry #{ti.try_number} para {ti.task_id}")
def on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
send_email(to=["ops@empresa.com"], subject="SLA violado!", html_content="...")
task = PythonOperator(
task_id="task_critica",
python_callable=minha_funcao,
on_failure_callback=on_failure,
on_retry_callback=on_retry,
sla=timedelta(hours=2), # alerta se não terminar em 2h
)
# SLA miss callback fica no DAG
dag = DAG(
"meu_dag",
sla_miss_callback=on_sla_miss,
...
)Deferrable Operators (Async)
Operadores deferráveis liberam o worker slot enquanto esperam por um evento externo (I/O), usando o Triggerer ao invés de polling.
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator
# Este operador é deferrable — não bloqueia worker durante execução do job
emr_job = EmrServerlessStartJobOperator(
task_id="emr_serverless",
application_id="{{ var.value.emr_app_id }}",
execution_role_arn="{{ var.value.emr_role_arn }}",
job_driver={"sparkSubmit": {"entryPoint": "s3://bucket/job.py"}},
deferrable=True, # usa Triggerer ao invés de polling no worker
)Secrets Backend
Para não armazenar senhas no metadata DB, use um secrets backend:
# HashiCorp Vault
export AIRFLOW__SECRETS__BACKEND=airflow.providers.hashicorp.secrets.vault.VaultBackend
export AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_path": "airflow/connections", "variables_path": "airflow/variables", "url": "http://vault:8200", "token": "root"}'
# AWS Secrets Manager
export AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
export AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}'Boas Práticas
| Prática | Por quê |
|---|---|
catchup=False por padrão | Evita criar dezenas de runs históricas ao ativar um DAG pela primeira vez |
| Tasks idempotentes | Re-execuções não devem duplicar dados — use INSERT OR REPLACE, TRUNCATE antes de inserir |
Não usar Variable.get() no topo do arquivo | Executado em cada parse do DAG; prefira dentro do python_callable |
| Não importar bibliotecas pesadas no top-level | Slows down DAG parsing; importe dentro da função |
Usar mode="reschedule" em sensores de longa espera | Libera worker slots; poke bloqueia um worker indefinidamente |
| Dados entre tasks via XCom devem ser pequenos | XCom é armazenado no metadata DB; dados grandes vão para S3/GCS |
Usar pool para limitar concorrência | Previne sobrecarga em banco de dados ou APIs com rate limit |
| Secrets nunca em código — usar Connections ou Variables | Credenciais hardcodadas em DAGs viram riscos de segurança no git |
| Um DAG por arquivo | Facilita debugging e reduz tempo de parse |
Usar tags nos DAGs | Facilita filtros na UI em ambientes com muitos DAGs |
max_active_runs=1 para DAGs com estado externo | Evita dois runs simultâneos modificando o mesmo dado |
Testar localmente com airflow tasks test | Executa sem gravar no DB, ideal para desenvolvimento |