Apache Kafka é uma plataforma de streaming de eventos distribuída. Originalmente criada pela LinkedIn, é usada para processar trilhões de eventos por dia em sistemas de alta escala. Diferentemente de sistemas de filas tradicionais, o Kafka mantém o histórico de mensagens e permite replay.
Conceitos Fundamentais
Broker: servidor Kafka. Um cluster tem múltiplos brokers. Cada broker armazena partições de tópicos.
Topic: categoria ou feed de mensagens. Análogo a uma “fila com histórico”. Mensagens são imutáveis e ordenadas dentro de cada partição.
Partition: subdivisão de um tópico. Mensagens em uma partição têm ordem garantida. Partições diferentes podem ser processadas em paralelo por consumers diferentes.
Consumer Group: grupo de consumers que dividem o processamento de um tópico. Cada partição é atribuída a apenas um consumer no grupo. Escalar consumers = mais paralelismo (até o número de partições).
Offset: número sequencial que identifica a posição de uma mensagem dentro de uma partição. Cada consumer group mantém seu próprio offset por partição.
Replication Factor: número de cópias de cada partição. Com fator 3, há 1 leader e 2 followers. Se o leader falha, um follower assume.
Arquitetura de um cluster Kafka:
Topic "orders" com 3 partições, replication factor 2:
Broker 1 Broker 2 Broker 3
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Partition 0 │ │ Partition 0 │ │ Partition 1 │
│ (LEADER) │────────▶│ (FOLLOWER) │ │ (FOLLOWER) │
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
│ Partition 1 │ │ Partition 2 │ │ Partition 2 │
│ (LEADER) │────────▶│ (FOLLOWER) │◀────────│ (LEADER) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Consumer Group "order-processor" com 2 consumers:
Consumer A → Partition 0 + Partition 1
Consumer B → Partition 2Kafka CLI — Tópicos
# === VARIÁVEIS DE AMBIENTE ===
KAFKA_BOOTSTRAP="--bootstrap-server localhost:9092"
# Criar tópico
kafka-topics.sh --create \
--topic orders.placed \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000 \ # 7 dias
--config min.insync.replicas=2 \
$KAFKA_BOOTSTRAP
# Listar tópicos
kafka-topics.sh --list $KAFKA_BOOTSTRAP
kafka-topics.sh --list $KAFKA_BOOTSTRAP | grep "orders"
# Descrever tópico (partições, líderes, ISR, replicas)
kafka-topics.sh --describe --topic orders.placed $KAFKA_BOOTSTRAP
# Saída esperada:
# Topic: orders.placed Partitions: 6 ReplicationFactor: 3 Configs: ...
# Topic: orders.placed Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
# Alterar número de partições (apenas aumentar, nunca diminuir)
kafka-topics.sh --alter --topic orders.placed \
--partitions 12 \
$KAFKA_BOOTSTRAP
# Alterar configurações de um tópico
kafka-configs.sh --alter --entity-type topics \
--entity-name orders.placed \
--add-config "retention.ms=86400000,max.message.bytes=1048576" \
$KAFKA_BOOTSTRAP
# Ver configurações de um tópico
kafka-configs.sh --describe --entity-type topics \
--entity-name orders.placed \
$KAFKA_BOOTSTRAP
# Deletar tópico
kafka-topics.sh --delete --topic orders.placed.DLT $KAFKA_BOOTSTRAPKafka CLI — Produzindo e Consumindo
# PRODUZIR mensagens via console
kafka-console-producer.sh \
--topic orders.placed \
$KAFKA_BOOTSTRAP
# Produzir com chave (separador padrão: TAB)
kafka-console-producer.sh \
--topic orders.placed \
--property key.separator=":" \
--property parse.key=true \
$KAFKA_BOOTSTRAP
# Entrada: order-123:{"orderId":"order-123","customerId":"cust-1"}
# Produzir arquivo de mensagens
cat mensagens.json | kafka-console-producer.sh \
--topic orders.placed \
$KAFKA_BOOTSTRAP
# CONSUMIR mensagens
# Do início (from-beginning)
kafka-console-consumer.sh \
--topic orders.placed \
--from-beginning \
$KAFKA_BOOTSTRAP
# Apenas as novas mensagens
kafka-console-consumer.sh \
--topic orders.placed \
$KAFKA_BOOTSTRAP
# Com chave e timestamp visíveis
kafka-console-consumer.sh \
--topic orders.placed \
--from-beginning \
--property print.key=true \
--property print.timestamp=true \
--property key.separator=" | " \
$KAFKA_BOOTSTRAP
# Consumir de partição específica
kafka-console-consumer.sh \
--topic orders.placed \
--partition 2 \
--offset 100 \
$KAFKA_BOOTSTRAP
# Consumir com consumer group
kafka-console-consumer.sh \
--topic orders.placed \
--from-beginning \
--group debug-consumer \
$KAFKA_BOOTSTRAP
# Limitar número de mensagens
kafka-console-consumer.sh \
--topic orders.placed \
--from-beginning \
--max-messages 50 \
$KAFKA_BOOTSTRAPConsumer Groups — Lag e Offset Management
# Listar consumer groups
kafka-consumer-groups.sh --list $KAFKA_BOOTSTRAP
# Descrever grupo — mostra LAG por partição
kafka-consumer-groups.sh \
--describe --group order-processor \
$KAFKA_BOOTSTRAP
# Saída:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order-processor orders.placed 0 1000 1050 50
# order-processor orders.placed 1 980 980 0
# order-processor orders.placed 2 1020 1020 0
# LAG total = soma de todos os lags por partição
# LAG > 0 = consumer está atrasado
# Resetar offset — consumer deve estar PARADO
# Para o início (reprocessar tudo)
kafka-consumer-groups.sh \
--group order-processor \
--topic orders.placed \
--reset-offsets --to-earliest \
--execute \
$KAFKA_BOOTSTRAP
# Para o final (ignorar mensagens antigas)
kafka-consumer-groups.sh \
--group order-processor \
--topic orders.placed \
--reset-offsets --to-latest \
--execute \
$KAFKA_BOOTSTRAP
# Para um offset específico
kafka-consumer-groups.sh \
--group order-processor \
--topic orders.placed \
--reset-offsets --to-offset 1000 \
--execute \
$KAFKA_BOOTSTRAP
# Para um timestamp específico (ISO 8601)
kafka-consumer-groups.sh \
--group order-processor \
--topic orders.placed:0,1,2 \
--reset-offsets --to-datetime "2024-01-15T10:00:00.000" \
--execute \
$KAFKA_BOOTSTRAP
# Voltar N mensagens
kafka-consumer-groups.sh \
--group order-processor \
--topic orders.placed \
--reset-offsets --shift-by -100 \
--execute \
$KAFKA_BOOTSTRAP
# Simulação sem executar (dry-run)
kafka-consumer-groups.sh \
--group order-processor \
--topic orders.placed \
--reset-offsets --to-earliest \
--dry-run \
$KAFKA_BOOTSTRAPProduzindo Mensagens — Acks, Keys e Idempotência
A garantia de entrega do Kafka é controlada pelo parâmetro acks:
acks=0— fire and forget. Sem confirmação. Mais rápido, pode perder mensagens.acks=1— aguarda confirmação do leader. Risco de perda se leader falha antes de replicar.acks=all(ou-1) — aguarda confirmação de todos os ISR. Mais seguro.
// SPRING BOOT — configuração do producer
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// ACKS — garantia de entrega
config.put(ProducerConfig.ACKS_CONFIG, "all");
// Idempotent producer — garante exatamente uma entrega mesmo com retry
// Requer acks=all e retries > 0
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Retries e timeout
config.put(ProducerConfig.RETRIES_CONFIG, 3);
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
// Performance
config.put(ProducerConfig.LINGER_MS_CONFIG, 5); // aguarda 5ms para agrupar
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16KB por batch
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // compressão
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
// ENVIANDO MENSAGENS COM KafkaTemplate
@Service
public class OrderEventPublisher {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
// Chave de particionamento — mensagens da mesma order vão para a mesma partição
// Garante ordenação de eventos por order
public void publishOrderPlaced(Order order) {
try {
String payload = objectMapper.writeValueAsString(new OrderPlacedEvent(
order.id().value(),
order.customerId().value(),
order.total().amount(),
Instant.now()
));
// ProducerRecord com chave explícita
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders.placed",
order.id().value(), // chave = orderId → mesma partição para a mesma order
payload
);
// Envio assíncrono com callback
kafkaTemplate.send(record).whenComplete((result, ex) -> {
if (ex != null) {
log.error("Falha ao publicar evento OrderPlaced: orderId={}", order.id(), ex);
throw new EventPublishingException("Falha ao publicar evento", ex);
} else {
log.info("OrderPlaced publicado: orderId={}, partition={}, offset={}",
order.id(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset()
);
}
});
} catch (JsonProcessingException e) {
throw new EventPublishingException("Falha ao serializar evento", e);
}
}
// Envio transacional — todos ou nenhum
public void publishMultipleEvents(Order order) {
kafkaTemplate.executeInTransaction(ops -> {
ops.send("orders.placed", order.id().value(), serializeOrderPlaced(order));
ops.send("inventory.reserve", order.id().value(), serializeInventoryReserve(order));
return true;
});
}
}Consumindo Mensagens — Consumer Groups e Offset Management
// CONFIGURAÇÃO DO CONSUMER
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// AUTO COMMIT — conveniente mas perigoso
// false = commit manual (mais controle)
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Quando não há offset para o grupo (primeiro start ou após reset)
// earliest = do início | latest = apenas novas mensagens
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Polling
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // mensagens por poll
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 min timeout de processamento
// Session timeout — se consumer não fizer heartbeat neste tempo, é removido do grupo
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(3); // 3 threads, uma por partição (se tiver 3 partições)
return factory;
}
}
// CONSUMER COM COMMIT MANUAL
@Component
public class OrderConsumer {
@KafkaListener(
topics = "orders.placed",
groupId = "order-processor",
containerFactory = "kafkaListenerContainerFactory"
)
public void onOrderPlaced(
ConsumerRecord<String, String> record,
Acknowledgment acknowledgment) {
String orderId = record.key();
String payload = record.value();
long offset = record.offset();
int partition = record.partition();
log.info("Processando OrderPlaced: orderId={}, partition={}, offset={}",
orderId, partition, offset);
try {
OrderPlacedEvent event = objectMapper.readValue(payload, OrderPlacedEvent.class);
inventoryService.reserve(event.orderId(), event.items());
// Commit APENAS após processamento bem-sucedido
acknowledgment.acknowledge();
} catch (RetryableException e) {
// Erro transiente — NÃO commita, será reentregue
log.warn("Erro transiente ao processar OrderPlaced: {}. Não commitando.", e.getMessage());
throw e;
} catch (Exception e) {
// Erro permanente — commita para não travar a fila
log.error("Erro permanente ao processar OrderPlaced: orderId={}", orderId, e);
// Envia para DLQ/alerta aqui
deadLetterService.send("orders.placed", orderId, payload, e.getMessage());
acknowledgment.acknowledge(); // commit para avançar
}
}
}Retry Topics e Dead Letter Topics com Spring Kafka
// @RetryableTopic — cria automaticamente retry topics e DLT
@Component
public class OrderEventConsumer {
@RetryableTopic(
attempts = "4", // 1 original + 3 retries
backoff = @Backoff(
delay = 1000, // 1s antes do primeiro retry
multiplier = 2.0, // duplica a cada retry: 1s → 2s → 4s
maxDelay = 10000 // máximo 10s entre retries
),
dltTopicSuffix = ".DLT", // nome: orders.placed.DLT
retryTopicSuffix = "-retry", // nome: orders.placed-retry-0, -retry-1, etc
autoCreateTopics = "true",
numPartitions = "3",
replicationFactor = "1",
include = { // apenas estes erros fazem retry
ServiceUnavailableException.class,
RetryableException.class
}
// exclude = {} — estes erros vão direto para DLT sem retry
)
@KafkaListener(topics = "orders.placed", groupId = "order-processor")
public void onOrderPlaced(@Payload String payload,
@Header(KafkaHeaders.RECEIVED_KEY) String key) {
OrderPlacedEvent event = deserialize(payload);
inventoryService.reserve(event.orderId(), event.items());
}
// Handler da Dead Letter Topic
@DltHandler
public void handleDeadLetter(
@Payload String payload,
@Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage,
@Header(KafkaHeaders.ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.RECEIVED_KEY) String key) {
log.error("Mensagem na DLT | topic={} | key={} | erro={}",
originalTopic, key, errorMessage);
// Persistir para análise e reprocessamento manual
dltRecordRepository.save(DltRecord.builder()
.messageKey(key)
.originalTopic(originalTopic)
.payload(payload)
.errorMessage(errorMessage)
.receivedAt(Instant.now())
.build()
);
// Alerta para time de operações
alertService.sendDltAlert(originalTopic, key, errorMessage);
}
}Configurações Importantes de Tópicos e Brokers
# === CONFIGURAÇÕES DE TÓPICO ===
# Retenção
retention.ms=604800000 # quanto tempo manter mensagens (padrão: 7 dias)
retention.bytes=-1 # tamanho máximo por partição (-1 = sem limite)
# Tamanho de mensagem
max.message.bytes=1048576 # 1MB por mensagem (padrão)
# Compressão
compression.type=snappy # snappy, gzip, lz4, zstd (snappy = melhor equilíbrio)
# Replicação
replication.factor=3 # cópias de cada partição
min.insync.replicas=2 # mínimo de réplicas sincronizadas para aceitar escrita
# Compaction
cleanup.policy=compact # mantém apenas o último valor por chave
cleanup.policy=delete # deleta mensagens antigas (padrão)
cleanup.policy=compact,delete # compacta + deleta
# Configurar via kafka-configs.sh
kafka-configs.sh --alter \
--entity-type topics \
--entity-name orders.placed \
--add-config "retention.ms=86400000,min.insync.replicas=2,compression.type=snappy" \
--bootstrap-server localhost:9092
# === CONFIGURAÇÕES DE BROKER ===
# Ver configurações do broker
kafka-configs.sh --describe \
--entity-type brokers \
--entity-name 0 \
--bootstrap-server localhost:9092
# Configurações críticas de produção:
# unclean.leader.election.enable=false → NÃO permite eleição de réplica fora do ISR
# evita perda de dados, mas pode causar indisponibilidade
# default.replication.factor=3
# min.insync.replicas=2
# log.retention.hours=168 → 7 dias
# num.partitions=6 → padrão ao criar tópicosKafka com Java/Spring Boot — Configuração via application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
properties:
enable.idempotence: true
linger.ms: 5
compression.type: snappy
consumer:
group-id: order-processor
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
enable-auto-commit: false
max-poll-records: 100
properties:
max.poll.interval.ms: 300000
session.timeout.ms: 45000
listener:
ack-mode: manual_immediate
concurrency: 3
# Configuração de retry para listeners sem @RetryableTopic
retry:
backoff:
interval: 1000
max-interval: 10000
multiplier: 2.0Kafka Streams — Processamento em Tempo Real
Kafka Streams é uma biblioteca Java para processamento de streams sobre Kafka. Não requer cluster separado — roda dentro da aplicação.
@Configuration
@EnableKafkaStreams
public class OrderStreamConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration streamsConfig() {
Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(config);
}
}
@Component
public class OrderAnalyticsStream {
@Autowired
void buildPipeline(StreamsBuilder builder) {
// KStream — sequência de registros (eventos)
KStream<String, String> ordersStream = builder.stream("orders.placed");
// Filtrar e transformar
KStream<String, OrderValue> orderValues = ordersStream
.filter((key, value) -> value != null)
.mapValues(value -> {
OrderPlacedEvent event = deserialize(value);
return new OrderValue(event.orderId(), event.total());
});
// WINDOWED AGGREGATION — total de pedidos por hora
KTable<Windowed<String>, Long> orderCountByHour = ordersStream
.groupBy((key, value) -> "all") // tudo no mesmo grupo
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.count(Materialized.as("order-count-by-hour"));
// Enviar resultado para outro tópico
orderCountByHour
.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.window().startTime().toString(),
count.toString()
))
.to("orders.hourly-count");
// JOIN — enriquecer evento de pedido com dados do cliente
KTable<String, String> customersTable = builder.table("customers");
KStream<String, String> enrichedOrders = ordersStream.join(
customersTable,
(orderJson, customerJson) -> enrichOrder(orderJson, customerJson),
Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
enrichedOrders.to("orders.enriched");
}
}Schema Registry e Avro
Schema Registry armazena e versiona schemas de mensagens. Avro é o formato de serialização mais comum. Garante compatibilidade entre producers e consumers.
# Registrar schema
curl -X POST http://localhost:8081/subjects/orders.placed-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\":\"record\",\"name\":\"OrderPlaced\",\"fields\":[{\"name\":\"orderId\",\"type\":\"string\"},{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"total\",\"type\":\"double\"},{\"name\":\"occurredAt\",\"type\":\"long\"}]}"
}'
# Listar subjects (schemas registrados)
curl http://localhost:8081/subjects
# Ver versões de um schema
curl http://localhost:8081/subjects/orders.placed-value/versions
# Compatibilidade — tipos: BACKWARD, FORWARD, FULL, NONE
curl -X PUT http://localhost:8081/config/orders.placed-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# BACKWARD: novo schema pode ler dados do schema anterior
# FORWARD: schema anterior pode ler dados do novo schema
# FULL: ambos os sentidos// Spring Boot com Avro + Schema Registry
// pom.xml: io.confluent:kafka-avro-serializer
// Schema em src/main/avro/OrderPlaced.avsc
// application.yml:
// spring.kafka.producer.value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
// spring.kafka.properties.schema.registry.url: http://localhost:8081
// Usando tipo gerado pelo Avro Maven plugin
@Service
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderPlaced> kafkaTemplate;
public void publish(Order order) {
OrderPlaced avroEvent = OrderPlaced.newBuilder()
.setOrderId(order.id().value())
.setCustomerId(order.customerId().value())
.setTotal(order.total().doubleValue())
.setOccurredAt(Instant.now().toEpochMilli())
.build();
kafkaTemplate.send("orders.placed", order.id().value(), avroEvent);
}
}Monitoramento — Consumer Lag, JMX e Kafka UI
# Consumer lag via CLI (já mostrado em Consumer Groups)
kafka-consumer-groups.sh --describe --group order-processor $KAFKA_BOOTSTRAP
# Monitoramento de broker — JMX
# Métricas importantes:
# kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec → throughput de entrada
# kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec → bytes de entrada
# kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions → partições sem réplica suficiente
# kafka.server:type=KafkaController,name=ActiveControllerCount → deve ser 1 no cluster
# Habilitar JMX
export JMX_PORT=9999
./bin/kafka-server-start.sh config/server.properties# AKHQ (Kafka UI) via Docker Compose
akhq:
image: tchiotludo/akhq:0.25.0
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
local:
properties:
bootstrap.servers: "kafka:9092"
schema-registry:
url: "http://schema-registry:8081"
ports:
- "8080:8080"
depends_on:
- kafkaKafka em Produção — ISR, Compaction e Configurações Críticas
# === ISR (In-Sync Replicas) ===
# ISR são as réplicas que estão sincronizadas com o leader
# Se uma réplica está muito atrasada, é removida do ISR
# Ver ISR por partição
kafka-topics.sh --describe --topic orders.placed $KAFKA_BOOTSTRAP
# Isr: 1,2,3 → todas as 3 réplicas estão sincronizadas (saudável)
# Isr: 1 → apenas o leader está no ISR (ALERTA! réplicas atrasadas)
# UNCLEAN LEADER ELECTION — deve estar DESABILITADO em produção
# Se true, permite eleição de réplica fora do ISR como leader
# Consequência: possível PERDA DE DADOS
unclean.leader.election.enable=false # nunca mude para true em produção
# min.insync.replicas + acks=all = forte garantia de durabilidade
# Com replication.factor=3 e min.insync.replicas=2:
# → escrita aceita apenas se ao menos 2 réplicas confirmarem
# → 1 broker pode cair sem impacto
# → se 2 brokers caírem, o producer recebe NotEnoughReplicasException
# === TOPIC COMPACTION ===
# Log compaction: mantém apenas o último valor por chave
# Ideal para: tabelas de estado, registros de entidade
# Criar tópico compactado
kafka-topics.sh --create \
--topic customers \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.1 \
--config segment.ms=3600000 \
$KAFKA_BOOTSTRAP
# Tombstone: mensagem com value=null deleta a chave na compactionComparativo Kafka vs RabbitMQ
KAFKA RABBITMQ
─────────────────────────────────────────────────────────────────
Modelo Log distribuído Message broker tradicional
Persistência Mensagens retidas por período Mensagem removida após consume
Replay SIM — reprocessar histórico NÃO (sem configuração extra)
Ordenação Por partição Por fila (FIFO estrito)
Throughput Muito alto (milhões/seg) Alto (centenas de mil/seg)
Roteamento Por tópico/partição Exchanges e bindings flexíveis
Protocolos Protocolo proprietário AMQP, STOMP, MQTT
Consumer Groups SIM — partições divididas Consumers concorrem pela mesma fila
Latência Milissegundos (configurável) Sub-milissegundo possível
Setup Mais complexo Mais simples
Monitoramento JMX, Kafka UI (AKHQ) Management UI integrada
USE KAFKA QUANDO:
✓ Alto volume de eventos (IoT, logs, clickstream)
✓ Múltiplos consumers com processamentos independentes
✓ Necessidade de replay de eventos (Event Sourcing)
✓ Processamento em stream (Kafka Streams, Flink)
✓ Retenção histórica de eventos
USE RABBITMQ QUANDO:
✓ Roteamento complexo (exchanges, routing keys, bindings)
✓ Mensagens de trabalho (work queues) com ACK granular
✓ Prioridade de mensagens
✓ Request-reply pattern (RPC via messaging)
✓ Integrações com sistemas legados (AMQP)
✓ Menor complexidade operacional