Data

Apache Airflow

Referência completa de Apache Airflow — DAGs, TaskFlow API, operadores, sensores, XCom, executores, providers e Airflow 3.x

Apache Airflow

O que é

Airflow é uma plataforma de orquestração de workflows onde os pipelines são definidos como código Python. Em vez de disparar scripts com cron e torcer para que funcionem, você escreve um DAG (Directed Acyclic Graph) que descreve o quê, quando e em que ordem cada etapa executa — com retry automático, alertas de falha e histórico visual na UI.

A diferença central para um cron job: Airflow gerencia dependências entre tarefas, rastreia estado, re-executa tarefas com falha e permite inspecionar cada execução histórica. Um cron job dispara cego; o Airflow orquestra com consciência.

Casos de uso típicos:

  • ETL/ELT — extrair dados de APIs, transformar e carregar no data warehouse
  • ML pipelines — retreinar modelos periodicamente
  • Relatórios agendados
  • Processos de negócio com dependências complexas (ex: só enviar relatório se a ingestão terminou)

Arquitetura

┌─────────────────────────────────────────────────────────────┐
│                        Airflow Cluster                      │
│                                                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │
│  │  Webserver   │  │  Scheduler   │  │  DAG Processor   │  │
│  │  (UI/API)    │  │  (agenda +   │  │  (parseia DAGs   │  │
│  │  port 8080   │  │   submete)   │  │   → metadata DB) │  │
│  └──────────────┘  └──────────────┘  └──────────────────┘  │
│                           │                                 │
│                    ┌──────────────┐                         │
│                    │   Executor   │  (dentro do scheduler)  │
│                    │ Local/Celery │                         │
│                    │ /Kubernetes  │                         │
│                    └──────────────┘                         │
│                           │                                 │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │
│  │  Triggerer   │  │  Worker(s)   │  │  Metadata DB     │  │
│  │  (deferrable │  │  (executam   │  │  (PostgreSQL /   │  │
│  │   operators) │  │   as tasks)  │  │   MySQL)         │  │
│  └──────────────┘  └──────────────┘  └──────────────────┘  │
└─────────────────────────────────────────────────────────────┘
ComponenteFunção
SchedulerLê DAGs, decide quando criar runs, submete tasks ao executor
DAG ProcessorParseia arquivos Python do dags/ e serializa no metadata DB
ExecutorConfig do scheduler — define como tasks são executadas (local, celery, k8s)
WebserverUI e REST API — inspecionar, disparar, monitorar DAGs e runs
TriggererProcesso separado para operadores deferrable (async I/O sem bloquear worker)
WorkerProcesso que efetivamente executa o código de uma task (Celery/K8s)
Metadata DBPostgreSQL ou MySQL — armazena DAGs, runs, task instances, variáveis, conexões

DAGs

Um DAG é um grafo de tarefas sem ciclos. Cada arquivo Python dentro da pasta dags/ é inspecionado pelo DAG Processor.

Forma clássica (with DAG(...))

from airflow.sdk import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "rafael",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["ops@empresa.com"],
}

with DAG(
    dag_id="meu_pipeline",
    description="Exemplo de DAG completo",
    start_date=datetime(2024, 1, 1),
    schedule="0 6 * * *",        # todo dia às 06:00 UTC
    catchup=False,                # não executa runs passadas ao ativar
    max_active_runs=1,            # apenas 1 run simultânea
    default_args=default_args,
    tags=["ingestao", "prod"],
) as dag:

    t1 = BashOperator(task_id="extrai_dados", bash_command="python extract.py")
    t2 = BashOperator(task_id="transforma",   bash_command="python transform.py")
    t3 = BashOperator(task_id="carrega",      bash_command="python load.py")

    t1 >> t2 >> t3

Forma com decorator @dag

from airflow.sdk import dag, task
from datetime import datetime

@dag(
    dag_id="pipeline_decorado",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
)
def pipeline():
    @task
    def extrai():
        return {"registros": 1000}

    @task
    def transforma(dados: dict):
        return [r * 2 for r in range(dados["registros"])]

    @task
    def carrega(dados: list):
        print(f"Carregando {len(dados)} registros")

    dados = extrai()
    transformados = transforma(dados)
    carrega(transformados)

pipeline()  # instancia o DAG — essa linha é obrigatória

Parâmetros importantes do DAG

ParâmetroTipoDescrição
dag_idstrIdentificador único — usado em URLs e CLI
start_datedatetimeData da primeira run (não necessariamente hoje)
schedulestr | timedelta | NoneCron, @daily, timedelta(hours=1) ou None
catchupboolSe True, cria runs para datas passadas entre start_date e hoje
max_active_runsintLimita concorrência de runs do mesmo DAG
max_active_tasksintLimita tasks simultâneas dentro de uma run
default_argsdictValores padrão para todas as tasks do DAG
tagslist[str]Filtros na UI
on_failure_callbackcallableCallback de falha no nível do DAG

Schedules comuns

schedule="0 6 * * *"      # cron: todo dia às 06:00
schedule="@daily"          # alias para "0 0 * * *"
schedule="@hourly"         # alias para "0 * * * *"
schedule="@weekly"         # alias para "0 0 * * 0"
schedule=timedelta(hours=6)  # a cada 6 horas
schedule=None              # só disparo manual
schedule="@once"           # executa uma vez ao ser ativado

Tasks e Dependências

Operadores de dependência

# Sequencial
t1 >> t2 >> t3

# Paralelo depois de t1
t1 >> [t2, t3] >> t4

# Equivalente com set_upstream/set_downstream
t2.set_upstream(t1)
t2.set_downstream(t3)

# Múltiplas dependências
[t1, t2] >> t3  # t3 espera t1 E t2

trigger_rule — quando uma task é disparada

Por padrão, uma task só roda quando todos os pais terminam com sucesso. Isso pode ser alterado:

from airflow.utils.trigger_rule import TriggerRule

task_final = PythonOperator(
    task_id="finaliza",
    python_callable=minha_funcao,
    trigger_rule=TriggerRule.ALL_DONE,   # roda independente de sucesso/falha
)
RegraDispara quando
ALL_SUCCESS (padrão)Todos os pais sucesso
ALL_FAILEDTodos os pais falharam
ALL_DONETodos os pais terminaram (sucesso, falha ou skip)
ONE_SUCCESSPelo menos um pai com sucesso
ONE_FAILEDPelo menos um pai falhou
NONE_FAILEDNenhum pai falhou (sucesso ou skip aceitos)
NONE_SKIPPEDNenhum pai foi pulado

TaskFlow API

A TaskFlow API usa decoradores Python para definir tasks. Ela elimina o boilerplate de XCom e calcula dependências automaticamente.

Básico

from airflow.sdk import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def etl_pipeline():

    @task
    def extract() -> dict:
        return {"users": [{"id": 1, "name": "Ana"}, {"id": 2, "name": "Bruno"}]}

    @task
    def transform(raw: dict) -> list:
        return [u["name"].upper() for u in raw["users"]]

    @task
    def load(names: list) -> None:
        for name in names:
            print(f"INSERT INTO users VALUES ('{name}')")

    raw = extract()
    names = transform(raw)
    load(names)

etl_pipeline()

multiple_outputs=True — retornar várias chaves como XComs separados

@task(multiple_outputs=True)
def fetch_stats() -> dict:
    return {"total": 500, "errors": 12, "skipped": 3}

# stats["total"], stats["errors"], stats["skipped"] são XComs individuais
stats = fetch_stats()

@task
def alert(errors: int):
    if errors > 10:
        send_alert(f"Muitos erros: {errors}")

alert(stats["errors"])

Misturando TaskFlow com operadores tradicionais

from airflow.operators.email import EmailOperator

@dag(...)
def misto():
    @task
    def gera_relatorio() -> str:
        return "Relatório gerado em /tmp/report.csv"

    caminho = gera_relatorio()

    # Operador tradicional consumindo XCom de task
    EmailOperator(
        task_id="envia_email",
        to="gestor@empresa.com",
        subject="Relatório diário",
        html_content="{{ task_instance.xcom_pull('gera_relatorio') }}",
    )

misto()

Operadores Built-in

BashOperator

from airflow.operators.bash import BashOperator

run_script = BashOperator(
    task_id="run_etl",
    bash_command="python /opt/etl/extract.py --date {{ ds }}",  # ds = YYYY-MM-DD da run
    env={"DB_URL": "postgresql://localhost/analytics"},
    cwd="/opt/etl",
)

PythonOperator

from airflow.operators.python import PythonOperator

def processa_dados(execution_date, **context):
    print(f"Processando data: {execution_date}")
    ti = context["ti"]
    upstream_result = ti.xcom_pull(task_ids="task_anterior")
    return upstream_result * 2

processa = PythonOperator(
    task_id="processa",
    python_callable=processa_dados,
    op_kwargs={"extra_param": "valor"},  # parâmetros adicionais
)

BranchPythonOperator — desvio condicional

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def escolhe_branch(**context):
    hora = context["logical_date"].hour
    return "processa_dia" if hora < 18 else "processa_noite"

branch = BranchPythonOperator(
    task_id="decide_branch",
    python_callable=escolhe_branch,
)

dia    = EmptyOperator(task_id="processa_dia")
noite  = EmptyOperator(task_id="processa_noite")
fim    = EmptyOperator(task_id="fim", trigger_rule="ONE_SUCCESS")

branch >> [dia, noite] >> fim

EmptyOperator — placeholder / join

from airflow.operators.empty import EmptyOperator

inicio = EmptyOperator(task_id="inicio")
fim    = EmptyOperator(task_id="fim")

EmailOperator

from airflow.operators.email import EmailOperator

alerta = EmailOperator(
    task_id="alerta_falha",
    to=["devops@empresa.com"],
    subject="Pipeline falhou em {{ ds }}",
    html_content="<p>Verifique o Airflow: {{ run_id }}</p>",
)

Operadores de datas com timedelta

from airflow.operators.python import PythonOperator
from datetime import timedelta

task = PythonOperator(
    task_id="minha_task",
    python_callable=minha_funcao,
    execution_timeout=timedelta(hours=2),  # mata a task se passar de 2h
    retries=3,
    retry_delay=timedelta(minutes=10),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(hours=1),
)

Sensores

Sensores são operadores especiais que aguardam uma condição se tornar verdadeira. Têm dois modos de operação:

ModoComportamentoQuando usar
pokeMantém worker slot ocupado, dorme entre verificaçõesEsperas curtas (< 5 min)
rescheduleLibera o worker, reagenda a task periodicamenteEsperas longas (horas), evita esgotar workers

Sensor de arquivo (exemplo)

from airflow.sensors.filesystem import FileSensor

aguarda_arquivo = FileSensor(
    task_id="aguarda_csv",
    filepath="/data/input/{{ ds }}/dados.csv",
    poke_interval=60,        # verifica a cada 60s
    timeout=3600,            # falha após 1h sem encontrar
    mode="reschedule",       # libera worker entre pokes
    soft_fail=True,          # skipa a task ao invés de falhar
)

Sensor HTTP

from airflow.providers.http.sensors.http import HttpSensor

api_pronta = HttpSensor(
    task_id="api_health_check",
    http_conn_id="minha_api",
    endpoint="/health",
    request_params={},
    response_check=lambda response: response.json()["status"] == "ok",
    poke_interval=30,
    timeout=300,
)

Sensor S3

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

arquivo_s3 = S3KeySensor(
    task_id="aguarda_s3",
    bucket_name="meu-bucket-dados",
    bucket_key="raw/{{ ds }}/vendas.parquet",
    aws_conn_id="aws_default",
    mode="reschedule",
    poke_interval=120,
)

Sensor customizado

from airflow.sdk import BaseSensorOperator
from airflow.sdk.bases.sensor import PokeReturnValue

class RegistrosDisponivelSensor(BaseSensorOperator):
    template_fields = ("table_name",)

    def __init__(self, table_name: str, min_rows: int = 1, **kwargs):
        super().__init__(**kwargs)
        self.table_name = table_name
        self.min_rows = min_rows

    def poke(self, context) -> bool | PokeReturnValue:
        hook = PostgresHook(postgres_conn_id="postgres_default")
        count = hook.get_first(f"SELECT COUNT(*) FROM {self.table_name}")[0]
        self.log.info(f"Contagem atual: {count}")
        if count >= self.min_rows:
            return PokeReturnValue(is_done=True, xcom_value=count)
        return False

XCom — Troca de dados entre tasks

XCom (Cross-Communication) é o mecanismo de passagem de dados entre tasks. Por padrão, armazenado no metadata DB — adequado para valores pequenos (strings, dicts). Para dados grandes, use S3/GCS.

Push / Pull manual

# Push explícito
def pusha(**context):
    context["ti"].xcom_push(key="resultado", value={"total": 42})

# Pull explícito
def puxa(**context):
    resultado = context["ti"].xcom_pull(
        task_ids="task_que_pushei",
        key="resultado"
    )
    print(resultado["total"])

# Pull de run anterior
def puxa_anterior(**context):
    return context["ti"].xcom_pull(
        task_ids="task_x",
        dag_id="meu_dag",
        include_prior_dates=True,
    )

XCom em templates Jinja

BashOperator(
    task_id="usa_xcom",
    bash_command='echo "Total: {{ ti.xcom_pull(task_ids=\"conta_registros\") }}"',
)

TaskFlow API — XCom automático

Com @task, o valor de return é automaticamente pusheado; parâmetros de outra @task são automaticamente pulados:

@task
def conta() -> int:
    return 42

@task
def imprime(n: int):
    print(f"Recebi: {n}")  # n == 42

imprime(conta())  # XCom gerenciado automaticamente

Dynamic Task Mapping

Permite criar um número variável de tasks em tempo de execução, baseado em dados.

expand() — expandir sobre uma lista

@task
def lista_arquivos() -> list[str]:
    return ["jan.csv", "fev.csv", "mar.csv"]

@task
def processa(arquivo: str) -> dict:
    return {"arquivo": arquivo, "linhas": 1000}

@task
def agrega(resultados: list[dict]) -> int:
    return sum(r["linhas"] for r in resultados)

arquivos = lista_arquivos()
processados = processa.expand(arquivo=arquivos)  # cria 3 tasks em runtime
agrega(processados)

partial() + expand() — argumento fixo + argumento variável

@task
def processa_env(arquivo: str, ambiente: str) -> str:
    return f"{ambiente}/{arquivo}"

# ambiente fixo, arquivo variável
resultados = processa_env.partial(ambiente="prod").expand(arquivo=["a.csv", "b.csv"])

expand_kwargs() — múltiplos argumentos de um dict

@task
def get_configs() -> list[dict]:
    return [
        {"source": "s3", "bucket": "raw"},
        {"source": "gcs", "bucket": "analytics"},
    ]

@task
def ingest(source: str, bucket: str):
    print(f"Ingestando {source}://{bucket}")

ingest.expand_kwargs(get_configs())  # cria 2 tasks

Connections

Conexões armazenam credenciais e endpoints de sistemas externos. Gerenciadas pela UI (Admin > Connections) ou via env var.

Via variável de ambiente (recomendado para produção)

# Formato: AIRFLOW_CONN_<CONN_ID_EM_MAIÚSCULAS>
export AIRFLOW_CONN_POSTGRES_DEFAULT='postgresql://user:pass@localhost:5432/mydb'
export AIRFLOW_CONN_AWS_DEFAULT='{"conn_type":"aws","login":"AKID","password":"SECRET","extra":{"region_name":"us-east-1"}}'

Via CLI

airflow connections add 'postgres_analytics' \
    --conn-type 'postgres' \
    --conn-host 'localhost' \
    --conn-login 'airflow' \
    --conn-password 'airflow' \
    --conn-port 5432 \
    --conn-schema 'analytics'

Usando em código

from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection("postgres_analytics")
print(conn.host, conn.login, conn.password)

Variables

Variáveis são pares chave-valor globais. Úteis para configurações que mudam sem alterar o código.

from airflow.models import Variable

# Leitura
bucket = Variable.get("s3_bucket_analytics")
config = Variable.get("pipeline_config", deserialize_json=True)  # dict automático

# Com valor padrão
env = Variable.get("environment", default_var="dev")

# Escrita (programático)
Variable.set("ultima_execucao", "2024-07-15")
Variable.set("config", {"batch_size": 1000}, serialize_json=True)

Via env var (sem acesso ao metadata DB no startup)

export AIRFLOW_VAR_S3_BUCKET='meu-bucket-producao'
# acessada como Variable.get("s3_bucket")

Hooks

Hooks são wrappers de conexão com sistemas externos. Encapsulam a lógica de autenticação e fornecem métodos de alto nível. Operadores usam hooks internamente.

PostgresHook

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_analytics")

# Executar query
hook.run("INSERT INTO logs VALUES (NOW(), 'pipeline iniciado')")

# Retornar resultados
rows = hook.get_records("SELECT id, name FROM users WHERE active = true")
first = hook.get_first("SELECT COUNT(*) FROM orders")

# Pandas DataFrame
df = hook.get_pandas_df("SELECT * FROM vendas WHERE data = '{{ ds }}'")

# Inserção em lote
hook.insert_rows(
    table="staging.orders",
    rows=[("A001", 150.0), ("A002", 230.0)],
    target_fields=["order_id", "value"],
)

S3Hook

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

hook = S3Hook(aws_conn_id="aws_default")

# Upload
hook.load_file(
    filename="/tmp/dados.csv",
    key="raw/2024-07-15/dados.csv",
    bucket_name="meu-bucket",
    replace=True,
)

# Download
conteudo = hook.read_key(key="raw/2024-07-15/dados.csv", bucket_name="meu-bucket")

# Listar arquivos
chaves = hook.list_keys(bucket_name="meu-bucket", prefix="raw/2024-07-15/")

# Verificar existência
existe = hook.check_for_key(key="raw/dados.csv", bucket_name="meu-bucket")

HttpHook

from airflow.providers.http.hooks.http import HttpHook

hook = HttpHook(method="GET", http_conn_id="minha_api")
response = hook.run(
    endpoint="/v1/data",
    headers={"Authorization": "Bearer {{ var.value.api_token }}"},
)
dados = response.json()

Executores

O Executor define como as tasks são executadas. É uma configuração do Scheduler, não um processo separado (exceto CeleryExecutor que usa workers externos).

ExecutorWorkersQuando usar
SequentialExecutor1 task por vez (mesmo processo)Testes locais, SQLite
LocalExecutorSubprocessos locais, paraleloDev/staging com PostgreSQL
CeleryExecutorWorkers externos via CeleryProdução escalável horizontalmente
KubernetesExecutor1 Pod por taskProdução com isolamento, k8s disponível
CeleryKubernetesExecutorCelery para tasks leves, K8s para pesadasHíbrido

Configurar executor

# airflow.cfg
[core]
executor = LocalExecutor

[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
# via env var
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CELERY__BROKER_URL=redis://localhost:6379/0
export AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@localhost/airflow

Pools

Pools limitam a concorrência de tasks para proteger recursos (ex: conexões de banco, chamadas de API).

# Criar pool via CLI
airflow pools set "db_connections" 10 "Limita conexões simultâneas ao PostgreSQL"

# Atribuir task ao pool
task = PythonOperator(
    task_id="query_pesada",
    python_callable=run_query,
    pool="db_connections",       # ocupa 1 slot do pool
    pool_slots=2,                # pode ocupar mais slots se necessário
)

Variáveis de Contexto (Templates Jinja)

Dentro de parâmetros com suporte a templates (bash_command, sql, email_content, etc.):

VariávelValorExemplo
{{ ds }}Data da run (YYYY-MM-DD)2024-07-15
{{ ds_nodash }}Data sem hífens20240715
{{ ts }}Timestamp ISO 86012024-07-15T06:00:00+00:00
{{ logical_date }}datetime da runobjeto datetime
{{ prev_ds }}Data da run anterior2024-07-14
{{ next_ds }}Data da próxima run2024-07-16
{{ run_id }}ID único da runscheduled__2024-07-15T06:00:00+00:00
{{ dag_run.conf }}Config passada no trigger manual{"param": "valor"}
{{ var.value.nome }}Valor de Variablevalor da variável nome
{{ var.json.nome }}Variable como JSONdict/list da variável nome
{{ conn.id.host }}Host de uma Connectionvalor do campo host
{{ ti }}TaskInstance atualobjeto com .xcom_pull()

Providers

Providers são pacotes que estendem o Airflow com operadores, hooks e sensores para serviços externos.

# Instalar providers
pip install apache-airflow-providers-amazon         # AWS
pip install apache-airflow-providers-google         # GCP
pip install apache-airflow-providers-apache-spark   # Spark
pip install apache-airflow-providers-postgres       # PostgreSQL
pip install apache-airflow-providers-slack          # Slack notifications
pip install apache-airflow-providers-http           # HTTP
pip install apache-airflow-providers-ssh            # SSH/SFTP
pip install apache-airflow-providers-databricks     # Databricks
pip install apache-airflow-providers-apache-kafka   # Kafka

Operadores AWS (provider amazon)

from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

glue_job = GlueJobOperator(
    task_id="processa_spark",
    job_name="meu-glue-job",
    aws_conn_id="aws_default",
    region_name="us-east-1",
)

Operadores GCP (provider google)

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_bq = GCSToBigQueryOperator(
    task_id="gcs_to_bq",
    bucket="meu-bucket",
    source_objects=["raw/{{ ds }}/*.parquet"],
    destination_project_dataset_table="projeto.dataset.tabela",
    gcp_conn_id="google_cloud_default",
    write_disposition="WRITE_TRUNCATE",
)

Operadores Spark

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_job = SparkSubmitOperator(
    task_id="spark_transform",
    application="/opt/spark-jobs/transform.py",
    conn_id="spark_default",
    conf={"spark.executor.memory": "4g"},
    application_args=["--date", "{{ ds }}"],
)

Setup Local (Docker Compose)

Modo mais simples — airflow standalone

pip install apache-airflow
export AIRFLOW_HOME=~/airflow
airflow standalone
# UI em http://localhost:8080 — usuário: admin, senha: impressa no terminal

Docker Compose oficial

# Baixar o docker-compose.yaml oficial
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'

# Criar diretórios necessários
mkdir -p ./dags ./logs ./plugins ./config

# Ajustar UID (Linux)
echo -e "AIRFLOW_UID=$(id -u)" > .env

# Inicializar banco e criar usuário admin
docker compose up airflow-init

# Subir todos os serviços
docker compose up -d

# UI em http://localhost:8080 — airflow/airflow

Serviços incluídos no docker-compose padrão

ServiçoPortaFinalidade
airflow-webserver8080UI e REST API
airflow-schedulerScheduler + Executor
airflow-triggererOperadores deferrable
airflow-initInicialização e migração do DB
postgres5432Metadata DB
redis6379Broker do Celery (nessa imagem usa Celery)

CLI — Comandos Essenciais

# DAGs
airflow dags list                        # listar todos os DAGs
airflow dags trigger meu_dag             # disparar manualmente
airflow dags trigger meu_dag --conf '{"param": "valor"}'  # com config
airflow dags pause meu_dag               # pausar
airflow dags unpause meu_dag             # reativar
airflow dags backfill meu_dag \
    --start-date 2024-01-01 \
    --end-date 2024-01-31               # reprocessar período (Airflow 2.x)
airflow dags show meu_dag                # exibir grafo em ASCII

# Tasks
airflow tasks list meu_dag              # listar tasks
airflow tasks test meu_dag task_id 2024-07-15  # testar task sem gravar no DB
airflow tasks run meu_dag task_id run_id       # executar task específica

# Runs
airflow dags list-runs -d meu_dag       # listar runs do DAG
airflow dags state meu_dag 2024-07-15   # estado da run de uma data

# Connections & Variables
airflow connections list
airflow connections add ...
airflow variables list
airflow variables get s3_bucket
airflow variables set s3_bucket "meu-novo-bucket"

# Usuários
airflow users create \
    --username admin --firstname Admin --lastname User \
    --role Admin --email admin@empresa.com --password admin

# Banco
airflow db init       # inicializar banco (primeira vez)
airflow db migrate    # aplicar migrações (após upgrade)
airflow db check      # verificar conectividade

REST API

A partir do Airflow 2.0, há uma REST API estável:

BASE="http://localhost:8080/api/v1"
AUTH="-u airflow:airflow"

# Listar DAGs
curl $AUTH "$BASE/dags"

# Trigger manual
curl $AUTH -X POST "$BASE/dags/meu_dag/dagRuns" \
    -H "Content-Type: application/json" \
    -d '{"conf": {"param": "valor"}}'

# Status de uma run
curl $AUTH "$BASE/dags/meu_dag/dagRuns"

# Limpar e re-executar task
curl $AUTH -X POST "$BASE/dags/meu_dag/clearTaskInstances" \
    -H "Content-Type: application/json" \
    -d '{"task_ids": ["task_falhou"], "start_date": "2024-07-15T00:00:00Z"}'

Airflow 3.0 — Principais Mudanças

Airflow 3.0 (lançado em 2025) traz mudanças arquiteturais significativas:

MudançaDetalhes
Task SDK separadoairflow.sdk — SDK leve para tasks sem dependência pesada do core
Backfill gerenciado pelo Scheduler (AIP-78)Não é mais um processo CLI separado; backfills seguem o mesmo fluxo de scheduling regular
DAG versioningMúltiplas versões do mesmo DAG são rastreadas — runs antigas referenciam a versão do DAG que as criou
Asset-based schedulingDAGs podem ser disparados por atualização de “Assets” (dados) além de schedule por tempo
Novo modelo de execuçãoSeparação clara entre dag-processor, scheduler e api-server
API ServerSubstitui o webserver para a REST API (WebServer foca na UI)
logical_date vs execution_dateexecution_date deprecado; usar logical_date
Remoção de SubDAGsUse TaskGroup em vez de SubDAG (já recomendado desde 2.x)

Asset-based scheduling (Airflow 3.0)

from airflow.sdk import Asset, dag, task

# Definir um asset
vendas_raw = Asset("s3://bucket/raw/vendas/")

@dag(schedule=vendas_raw)  # dispara quando o asset é atualizado
def processa_vendas():
    @task
    def transforma():
        pass
    transforma()

@dag(schedule="@daily")
def ingere_vendas():
    @task(outlets=[vendas_raw])  # marca o asset como atualizado
    def extrai():
        return "dados"
    extrai()

TaskGroups — Organização Visual

from airflow.utils.task_group import TaskGroup

with DAG("pipeline_agrupado", ...) as dag:

    with TaskGroup("ingestao") as ingestao:
        extrai_api   = PythonOperator(task_id="extrai_api", ...)
        extrai_db    = PythonOperator(task_id="extrai_db", ...)
        [extrai_api, extrai_db]

    with TaskGroup("transformacao") as transformacao:
        limpa        = PythonOperator(task_id="limpa", ...)
        enriquece    = PythonOperator(task_id="enriquece", ...)
        limpa >> enriquece

    with TaskGroup("carga") as carga:
        carrega_dw   = PythonOperator(task_id="carrega_dw", ...)

    ingestao >> transformacao >> carga

Callbacks e Alertas

from airflow.utils.email import send_email

def on_failure(context):
    ti = context["task_instance"]
    send_email(
        to=["ops@empresa.com"],
        subject=f"[FALHA] {ti.dag_id}.{ti.task_id}",
        html_content=f"<p>Task {ti.task_id} falhou em {ti.execution_date}</p>"
                     f"<p>Log: {ti.log_url}</p>",
    )

def on_retry(context):
    ti = context["task_instance"]
    print(f"Retry #{ti.try_number} para {ti.task_id}")

def on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    send_email(to=["ops@empresa.com"], subject="SLA violado!", html_content="...")

task = PythonOperator(
    task_id="task_critica",
    python_callable=minha_funcao,
    on_failure_callback=on_failure,
    on_retry_callback=on_retry,
    sla=timedelta(hours=2),         # alerta se não terminar em 2h
)

# SLA miss callback fica no DAG
dag = DAG(
    "meu_dag",
    sla_miss_callback=on_sla_miss,
    ...
)

Deferrable Operators (Async)

Operadores deferráveis liberam o worker slot enquanto esperam por um evento externo (I/O), usando o Triggerer ao invés de polling.

from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator

# Este operador é deferrable — não bloqueia worker durante execução do job
emr_job = EmrServerlessStartJobOperator(
    task_id="emr_serverless",
    application_id="{{ var.value.emr_app_id }}",
    execution_role_arn="{{ var.value.emr_role_arn }}",
    job_driver={"sparkSubmit": {"entryPoint": "s3://bucket/job.py"}},
    deferrable=True,           # usa Triggerer ao invés de polling no worker
)

Secrets Backend

Para não armazenar senhas no metadata DB, use um secrets backend:

# HashiCorp Vault
export AIRFLOW__SECRETS__BACKEND=airflow.providers.hashicorp.secrets.vault.VaultBackend
export AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_path": "airflow/connections", "variables_path": "airflow/variables", "url": "http://vault:8200", "token": "root"}'

# AWS Secrets Manager
export AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
export AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}'

Boas Práticas

PráticaPor quê
catchup=False por padrãoEvita criar dezenas de runs históricas ao ativar um DAG pela primeira vez
Tasks idempotentesRe-execuções não devem duplicar dados — use INSERT OR REPLACE, TRUNCATE antes de inserir
Não usar Variable.get() no topo do arquivoExecutado em cada parse do DAG; prefira dentro do python_callable
Não importar bibliotecas pesadas no top-levelSlows down DAG parsing; importe dentro da função
Usar mode="reschedule" em sensores de longa esperaLibera worker slots; poke bloqueia um worker indefinidamente
Dados entre tasks via XCom devem ser pequenosXCom é armazenado no metadata DB; dados grandes vão para S3/GCS
Usar pool para limitar concorrênciaPrevine sobrecarga em banco de dados ou APIs com rate limit
Secrets nunca em código — usar Connections ou VariablesCredenciais hardcodadas em DAGs viram riscos de segurança no git
Um DAG por arquivoFacilita debugging e reduz tempo de parse
Usar tags nos DAGsFacilita filtros na UI em ambientes com muitos DAGs
max_active_runs=1 para DAGs com estado externoEvita dois runs simultâneos modificando o mesmo dado
Testar localmente com airflow tasks testExecuta sem gravar no DB, ideal para desenvolvimento