Data

Kafka

Guia completo de Kafka — conceitos, CLI, produção, consumo, Spring Boot, Kafka Streams, Schema Registry e comparativo com RabbitMQ

Apache Kafka é uma plataforma de streaming de eventos distribuída. Originalmente criada pela LinkedIn, é usada para processar trilhões de eventos por dia em sistemas de alta escala. Diferentemente de sistemas de filas tradicionais, o Kafka mantém o histórico de mensagens e permite replay.


Conceitos Fundamentais

Broker: servidor Kafka. Um cluster tem múltiplos brokers. Cada broker armazena partições de tópicos.

Topic: categoria ou feed de mensagens. Análogo a uma “fila com histórico”. Mensagens são imutáveis e ordenadas dentro de cada partição.

Partition: subdivisão de um tópico. Mensagens em uma partição têm ordem garantida. Partições diferentes podem ser processadas em paralelo por consumers diferentes.

Consumer Group: grupo de consumers que dividem o processamento de um tópico. Cada partição é atribuída a apenas um consumer no grupo. Escalar consumers = mais paralelismo (até o número de partições).

Offset: número sequencial que identifica a posição de uma mensagem dentro de uma partição. Cada consumer group mantém seu próprio offset por partição.

Replication Factor: número de cópias de cada partição. Com fator 3, há 1 leader e 2 followers. Se o leader falha, um follower assume.

Arquitetura de um cluster Kafka:

Topic "orders" com 3 partições, replication factor 2:

Broker 1                    Broker 2                    Broker 3
┌─────────────────┐         ┌─────────────────┐         ┌─────────────────┐
│ Partition 0     │         │ Partition 0     │         │ Partition 1     │
│ (LEADER)        │────────▶│ (FOLLOWER)      │         │ (FOLLOWER)      │
├─────────────────┤         ├─────────────────┤         ├─────────────────┤
│ Partition 1     │         │ Partition 2     │         │ Partition 2     │
│ (LEADER)        │────────▶│ (FOLLOWER)      │◀────────│ (LEADER)        │
└─────────────────┘         └─────────────────┘         └─────────────────┘

Consumer Group "order-processor" com 2 consumers:
Consumer A → Partition 0 + Partition 1
Consumer B → Partition 2

Kafka CLI — Tópicos

# === VARIÁVEIS DE AMBIENTE ===
KAFKA_BOOTSTRAP="--bootstrap-server localhost:9092"

# Criar tópico
kafka-topics.sh --create \
  --topic orders.placed \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=604800000 \    # 7 dias
  --config min.insync.replicas=2 \
  $KAFKA_BOOTSTRAP

# Listar tópicos
kafka-topics.sh --list $KAFKA_BOOTSTRAP
kafka-topics.sh --list $KAFKA_BOOTSTRAP | grep "orders"

# Descrever tópico (partições, líderes, ISR, replicas)
kafka-topics.sh --describe --topic orders.placed $KAFKA_BOOTSTRAP

# Saída esperada:
# Topic: orders.placed  Partitions: 6  ReplicationFactor: 3  Configs: ...
# Topic: orders.placed  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3

# Alterar número de partições (apenas aumentar, nunca diminuir)
kafka-topics.sh --alter --topic orders.placed \
  --partitions 12 \
  $KAFKA_BOOTSTRAP

# Alterar configurações de um tópico
kafka-configs.sh --alter --entity-type topics \
  --entity-name orders.placed \
  --add-config "retention.ms=86400000,max.message.bytes=1048576" \
  $KAFKA_BOOTSTRAP

# Ver configurações de um tópico
kafka-configs.sh --describe --entity-type topics \
  --entity-name orders.placed \
  $KAFKA_BOOTSTRAP

# Deletar tópico
kafka-topics.sh --delete --topic orders.placed.DLT $KAFKA_BOOTSTRAP

Kafka CLI — Produzindo e Consumindo

# PRODUZIR mensagens via console
kafka-console-producer.sh \
  --topic orders.placed \
  $KAFKA_BOOTSTRAP

# Produzir com chave (separador padrão: TAB)
kafka-console-producer.sh \
  --topic orders.placed \
  --property key.separator=":" \
  --property parse.key=true \
  $KAFKA_BOOTSTRAP
# Entrada: order-123:{"orderId":"order-123","customerId":"cust-1"}

# Produzir arquivo de mensagens
cat mensagens.json | kafka-console-producer.sh \
  --topic orders.placed \
  $KAFKA_BOOTSTRAP

# CONSUMIR mensagens
# Do início (from-beginning)
kafka-console-consumer.sh \
  --topic orders.placed \
  --from-beginning \
  $KAFKA_BOOTSTRAP

# Apenas as novas mensagens
kafka-console-consumer.sh \
  --topic orders.placed \
  $KAFKA_BOOTSTRAP

# Com chave e timestamp visíveis
kafka-console-consumer.sh \
  --topic orders.placed \
  --from-beginning \
  --property print.key=true \
  --property print.timestamp=true \
  --property key.separator=" | " \
  $KAFKA_BOOTSTRAP

# Consumir de partição específica
kafka-console-consumer.sh \
  --topic orders.placed \
  --partition 2 \
  --offset 100 \
  $KAFKA_BOOTSTRAP

# Consumir com consumer group
kafka-console-consumer.sh \
  --topic orders.placed \
  --from-beginning \
  --group debug-consumer \
  $KAFKA_BOOTSTRAP

# Limitar número de mensagens
kafka-console-consumer.sh \
  --topic orders.placed \
  --from-beginning \
  --max-messages 50 \
  $KAFKA_BOOTSTRAP

Consumer Groups — Lag e Offset Management

# Listar consumer groups
kafka-consumer-groups.sh --list $KAFKA_BOOTSTRAP

# Descrever grupo — mostra LAG por partição
kafka-consumer-groups.sh \
  --describe --group order-processor \
  $KAFKA_BOOTSTRAP

# Saída:
# GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order-processor orders.placed   0          1000            1050            50
# order-processor orders.placed   1          980             980             0
# order-processor orders.placed   2          1020            1020            0

# LAG total = soma de todos os lags por partição
# LAG > 0 = consumer está atrasado

# Resetar offset — consumer deve estar PARADO
# Para o início (reprocessar tudo)
kafka-consumer-groups.sh \
  --group order-processor \
  --topic orders.placed \
  --reset-offsets --to-earliest \
  --execute \
  $KAFKA_BOOTSTRAP

# Para o final (ignorar mensagens antigas)
kafka-consumer-groups.sh \
  --group order-processor \
  --topic orders.placed \
  --reset-offsets --to-latest \
  --execute \
  $KAFKA_BOOTSTRAP

# Para um offset específico
kafka-consumer-groups.sh \
  --group order-processor \
  --topic orders.placed \
  --reset-offsets --to-offset 1000 \
  --execute \
  $KAFKA_BOOTSTRAP

# Para um timestamp específico (ISO 8601)
kafka-consumer-groups.sh \
  --group order-processor \
  --topic orders.placed:0,1,2 \
  --reset-offsets --to-datetime "2024-01-15T10:00:00.000" \
  --execute \
  $KAFKA_BOOTSTRAP

# Voltar N mensagens
kafka-consumer-groups.sh \
  --group order-processor \
  --topic orders.placed \
  --reset-offsets --shift-by -100 \
  --execute \
  $KAFKA_BOOTSTRAP

# Simulação sem executar (dry-run)
kafka-consumer-groups.sh \
  --group order-processor \
  --topic orders.placed \
  --reset-offsets --to-earliest \
  --dry-run \
  $KAFKA_BOOTSTRAP

Produzindo Mensagens — Acks, Keys e Idempotência

A garantia de entrega do Kafka é controlada pelo parâmetro acks:

  • acks=0 — fire and forget. Sem confirmação. Mais rápido, pode perder mensagens.
  • acks=1 — aguarda confirmação do leader. Risco de perda se leader falha antes de replicar.
  • acks=all (ou -1) — aguarda confirmação de todos os ISR. Mais seguro.
// SPRING BOOT — configuração do producer
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // ACKS — garantia de entrega
        config.put(ProducerConfig.ACKS_CONFIG, "all");

        // Idempotent producer — garante exatamente uma entrega mesmo com retry
        // Requer acks=all e retries > 0
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Retries e timeout
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
        config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        // Performance
        config.put(ProducerConfig.LINGER_MS_CONFIG, 5);          // aguarda 5ms para agrupar
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);      // 16KB por batch
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");  // compressão

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

// ENVIANDO MENSAGENS COM KafkaTemplate
@Service
public class OrderEventPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    // Chave de particionamento — mensagens da mesma order vão para a mesma partição
    // Garante ordenação de eventos por order
    public void publishOrderPlaced(Order order) {
        try {
            String payload = objectMapper.writeValueAsString(new OrderPlacedEvent(
                order.id().value(),
                order.customerId().value(),
                order.total().amount(),
                Instant.now()
            ));

            // ProducerRecord com chave explícita
            ProducerRecord<String, String> record = new ProducerRecord<>(
                "orders.placed",
                order.id().value(),  // chave = orderId → mesma partição para a mesma order
                payload
            );

            // Envio assíncrono com callback
            kafkaTemplate.send(record).whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Falha ao publicar evento OrderPlaced: orderId={}", order.id(), ex);
                    throw new EventPublishingException("Falha ao publicar evento", ex);
                } else {
                    log.info("OrderPlaced publicado: orderId={}, partition={}, offset={}",
                        order.id(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()
                    );
                }
            });

        } catch (JsonProcessingException e) {
            throw new EventPublishingException("Falha ao serializar evento", e);
        }
    }

    // Envio transacional — todos ou nenhum
    public void publishMultipleEvents(Order order) {
        kafkaTemplate.executeInTransaction(ops -> {
            ops.send("orders.placed", order.id().value(), serializeOrderPlaced(order));
            ops.send("inventory.reserve", order.id().value(), serializeInventoryReserve(order));
            return true;
        });
    }
}

Consumindo Mensagens — Consumer Groups e Offset Management

// CONFIGURAÇÃO DO CONSUMER
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // AUTO COMMIT — conveniente mas perigoso
        // false = commit manual (mais controle)
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Quando não há offset para o grupo (primeiro start ou após reset)
        // earliest = do início | latest = apenas novas mensagens
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Polling
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);          // mensagens por poll
        config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);   // 5 min timeout de processamento

        // Session timeout — se consumer não fizer heartbeat neste tempo, é removido do grupo
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(3);  // 3 threads, uma por partição (se tiver 3 partições)
        return factory;
    }
}

// CONSUMER COM COMMIT MANUAL
@Component
public class OrderConsumer {

    @KafkaListener(
        topics = "orders.placed",
        groupId = "order-processor",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void onOrderPlaced(
            ConsumerRecord<String, String> record,
            Acknowledgment acknowledgment) {

        String orderId = record.key();
        String payload = record.value();
        long offset = record.offset();
        int partition = record.partition();

        log.info("Processando OrderPlaced: orderId={}, partition={}, offset={}",
            orderId, partition, offset);

        try {
            OrderPlacedEvent event = objectMapper.readValue(payload, OrderPlacedEvent.class);
            inventoryService.reserve(event.orderId(), event.items());

            // Commit APENAS após processamento bem-sucedido
            acknowledgment.acknowledge();

        } catch (RetryableException e) {
            // Erro transiente — NÃO commita, será reentregue
            log.warn("Erro transiente ao processar OrderPlaced: {}. Não commitando.", e.getMessage());
            throw e;

        } catch (Exception e) {
            // Erro permanente — commita para não travar a fila
            log.error("Erro permanente ao processar OrderPlaced: orderId={}", orderId, e);
            // Envia para DLQ/alerta aqui
            deadLetterService.send("orders.placed", orderId, payload, e.getMessage());
            acknowledgment.acknowledge(); // commit para avançar
        }
    }
}

Retry Topics e Dead Letter Topics com Spring Kafka

// @RetryableTopic — cria automaticamente retry topics e DLT
@Component
public class OrderEventConsumer {

    @RetryableTopic(
        attempts = "4",                          // 1 original + 3 retries
        backoff = @Backoff(
            delay = 1000,                        // 1s antes do primeiro retry
            multiplier = 2.0,                    // duplica a cada retry: 1s → 2s → 4s
            maxDelay = 10000                     // máximo 10s entre retries
        ),
        dltTopicSuffix = ".DLT",                // nome: orders.placed.DLT
        retryTopicSuffix = "-retry",            // nome: orders.placed-retry-0, -retry-1, etc
        autoCreateTopics = "true",
        numPartitions = "3",
        replicationFactor = "1",
        include = {                              // apenas estes erros fazem retry
            ServiceUnavailableException.class,
            RetryableException.class
        }
        // exclude = {} — estes erros vão direto para DLT sem retry
    )
    @KafkaListener(topics = "orders.placed", groupId = "order-processor")
    public void onOrderPlaced(@Payload String payload,
                               @Header(KafkaHeaders.RECEIVED_KEY) String key) {

        OrderPlacedEvent event = deserialize(payload);
        inventoryService.reserve(event.orderId(), event.items());
    }

    // Handler da Dead Letter Topic
    @DltHandler
    public void handleDeadLetter(
            @Payload String payload,
            @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage,
            @Header(KafkaHeaders.ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.RECEIVED_KEY) String key) {

        log.error("Mensagem na DLT | topic={} | key={} | erro={}",
            originalTopic, key, errorMessage);

        // Persistir para análise e reprocessamento manual
        dltRecordRepository.save(DltRecord.builder()
            .messageKey(key)
            .originalTopic(originalTopic)
            .payload(payload)
            .errorMessage(errorMessage)
            .receivedAt(Instant.now())
            .build()
        );

        // Alerta para time de operações
        alertService.sendDltAlert(originalTopic, key, errorMessage);
    }
}

Configurações Importantes de Tópicos e Brokers

# === CONFIGURAÇÕES DE TÓPICO ===

# Retenção
retention.ms=604800000          # quanto tempo manter mensagens (padrão: 7 dias)
retention.bytes=-1              # tamanho máximo por partição (-1 = sem limite)

# Tamanho de mensagem
max.message.bytes=1048576       # 1MB por mensagem (padrão)

# Compressão
compression.type=snappy         # snappy, gzip, lz4, zstd (snappy = melhor equilíbrio)

# Replicação
replication.factor=3            # cópias de cada partição
min.insync.replicas=2           # mínimo de réplicas sincronizadas para aceitar escrita

# Compaction
cleanup.policy=compact          # mantém apenas o último valor por chave
cleanup.policy=delete           # deleta mensagens antigas (padrão)
cleanup.policy=compact,delete   # compacta + deleta

# Configurar via kafka-configs.sh
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name orders.placed \
  --add-config "retention.ms=86400000,min.insync.replicas=2,compression.type=snappy" \
  --bootstrap-server localhost:9092

# === CONFIGURAÇÕES DE BROKER ===
# Ver configurações do broker
kafka-configs.sh --describe \
  --entity-type brokers \
  --entity-name 0 \
  --bootstrap-server localhost:9092

# Configurações críticas de produção:
# unclean.leader.election.enable=false  → NÃO permite eleição de réplica fora do ISR
#                                         evita perda de dados, mas pode causar indisponibilidade
# default.replication.factor=3
# min.insync.replicas=2
# log.retention.hours=168               → 7 dias
# num.partitions=6                      → padrão ao criar tópicos

Kafka com Java/Spring Boot — Configuração via application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
        linger.ms: 5
        compression.type: snappy
    
    consumer:
      group-id: order-processor
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 100
      properties:
        max.poll.interval.ms: 300000
        session.timeout.ms: 45000
    
    listener:
      ack-mode: manual_immediate
      concurrency: 3
      # Configuração de retry para listeners sem @RetryableTopic
      retry:
        backoff:
          interval: 1000
          max-interval: 10000
          multiplier: 2.0

Kafka Streams — Processamento em Tempo Real

Kafka Streams é uma biblioteca Java para processamento de streams sobre Kafka. Não requer cluster separado — roda dentro da aplicação.

@Configuration
@EnableKafkaStreams
public class OrderStreamConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration streamsConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(config);
    }
}

@Component
public class OrderAnalyticsStream {

    @Autowired
    void buildPipeline(StreamsBuilder builder) {

        // KStream — sequência de registros (eventos)
        KStream<String, String> ordersStream = builder.stream("orders.placed");

        // Filtrar e transformar
        KStream<String, OrderValue> orderValues = ordersStream
            .filter((key, value) -> value != null)
            .mapValues(value -> {
                OrderPlacedEvent event = deserialize(value);
                return new OrderValue(event.orderId(), event.total());
            });

        // WINDOWED AGGREGATION — total de pedidos por hora
        KTable<Windowed<String>, Long> orderCountByHour = ordersStream
            .groupBy((key, value) -> "all")  // tudo no mesmo grupo
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count(Materialized.as("order-count-by-hour"));

        // Enviar resultado para outro tópico
        orderCountByHour
            .toStream()
            .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.window().startTime().toString(),
                count.toString()
            ))
            .to("orders.hourly-count");

        // JOIN — enriquecer evento de pedido com dados do cliente
        KTable<String, String> customersTable = builder.table("customers");

        KStream<String, String> enrichedOrders = ordersStream.join(
            customersTable,
            (orderJson, customerJson) -> enrichOrder(orderJson, customerJson),
            Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
        );

        enrichedOrders.to("orders.enriched");
    }
}

Schema Registry e Avro

Schema Registry armazena e versiona schemas de mensagens. Avro é o formato de serialização mais comum. Garante compatibilidade entre producers e consumers.

# Registrar schema
curl -X POST http://localhost:8081/subjects/orders.placed-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"OrderPlaced\",\"fields\":[{\"name\":\"orderId\",\"type\":\"string\"},{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"total\",\"type\":\"double\"},{\"name\":\"occurredAt\",\"type\":\"long\"}]}"
  }'

# Listar subjects (schemas registrados)
curl http://localhost:8081/subjects

# Ver versões de um schema
curl http://localhost:8081/subjects/orders.placed-value/versions

# Compatibilidade — tipos: BACKWARD, FORWARD, FULL, NONE
curl -X PUT http://localhost:8081/config/orders.placed-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'
# BACKWARD: novo schema pode ler dados do schema anterior
# FORWARD:  schema anterior pode ler dados do novo schema
# FULL:     ambos os sentidos
// Spring Boot com Avro + Schema Registry
// pom.xml: io.confluent:kafka-avro-serializer
// Schema em src/main/avro/OrderPlaced.avsc

// application.yml:
// spring.kafka.producer.value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
// spring.kafka.properties.schema.registry.url: http://localhost:8081

// Usando tipo gerado pelo Avro Maven plugin
@Service
public class OrderEventPublisher {
    private final KafkaTemplate<String, OrderPlaced> kafkaTemplate;

    public void publish(Order order) {
        OrderPlaced avroEvent = OrderPlaced.newBuilder()
            .setOrderId(order.id().value())
            .setCustomerId(order.customerId().value())
            .setTotal(order.total().doubleValue())
            .setOccurredAt(Instant.now().toEpochMilli())
            .build();

        kafkaTemplate.send("orders.placed", order.id().value(), avroEvent);
    }
}

Monitoramento — Consumer Lag, JMX e Kafka UI

# Consumer lag via CLI (já mostrado em Consumer Groups)
kafka-consumer-groups.sh --describe --group order-processor $KAFKA_BOOTSTRAP

# Monitoramento de broker — JMX
# Métricas importantes:
# kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec  → throughput de entrada
# kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec     → bytes de entrada
# kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions → partições sem réplica suficiente
# kafka.server:type=KafkaController,name=ActiveControllerCount → deve ser 1 no cluster

# Habilitar JMX
export JMX_PORT=9999
./bin/kafka-server-start.sh config/server.properties
# AKHQ (Kafka UI) via Docker Compose
  akhq:
    image: tchiotludo/akhq:0.25.0
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            local:
              properties:
                bootstrap.servers: "kafka:9092"
              schema-registry:
                url: "http://schema-registry:8081"
    ports:
      - "8080:8080"
    depends_on:
      - kafka

Kafka em Produção — ISR, Compaction e Configurações Críticas

# === ISR (In-Sync Replicas) ===
# ISR são as réplicas que estão sincronizadas com o leader
# Se uma réplica está muito atrasada, é removida do ISR

# Ver ISR por partição
kafka-topics.sh --describe --topic orders.placed $KAFKA_BOOTSTRAP
# Isr: 1,2,3 → todas as 3 réplicas estão sincronizadas (saudável)
# Isr: 1     → apenas o leader está no ISR (ALERTA! réplicas atrasadas)

# UNCLEAN LEADER ELECTION — deve estar DESABILITADO em produção
# Se true, permite eleição de réplica fora do ISR como leader
# Consequência: possível PERDA DE DADOS
unclean.leader.election.enable=false   # nunca mude para true em produção

# min.insync.replicas + acks=all = forte garantia de durabilidade
# Com replication.factor=3 e min.insync.replicas=2:
# → escrita aceita apenas se ao menos 2 réplicas confirmarem
# → 1 broker pode cair sem impacto
# → se 2 brokers caírem, o producer recebe NotEnoughReplicasException

# === TOPIC COMPACTION ===
# Log compaction: mantém apenas o último valor por chave
# Ideal para: tabelas de estado, registros de entidade

# Criar tópico compactado
kafka-topics.sh --create \
  --topic customers \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.1 \
  --config segment.ms=3600000 \
  $KAFKA_BOOTSTRAP

# Tombstone: mensagem com value=null deleta a chave na compaction

Comparativo Kafka vs RabbitMQ

                    KAFKA                           RABBITMQ
─────────────────────────────────────────────────────────────────
Modelo          Log distribuído                  Message broker tradicional
Persistência    Mensagens retidas por período    Mensagem removida após consume
Replay          SIM — reprocessar histórico      NÃO (sem configuração extra)
Ordenação       Por partição                     Por fila (FIFO estrito)
Throughput      Muito alto (milhões/seg)          Alto (centenas de mil/seg)
Roteamento      Por tópico/partição              Exchanges e bindings flexíveis
Protocolos      Protocolo proprietário           AMQP, STOMP, MQTT
Consumer Groups SIM — partições divididas        Consumers concorrem pela mesma fila
Latência        Milissegundos (configurável)     Sub-milissegundo possível
Setup           Mais complexo                    Mais simples
Monitoramento   JMX, Kafka UI (AKHQ)            Management UI integrada

USE KAFKA QUANDO:
✓ Alto volume de eventos (IoT, logs, clickstream)
✓ Múltiplos consumers com processamentos independentes
✓ Necessidade de replay de eventos (Event Sourcing)
✓ Processamento em stream (Kafka Streams, Flink)
✓ Retenção histórica de eventos

USE RABBITMQ QUANDO:
✓ Roteamento complexo (exchanges, routing keys, bindings)
✓ Mensagens de trabalho (work queues) com ACK granular
✓ Prioridade de mensagens
✓ Request-reply pattern (RPC via messaging)
✓ Integrações com sistemas legados (AMQP)
✓ Menor complexidade operacional