Em arquitetura orientada a eventos, componentes se comunicam publicando e consumindo eventos em vez de chamadas diretas. O publicador não sabe quem vai reagir. Isso reduz acoplamento, permite escalar consumidores independentemente e viabiliza auditoria natural do sistema.
Domain Events vs Integration Events
A distinção é fundamental e frequentemente ignorada.
Domain Events acontecem dentro de um bounded context, transportam semântica de negócio e são processados in-process (mesmo processo, mesma transação). São parte do modelo de domínio.
Integration Events cruzam boundaries de serviço/contexto, são serializados e publicados em um broker (Kafka, RabbitMQ). São parte da infraestrutura de comunicação.
// DOMAIN EVENT — semantica de negócio, in-process, imutável
// Nome sempre no passado: algo que já aconteceu
public record OrderPlaced(
OrderId orderId,
CustomerId customerId,
List<OrderItem> items,
Money total,
Instant occurredAt
) implements DomainEvent {}
public record PaymentProcessed(
OrderId orderId,
String paymentId,
Money amount,
Instant occurredAt
) implements DomainEvent {}
public record OrderShipped(
OrderId orderId,
String trackingCode,
Instant occurredAt
) implements DomainEvent {}
// INTEGRATION EVENT — cruzando fronteiras de serviços, serializado para broker
// Contém apenas o necessário para outros contextos — sem tipos de domínio internos
public record OrderPlacedIntegrationEvent(
String orderId,
String customerId,
BigDecimal total,
String currency,
Instant occurredAt
) {}
// QUANDO USAR CADA UM:
// Domain Event: dentro do mesmo serviço, mesma transação
// → pedido colocado → enviar email de confirmação (mesmo serviço)
// Integration Event: comunicação entre serviços
// → pedido colocado → notificar serviço de estoque (outro serviço)Event Sourcing — Conceito, Event Store e Projeções
Em Event Sourcing, o estado atual de uma entidade é derivado do histórico completo de eventos que aconteceram com ela. Em vez de salvar o estado atual, salvamos a sequência de eventos.
// EVENTOS que descrevem mudanças na Order
public sealed interface OrderEvent permits
OrderCreated, ItemAdded, ItemRemoved, OrderConfirmed, OrderCancelled {}
public record OrderCreated(
String orderId, String customerId, Instant at
) implements OrderEvent {}
public record ItemAdded(
String orderId, String productId, int quantity, BigDecimal price, Instant at
) implements OrderEvent {}
public record OrderConfirmed(
String orderId, String paymentId, Instant at
) implements OrderEvent {}
public record OrderCancelled(
String orderId, String reason, Instant at
) implements OrderEvent {}
// ENTIDADE que reconstrói seu estado a partir dos eventos
public class Order {
private String id;
private String customerId;
private List<OrderItem> items = new ArrayList<>();
private OrderStatus status;
private final List<OrderEvent> uncommittedEvents = new ArrayList<>();
// Construtor privado — Order só é criada via eventos
private Order() {}
// Método factory — gera o primeiro evento
public static Order create(String customerId) {
Order order = new Order();
order.apply(new OrderCreated(UUID.randomUUID().toString(), customerId, Instant.now()));
return order;
}
public void addItem(String productId, int quantity, BigDecimal price) {
if (status != OrderStatus.DRAFT) {
throw new InvalidOrderStateException("Itens só podem ser adicionados em pedidos em rascunho");
}
apply(new ItemAdded(id, productId, quantity, price, Instant.now()));
}
public void confirm(String paymentId) {
if (items.isEmpty()) {
throw new DomainException("Pedido sem itens não pode ser confirmado");
}
apply(new OrderConfirmed(id, paymentId, Instant.now()));
}
// Aplica o evento: muda o estado E registra o evento para persistência
private void apply(OrderEvent event) {
when(event);
uncommittedEvents.add(event);
}
// Reconstrói o estado a partir de um evento (sem efeitos colaterais)
private void when(OrderEvent event) {
switch (event) {
case OrderCreated e -> {
this.id = e.orderId();
this.customerId = e.customerId();
this.status = OrderStatus.DRAFT;
}
case ItemAdded e -> {
items.add(new OrderItem(e.productId(), e.quantity(), e.price()));
}
case OrderConfirmed e -> {
this.status = OrderStatus.CONFIRMED;
}
case OrderCancelled e -> {
this.status = OrderStatus.CANCELLED;
}
}
}
// Carrega a entidade a partir do histórico de eventos (event store → memória)
public static Order reconstitute(List<OrderEvent> history) {
Order order = new Order();
history.forEach(order::when); // aplica sem gerar novos eventos
return order;
}
public List<OrderEvent> uncommittedEvents() { return List.copyOf(uncommittedEvents); }
public void clearUncommittedEvents() { uncommittedEvents.clear(); }
}
// EVENT STORE — repositório de eventos (append-only)
public interface EventStore {
void append(String aggregateId, List<OrderEvent> events, int expectedVersion);
List<OrderEvent> loadEvents(String aggregateId);
List<OrderEvent> loadEvents(String aggregateId, int fromVersion);
}
@Repository
public class JpaEventStore implements EventStore {
private final EventStorageRepository repo;
@Override
@Transactional
public void append(String aggregateId, List<OrderEvent> events, int expectedVersion) {
// Verifica versão para detectar conflitos de concorrência
int currentVersion = repo.countByAggregateId(aggregateId);
if (currentVersion != expectedVersion) {
throw new ConcurrencyException(
"Versão esperada " + expectedVersion + " mas encontrada " + currentVersion
);
}
int version = currentVersion;
for (OrderEvent event : events) {
repo.save(new EventRecord(aggregateId, ++version, serialize(event)));
}
}
@Override
public List<OrderEvent> loadEvents(String aggregateId) {
return repo.findByAggregateIdOrderByVersion(aggregateId)
.stream()
.map(this::deserialize)
.toList();
}
}
// PROJEÇÃO — view derivada do histórico de eventos (read model)
@Component
public class OrderSummaryProjection {
private final OrderSummaryRepository summaryRepo;
@EventListener
public void on(OrderCreated event) {
summaryRepo.save(new OrderSummary(
event.orderId(), event.customerId(), 0, BigDecimal.ZERO, "DRAFT"
));
}
@EventListener
public void on(ItemAdded event) {
summaryRepo.findById(event.orderId()).ifPresent(summary -> {
summaryRepo.save(summary.withAddedItem(event.price(), event.quantity()));
});
}
@EventListener
public void on(OrderConfirmed event) {
summaryRepo.findById(event.orderId()).ifPresent(summary ->
summaryRepo.save(summary.withStatus("CONFIRMED"))
);
}
}
// SNAPSHOT — otimização para aggregates com histórico longo
public class OrderRepository {
private final EventStore eventStore;
private final SnapshotStore snapshotStore;
public Order load(String orderId) {
// Tenta carregar snapshot recente
Optional<OrderSnapshot> snapshot = snapshotStore.findLatest(orderId);
if (snapshot.isPresent()) {
// Carrega apenas eventos após o snapshot
List<OrderEvent> events = eventStore.loadEvents(orderId, snapshot.get().version());
Order order = Order.fromSnapshot(snapshot.get());
events.forEach(order::applyFromHistory);
return order;
} else {
// Sem snapshot — carrega todos os eventos
List<OrderEvent> events = eventStore.loadEvents(orderId);
return Order.reconstitute(events);
}
}
}CQRS — Command Query Responsibility Segregation
CQRS separa as operações de escrita (Commands) das operações de leitura (Queries) em modelos distintos.
// ═══════ LADO DO COMANDO (escrita) ═══════
// Commands são imperatives — descrevem intenção
public record PlaceOrderCommand(
String customerId,
List<OrderItemRequest> items,
String paymentMethodId
) {}
public record ConfirmOrderCommand(String orderId, String paymentId) {}
// Command Handler — processa o comando e muda estado
@Component
public class OrderCommandHandler {
private final OrderRepository orderRepository;
private final ApplicationEventPublisher eventPublisher;
public OrderId handle(PlaceOrderCommand cmd) {
Order order = new Order(new CustomerId(cmd.customerId()), buildItems(cmd.items()));
orderRepository.save(order);
eventPublisher.publishEvent(new OrderPlaced(order.id(), order.customerId(), Instant.now()));
return order.id();
}
public void handle(ConfirmOrderCommand cmd) {
Order order = orderRepository.findById(new OrderId(cmd.orderId()))
.orElseThrow(() -> new OrderNotFoundException(cmd.orderId()));
order.confirm();
orderRepository.save(order);
}
}
// ═══════ LADO DA QUERY (leitura) ═══════
// Read models — otimizados para consulta, podem ser desnormalizados
public record OrderSummaryView(
String orderId,
String customerId,
String customerName,
BigDecimal total,
String status,
int itemCount,
Instant createdAt
) {}
public record OrderDetailView(
String orderId,
String customerId,
String customerName,
String customerEmail,
List<OrderItemView> items,
BigDecimal subtotal,
BigDecimal discount,
BigDecimal total,
String status,
String paymentId,
Instant createdAt,
Instant updatedAt
) {}
// Query Handler — lê diretamente do banco de leitura (pode ser tabela otimizada)
@Component
public class OrderQueryHandler {
private final OrderReadRepository readRepo;
public List<OrderSummaryView> findByCustomer(String customerId) {
return readRepo.findSummariesByCustomer(customerId);
}
public OrderDetailView findById(String orderId) {
return readRepo.findDetailById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
}
public Page<OrderSummaryView> listAll(Pageable pageable) {
return readRepo.findAllSummaries(pageable);
}
}
// READ REPOSITORY — queries complexas com JOIN, sem lógica de domínio
public interface OrderReadRepository {
List<OrderSummaryView> findSummariesByCustomer(String customerId);
Optional<OrderDetailView> findDetailById(String orderId);
Page<OrderSummaryView> findAllSummaries(Pageable pageable);
}
@Repository
public class JpaOrderReadRepository implements OrderReadRepository {
private final EntityManager em;
@Override
public Optional<OrderDetailView> findDetailById(String orderId) {
String jpql = """
SELECT new com.empresa.order.OrderDetailView(
o.id, c.id, c.name, c.email, o.total, o.status, ...
)
FROM OrderEntity o
JOIN CustomerEntity c ON c.id = o.customerId
WHERE o.id = :orderId
""";
// query projetada diretamente para o DTO — sem carregar entidade de domínio
}
}Outbox Pattern — Garantia de Entrega de Eventos
O Outbox Pattern resolve o problema de atomicidade entre salvar no banco e publicar no broker: os dois devem acontecer ou nenhum.
// PROBLEMA — race condition sem outbox
@Transactional
public void placeOrder(PlaceOrderCommand cmd) {
Order order = new Order(cmd.customerId(), cmd.items());
orderRepository.save(order);
// ❌ Se cair aqui, o pedido foi salvo mas o evento não foi publicado
// Stock service nunca vai saber do pedido
kafka.publish(new OrderPlaced(order.id(), ...));
}
// SOLUÇÃO — Outbox: evento vai para o banco na mesma transação
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
private String id = UUID.randomUUID().toString();
private String aggregateId;
private String aggregateType;
private String eventType;
private String payload; // JSON do evento
private Instant createdAt = Instant.now();
private boolean published = false;
private Instant publishedAt;
}
public interface OutboxRepository extends JpaRepository<OutboxEvent, String> {
List<OutboxEvent> findByPublishedFalseOrderByCreatedAtAsc();
}
// ✅ Tudo em uma única transação
@Transactional
public OrderId placeOrder(PlaceOrderCommand cmd) {
Order order = new Order(new CustomerId(cmd.customerId()), buildItems(cmd.items()));
orderRepository.save(order); // salva no banco
// Salva o evento no outbox NA MESMA transação
OutboxEvent outbox = new OutboxEvent(
order.id().value(),
"Order",
"OrderPlaced",
serialize(new OrderPlacedIntegrationEvent(
order.id().value(),
order.customerId().value(),
order.total().amount(),
Instant.now()
))
);
outboxRepository.save(outbox);
return order.id();
// Se cair aqui, nenhum dos dois foi salvo (rollback)
// Se chegar aqui, os dois foram salvos de forma atômica
}
// WORKER — lê o outbox e publica no Kafka de forma confiável
@Component
public class OutboxPublisher {
private final OutboxRepository outboxRepo;
private final KafkaTemplate<String, String> kafka;
@Scheduled(fixedDelay = 100) // roda a cada 100ms
@Transactional
public void processOutbox() {
List<OutboxEvent> pending = outboxRepo.findByPublishedFalseOrderByCreatedAtAsc();
for (OutboxEvent event : pending) {
try {
String topic = resolveTopicFor(event.getEventType());
kafka.send(topic, event.getAggregateId(), event.getPayload()).get();
event.markAsPublished();
outboxRepo.save(event);
} catch (Exception e) {
log.error("Falha ao publicar evento {}: {}", event.getId(), e.getMessage());
// não marca como publicado — será tentado na próxima iteração
}
}
}
}Saga Pattern — Transações Distribuídas
Uma Saga é uma sequência de transações locais onde cada etapa publica um evento. Se uma etapa falha, eventos de compensação desfazem as etapas anteriores.
Coreografia: cada serviço reage a eventos e publica os próximos — sem orquestrador central.
Orquestração: um orquestrador central comanda cada etapa — mais visibilidade mas mais acoplamento.
// ═══════ COREOGRAFIA — checkout ═══════
// Fluxo: OrderPlaced → InventoryReserved → PaymentCharged → OrderConfirmed
// (compensação) InventoryFailed → OrderCancelled
// (compensação) PaymentFailed → InventoryReleased → OrderCancelled
// ORDER SERVICE
@KafkaListener(topics = "checkout.initiated")
public void onCheckoutInitiated(CheckoutInitiatedEvent event) {
Order order = new Order(event.customerId(), event.items());
orderRepository.save(order);
kafka.send("orders.placed", new OrderPlacedEvent(order.id(), event.items()));
}
@KafkaListener(topics = "payment.processed")
public void onPaymentProcessed(PaymentProcessedEvent event) {
Order order = orderRepository.findById(event.orderId());
order.confirm(event.paymentId());
orderRepository.save(order);
kafka.send("orders.confirmed", new OrderConfirmedEvent(order.id()));
}
@KafkaListener(topics = "payment.failed")
public void onPaymentFailed(PaymentFailedEvent event) {
Order order = orderRepository.findById(event.orderId());
order.cancel("Pagamento recusado: " + event.reason());
orderRepository.save(order);
// notifica cliente
}
// INVENTORY SERVICE
@KafkaListener(topics = "orders.placed")
public void onOrderPlaced(OrderPlacedEvent event) {
try {
inventory.reserveAll(event.orderId(), event.items());
kafka.send("inventory.reserved", new InventoryReservedEvent(event.orderId()));
} catch (OutOfStockException e) {
kafka.send("inventory.failed", new InventoryFailedEvent(event.orderId(), e.getMessage()));
}
}
@KafkaListener(topics = "payment.failed")
public void onPaymentFailed(PaymentFailedEvent event) {
inventory.releaseAll(event.orderId()); // compensação
}
// PAYMENT SERVICE
@KafkaListener(topics = "inventory.reserved")
public void onInventoryReserved(InventoryReservedEvent event) {
Order order = orderRepository.findById(event.orderId());
try {
PaymentResult result = paymentGateway.charge(order.total(), order.customerId());
kafka.send("payment.processed", new PaymentProcessedEvent(event.orderId(), result.paymentId()));
} catch (PaymentException e) {
kafka.send("payment.failed", new PaymentFailedEvent(event.orderId(), e.getMessage()));
}
}
// ═══════ ORQUESTRAÇÃO — Saga Orchestrator ═══════
@Component
public class CheckoutSagaOrchestrator {
private final SagaStateRepository sagaRepo;
private final KafkaTemplate<String, Object> kafka;
// Inicia a saga
public void startCheckout(CheckoutRequest request) {
CheckoutSaga saga = new CheckoutSaga(request.orderId(), SagaStep.RESERVE_INVENTORY);
sagaRepo.save(saga);
kafka.send("inventory.reserve-command",
new ReserveInventoryCommand(request.orderId(), request.items())
);
}
@KafkaListener(topics = "inventory.reserved-event")
public void onInventoryReserved(InventoryReservedEvent event) {
CheckoutSaga saga = sagaRepo.findByOrderId(event.orderId());
saga.advance(SagaStep.CHARGE_PAYMENT);
sagaRepo.save(saga);
kafka.send("payment.charge-command",
new ChargePaymentCommand(event.orderId(), saga.total())
);
}
@KafkaListener(topics = "payment.charged-event")
public void onPaymentCharged(PaymentChargedEvent event) {
CheckoutSaga saga = sagaRepo.findByOrderId(event.orderId());
saga.complete();
sagaRepo.save(saga);
kafka.send("orders.confirm-command", new ConfirmOrderCommand(event.orderId()));
}
@KafkaListener(topics = "payment.failed-event")
public void onPaymentFailed(PaymentFailedEvent event) {
CheckoutSaga saga = sagaRepo.findByOrderId(event.orderId());
saga.compensate();
sagaRepo.save(saga);
// Orquestrador comanda a compensação explicitamente
kafka.send("inventory.release-command",
new ReleaseInventoryCommand(event.orderId())
);
kafka.send("orders.cancel-command",
new CancelOrderCommand(event.orderId(), "Pagamento falhou")
);
}
}Idempotência — Idempotency Keys e Deduplicação
Em sistemas distribuídos, mensagens podem ser entregues mais de uma vez. Handlers devem ser idempotentes.
// ❌ Não-idempotente — duas entregas = dois débitos
@KafkaListener(topics = "payment.charge-command")
public void onChargePayment(ChargePaymentCommand cmd) {
paymentAccount.debit(cmd.amount()); // débita a cada entrega
kafka.send("payment.charged-event", new PaymentChargedEvent(cmd.orderId()));
}
// ✅ Idempotente com tabela de processados
@Entity
@Table(name = "processed_messages")
public class ProcessedMessage {
@Id
private String messageId; // chave de idempotência (offset Kafka ou ID do evento)
private String consumerId;
private Instant processedAt;
}
@KafkaListener(topics = "payment.charge-command")
@Transactional
public void onChargePayment(
ChargePaymentCommand cmd,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
// Chave de idempotência: topic + partition + offset = único
String messageId = "payment.charge-command:" + partition + ":" + offset;
if (processedMessages.existsById(messageId)) {
log.info("Mensagem {} já foi processada. Ignorando.", messageId);
return;
}
// Processa
paymentAccount.debit(cmd.amount());
kafka.send("payment.charged-event", new PaymentChargedEvent(cmd.orderId()));
// Registra como processada na mesma transação
processedMessages.save(new ProcessedMessage(messageId, "payment-service", Instant.now()));
}
// ✅ Idempotência em API REST — Idempotency-Key header
@PostMapping("/api/payments")
public ResponseEntity<PaymentResponse> processPayment(
@RequestBody PaymentRequest body,
@RequestHeader("Idempotency-Key") String idempotencyKey) {
// Verifica se já processou esta requisição
Optional<PaymentResponse> cached = idempotencyCache.get(idempotencyKey);
if (cached.isPresent()) {
return ResponseEntity.ok(cached.get()); // retorna resultado anterior
}
PaymentResponse response = paymentService.process(body);
// Armazena resultado para futuras chamadas com a mesma chave
idempotencyCache.store(idempotencyKey, response, Duration.ofHours(24));
return ResponseEntity.ok(response);
}Dead Letter Queue — Configuração e Tratamento
Mensagens que falham repetidamente vão para a DLQ para análise e reprocessamento manual.
// SPRING BOOT — configuração de retry e DLT com @RetryableTopic
@Component
public class OrderEventConsumer {
@RetryableTopic(
attempts = "4", // 1 tentativa original + 3 retries
backoff = @Backoff(
delay = 1000, // 1s inicial
multiplier = 2.0, // duplica a cada retry: 1s, 2s, 4s
maxDelay = 10000 // máximo 10s
),
dltTopicSuffix = ".DLT", // tópico de DLT: orders.placed.DLT
include = {RetryableException.class} // apenas erros transientes fazem retry
)
@KafkaListener(topics = "orders.placed")
public void onOrderPlaced(OrderPlacedEvent event) {
// Se lançar RetryableException, vai para retry topic
// Se lançar outra exceção (DomainException), vai direto para DLT
inventoryService.reserve(event.orderId(), event.items());
}
// Handler da Dead Letter Topic — para análise e reprocessamento manual
@DltHandler
public void handleDlt(
OrderPlacedEvent event,
@Header(KafkaHeaders.EXCEPTION_MESSAGE) String exceptionMessage) {
log.error("Mensagem na DLT | orderId={} | erro={}",
event.orderId(), exceptionMessage
);
// Salva para análise
dltRecordRepository.save(new DltRecord(
event.orderId(),
"orders.placed",
serialize(event),
exceptionMessage,
Instant.now()
));
// Pode enviar alerta para Slack/PagerDuty
alertService.sendDltAlert("orders.placed", event.orderId(), exceptionMessage);
}
}Exemplo Completo — Checkout Event Flow
Fluxo completo de um checkout com todos os conceitos integrados:
Cliente faz checkout
│
▼
[OrderService] — salva Order + OutboxEvent (mesma TX)
│
▼ (OutboxPublisher lê e publica)
[Kafka: orders.placed]
│
├──→ [InventoryService] — reserva estoque
│ │ OK → [Kafka: inventory.reserved]
│ │ Falha → [Kafka: inventory.failed]
│
│ [inventory.reserved]
│ │
│ ▼
│ [PaymentService] — cobra cartão
│ │ OK → [Kafka: payment.processed]
│ │ Falha → [Kafka: payment.failed]
│
│ [payment.processed]
│ │
│ ▼
└──→ [OrderService] — confirma pedido
│
▼
[Kafka: orders.confirmed]
│
├──→ [NotificationService] — e-mail de confirmação
└──→ [ShippingService] — gera etiqueta de envio
COMPENSAÇÕES:
[inventory.failed] → [OrderService] cancela pedido
[payment.failed] → [InventoryService] libera reserva
→ [OrderService] cancela pedido