Design de Software

Event-Driven Design

Arquitetura orientada a eventos — Domain Events, Event Sourcing, CQRS, Outbox, Saga e Idempotência com exemplos completos

Em arquitetura orientada a eventos, componentes se comunicam publicando e consumindo eventos em vez de chamadas diretas. O publicador não sabe quem vai reagir. Isso reduz acoplamento, permite escalar consumidores independentemente e viabiliza auditoria natural do sistema.


Domain Events vs Integration Events

A distinção é fundamental e frequentemente ignorada.

Domain Events acontecem dentro de um bounded context, transportam semântica de negócio e são processados in-process (mesmo processo, mesma transação). São parte do modelo de domínio.

Integration Events cruzam boundaries de serviço/contexto, são serializados e publicados em um broker (Kafka, RabbitMQ). São parte da infraestrutura de comunicação.

// DOMAIN EVENT — semantica de negócio, in-process, imutável
// Nome sempre no passado: algo que já aconteceu
public record OrderPlaced(
    OrderId orderId,
    CustomerId customerId,
    List<OrderItem> items,
    Money total,
    Instant occurredAt
) implements DomainEvent {}

public record PaymentProcessed(
    OrderId orderId,
    String paymentId,
    Money amount,
    Instant occurredAt
) implements DomainEvent {}

public record OrderShipped(
    OrderId orderId,
    String trackingCode,
    Instant occurredAt
) implements DomainEvent {}

// INTEGRATION EVENT — cruzando fronteiras de serviços, serializado para broker
// Contém apenas o necessário para outros contextos — sem tipos de domínio internos
public record OrderPlacedIntegrationEvent(
    String orderId,
    String customerId,
    BigDecimal total,
    String currency,
    Instant occurredAt
) {}

// QUANDO USAR CADA UM:
// Domain Event: dentro do mesmo serviço, mesma transação
//   → pedido colocado → enviar email de confirmação (mesmo serviço)
// Integration Event: comunicação entre serviços
//   → pedido colocado → notificar serviço de estoque (outro serviço)

Event Sourcing — Conceito, Event Store e Projeções

Em Event Sourcing, o estado atual de uma entidade é derivado do histórico completo de eventos que aconteceram com ela. Em vez de salvar o estado atual, salvamos a sequência de eventos.

// EVENTOS que descrevem mudanças na Order
public sealed interface OrderEvent permits
    OrderCreated, ItemAdded, ItemRemoved, OrderConfirmed, OrderCancelled {}

public record OrderCreated(
    String orderId, String customerId, Instant at
) implements OrderEvent {}

public record ItemAdded(
    String orderId, String productId, int quantity, BigDecimal price, Instant at
) implements OrderEvent {}

public record OrderConfirmed(
    String orderId, String paymentId, Instant at
) implements OrderEvent {}

public record OrderCancelled(
    String orderId, String reason, Instant at
) implements OrderEvent {}

// ENTIDADE que reconstrói seu estado a partir dos eventos
public class Order {
    private String id;
    private String customerId;
    private List<OrderItem> items = new ArrayList<>();
    private OrderStatus status;
    private final List<OrderEvent> uncommittedEvents = new ArrayList<>();

    // Construtor privado — Order só é criada via eventos
    private Order() {}

    // Método factory — gera o primeiro evento
    public static Order create(String customerId) {
        Order order = new Order();
        order.apply(new OrderCreated(UUID.randomUUID().toString(), customerId, Instant.now()));
        return order;
    }

    public void addItem(String productId, int quantity, BigDecimal price) {
        if (status != OrderStatus.DRAFT) {
            throw new InvalidOrderStateException("Itens só podem ser adicionados em pedidos em rascunho");
        }
        apply(new ItemAdded(id, productId, quantity, price, Instant.now()));
    }

    public void confirm(String paymentId) {
        if (items.isEmpty()) {
            throw new DomainException("Pedido sem itens não pode ser confirmado");
        }
        apply(new OrderConfirmed(id, paymentId, Instant.now()));
    }

    // Aplica o evento: muda o estado E registra o evento para persistência
    private void apply(OrderEvent event) {
        when(event);
        uncommittedEvents.add(event);
    }

    // Reconstrói o estado a partir de um evento (sem efeitos colaterais)
    private void when(OrderEvent event) {
        switch (event) {
            case OrderCreated e -> {
                this.id = e.orderId();
                this.customerId = e.customerId();
                this.status = OrderStatus.DRAFT;
            }
            case ItemAdded e -> {
                items.add(new OrderItem(e.productId(), e.quantity(), e.price()));
            }
            case OrderConfirmed e -> {
                this.status = OrderStatus.CONFIRMED;
            }
            case OrderCancelled e -> {
                this.status = OrderStatus.CANCELLED;
            }
        }
    }

    // Carrega a entidade a partir do histórico de eventos (event store → memória)
    public static Order reconstitute(List<OrderEvent> history) {
        Order order = new Order();
        history.forEach(order::when); // aplica sem gerar novos eventos
        return order;
    }

    public List<OrderEvent> uncommittedEvents() { return List.copyOf(uncommittedEvents); }
    public void clearUncommittedEvents() { uncommittedEvents.clear(); }
}

// EVENT STORE — repositório de eventos (append-only)
public interface EventStore {
    void append(String aggregateId, List<OrderEvent> events, int expectedVersion);
    List<OrderEvent> loadEvents(String aggregateId);
    List<OrderEvent> loadEvents(String aggregateId, int fromVersion);
}

@Repository
public class JpaEventStore implements EventStore {

    private final EventStorageRepository repo;

    @Override
    @Transactional
    public void append(String aggregateId, List<OrderEvent> events, int expectedVersion) {
        // Verifica versão para detectar conflitos de concorrência
        int currentVersion = repo.countByAggregateId(aggregateId);
        if (currentVersion != expectedVersion) {
            throw new ConcurrencyException(
                "Versão esperada " + expectedVersion + " mas encontrada " + currentVersion
            );
        }

        int version = currentVersion;
        for (OrderEvent event : events) {
            repo.save(new EventRecord(aggregateId, ++version, serialize(event)));
        }
    }

    @Override
    public List<OrderEvent> loadEvents(String aggregateId) {
        return repo.findByAggregateIdOrderByVersion(aggregateId)
            .stream()
            .map(this::deserialize)
            .toList();
    }
}

// PROJEÇÃO — view derivada do histórico de eventos (read model)
@Component
public class OrderSummaryProjection {

    private final OrderSummaryRepository summaryRepo;

    @EventListener
    public void on(OrderCreated event) {
        summaryRepo.save(new OrderSummary(
            event.orderId(), event.customerId(), 0, BigDecimal.ZERO, "DRAFT"
        ));
    }

    @EventListener
    public void on(ItemAdded event) {
        summaryRepo.findById(event.orderId()).ifPresent(summary -> {
            summaryRepo.save(summary.withAddedItem(event.price(), event.quantity()));
        });
    }

    @EventListener
    public void on(OrderConfirmed event) {
        summaryRepo.findById(event.orderId()).ifPresent(summary ->
            summaryRepo.save(summary.withStatus("CONFIRMED"))
        );
    }
}

// SNAPSHOT — otimização para aggregates com histórico longo
public class OrderRepository {
    private final EventStore eventStore;
    private final SnapshotStore snapshotStore;

    public Order load(String orderId) {
        // Tenta carregar snapshot recente
        Optional<OrderSnapshot> snapshot = snapshotStore.findLatest(orderId);

        if (snapshot.isPresent()) {
            // Carrega apenas eventos após o snapshot
            List<OrderEvent> events = eventStore.loadEvents(orderId, snapshot.get().version());
            Order order = Order.fromSnapshot(snapshot.get());
            events.forEach(order::applyFromHistory);
            return order;
        } else {
            // Sem snapshot — carrega todos os eventos
            List<OrderEvent> events = eventStore.loadEvents(orderId);
            return Order.reconstitute(events);
        }
    }
}

CQRS — Command Query Responsibility Segregation

CQRS separa as operações de escrita (Commands) das operações de leitura (Queries) em modelos distintos.

// ═══════ LADO DO COMANDO (escrita) ═══════

// Commands são imperatives — descrevem intenção
public record PlaceOrderCommand(
    String customerId,
    List<OrderItemRequest> items,
    String paymentMethodId
) {}

public record ConfirmOrderCommand(String orderId, String paymentId) {}

// Command Handler — processa o comando e muda estado
@Component
public class OrderCommandHandler {

    private final OrderRepository orderRepository;
    private final ApplicationEventPublisher eventPublisher;

    public OrderId handle(PlaceOrderCommand cmd) {
        Order order = new Order(new CustomerId(cmd.customerId()), buildItems(cmd.items()));
        orderRepository.save(order);
        eventPublisher.publishEvent(new OrderPlaced(order.id(), order.customerId(), Instant.now()));
        return order.id();
    }

    public void handle(ConfirmOrderCommand cmd) {
        Order order = orderRepository.findById(new OrderId(cmd.orderId()))
            .orElseThrow(() -> new OrderNotFoundException(cmd.orderId()));
        order.confirm();
        orderRepository.save(order);
    }
}

// ═══════ LADO DA QUERY (leitura) ═══════

// Read models — otimizados para consulta, podem ser desnormalizados
public record OrderSummaryView(
    String orderId,
    String customerId,
    String customerName,
    BigDecimal total,
    String status,
    int itemCount,
    Instant createdAt
) {}

public record OrderDetailView(
    String orderId,
    String customerId,
    String customerName,
    String customerEmail,
    List<OrderItemView> items,
    BigDecimal subtotal,
    BigDecimal discount,
    BigDecimal total,
    String status,
    String paymentId,
    Instant createdAt,
    Instant updatedAt
) {}

// Query Handler — lê diretamente do banco de leitura (pode ser tabela otimizada)
@Component
public class OrderQueryHandler {

    private final OrderReadRepository readRepo;

    public List<OrderSummaryView> findByCustomer(String customerId) {
        return readRepo.findSummariesByCustomer(customerId);
    }

    public OrderDetailView findById(String orderId) {
        return readRepo.findDetailById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
    }

    public Page<OrderSummaryView> listAll(Pageable pageable) {
        return readRepo.findAllSummaries(pageable);
    }
}

// READ REPOSITORY — queries complexas com JOIN, sem lógica de domínio
public interface OrderReadRepository {
    List<OrderSummaryView> findSummariesByCustomer(String customerId);
    Optional<OrderDetailView> findDetailById(String orderId);
    Page<OrderSummaryView> findAllSummaries(Pageable pageable);
}

@Repository
public class JpaOrderReadRepository implements OrderReadRepository {

    private final EntityManager em;

    @Override
    public Optional<OrderDetailView> findDetailById(String orderId) {
        String jpql = """
            SELECT new com.empresa.order.OrderDetailView(
                o.id, c.id, c.name, c.email, o.total, o.status, ...
            )
            FROM OrderEntity o
            JOIN CustomerEntity c ON c.id = o.customerId
            WHERE o.id = :orderId
            """;
        // query projetada diretamente para o DTO — sem carregar entidade de domínio
    }
}

Outbox Pattern — Garantia de Entrega de Eventos

O Outbox Pattern resolve o problema de atomicidade entre salvar no banco e publicar no broker: os dois devem acontecer ou nenhum.

// PROBLEMA — race condition sem outbox
@Transactional
public void placeOrder(PlaceOrderCommand cmd) {
    Order order = new Order(cmd.customerId(), cmd.items());
    orderRepository.save(order);
    // ❌ Se cair aqui, o pedido foi salvo mas o evento não foi publicado
    // Stock service nunca vai saber do pedido
    kafka.publish(new OrderPlaced(order.id(), ...));
}

// SOLUÇÃO — Outbox: evento vai para o banco na mesma transação
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
    @Id
    private String id = UUID.randomUUID().toString();
    private String aggregateId;
    private String aggregateType;
    private String eventType;
    private String payload;  // JSON do evento
    private Instant createdAt = Instant.now();
    private boolean published = false;
    private Instant publishedAt;
}

public interface OutboxRepository extends JpaRepository<OutboxEvent, String> {
    List<OutboxEvent> findByPublishedFalseOrderByCreatedAtAsc();
}

// ✅ Tudo em uma única transação
@Transactional
public OrderId placeOrder(PlaceOrderCommand cmd) {
    Order order = new Order(new CustomerId(cmd.customerId()), buildItems(cmd.items()));
    orderRepository.save(order);  // salva no banco

    // Salva o evento no outbox NA MESMA transação
    OutboxEvent outbox = new OutboxEvent(
        order.id().value(),
        "Order",
        "OrderPlaced",
        serialize(new OrderPlacedIntegrationEvent(
            order.id().value(),
            order.customerId().value(),
            order.total().amount(),
            Instant.now()
        ))
    );
    outboxRepository.save(outbox);

    return order.id();
    // Se cair aqui, nenhum dos dois foi salvo (rollback)
    // Se chegar aqui, os dois foram salvos de forma atômica
}

// WORKER — lê o outbox e publica no Kafka de forma confiável
@Component
public class OutboxPublisher {

    private final OutboxRepository outboxRepo;
    private final KafkaTemplate<String, String> kafka;

    @Scheduled(fixedDelay = 100) // roda a cada 100ms
    @Transactional
    public void processOutbox() {
        List<OutboxEvent> pending = outboxRepo.findByPublishedFalseOrderByCreatedAtAsc();

        for (OutboxEvent event : pending) {
            try {
                String topic = resolveTopicFor(event.getEventType());
                kafka.send(topic, event.getAggregateId(), event.getPayload()).get();

                event.markAsPublished();
                outboxRepo.save(event);
            } catch (Exception e) {
                log.error("Falha ao publicar evento {}: {}", event.getId(), e.getMessage());
                // não marca como publicado — será tentado na próxima iteração
            }
        }
    }
}

Saga Pattern — Transações Distribuídas

Uma Saga é uma sequência de transações locais onde cada etapa publica um evento. Se uma etapa falha, eventos de compensação desfazem as etapas anteriores.

Coreografia: cada serviço reage a eventos e publica os próximos — sem orquestrador central.

Orquestração: um orquestrador central comanda cada etapa — mais visibilidade mas mais acoplamento.

// ═══════ COREOGRAFIA — checkout ═══════
// Fluxo: OrderPlaced → InventoryReserved → PaymentCharged → OrderConfirmed
//         (compensação) InventoryFailed → OrderCancelled
//         (compensação) PaymentFailed → InventoryReleased → OrderCancelled

// ORDER SERVICE
@KafkaListener(topics = "checkout.initiated")
public void onCheckoutInitiated(CheckoutInitiatedEvent event) {
    Order order = new Order(event.customerId(), event.items());
    orderRepository.save(order);
    kafka.send("orders.placed", new OrderPlacedEvent(order.id(), event.items()));
}

@KafkaListener(topics = "payment.processed")
public void onPaymentProcessed(PaymentProcessedEvent event) {
    Order order = orderRepository.findById(event.orderId());
    order.confirm(event.paymentId());
    orderRepository.save(order);
    kafka.send("orders.confirmed", new OrderConfirmedEvent(order.id()));
}

@KafkaListener(topics = "payment.failed")
public void onPaymentFailed(PaymentFailedEvent event) {
    Order order = orderRepository.findById(event.orderId());
    order.cancel("Pagamento recusado: " + event.reason());
    orderRepository.save(order);
    // notifica cliente
}

// INVENTORY SERVICE
@KafkaListener(topics = "orders.placed")
public void onOrderPlaced(OrderPlacedEvent event) {
    try {
        inventory.reserveAll(event.orderId(), event.items());
        kafka.send("inventory.reserved", new InventoryReservedEvent(event.orderId()));
    } catch (OutOfStockException e) {
        kafka.send("inventory.failed", new InventoryFailedEvent(event.orderId(), e.getMessage()));
    }
}

@KafkaListener(topics = "payment.failed")
public void onPaymentFailed(PaymentFailedEvent event) {
    inventory.releaseAll(event.orderId());  // compensação
}

// PAYMENT SERVICE
@KafkaListener(topics = "inventory.reserved")
public void onInventoryReserved(InventoryReservedEvent event) {
    Order order = orderRepository.findById(event.orderId());
    try {
        PaymentResult result = paymentGateway.charge(order.total(), order.customerId());
        kafka.send("payment.processed", new PaymentProcessedEvent(event.orderId(), result.paymentId()));
    } catch (PaymentException e) {
        kafka.send("payment.failed", new PaymentFailedEvent(event.orderId(), e.getMessage()));
    }
}

// ═══════ ORQUESTRAÇÃO — Saga Orchestrator ═══════
@Component
public class CheckoutSagaOrchestrator {

    private final SagaStateRepository sagaRepo;
    private final KafkaTemplate<String, Object> kafka;

    // Inicia a saga
    public void startCheckout(CheckoutRequest request) {
        CheckoutSaga saga = new CheckoutSaga(request.orderId(), SagaStep.RESERVE_INVENTORY);
        sagaRepo.save(saga);

        kafka.send("inventory.reserve-command",
            new ReserveInventoryCommand(request.orderId(), request.items())
        );
    }

    @KafkaListener(topics = "inventory.reserved-event")
    public void onInventoryReserved(InventoryReservedEvent event) {
        CheckoutSaga saga = sagaRepo.findByOrderId(event.orderId());
        saga.advance(SagaStep.CHARGE_PAYMENT);
        sagaRepo.save(saga);

        kafka.send("payment.charge-command",
            new ChargePaymentCommand(event.orderId(), saga.total())
        );
    }

    @KafkaListener(topics = "payment.charged-event")
    public void onPaymentCharged(PaymentChargedEvent event) {
        CheckoutSaga saga = sagaRepo.findByOrderId(event.orderId());
        saga.complete();
        sagaRepo.save(saga);

        kafka.send("orders.confirm-command", new ConfirmOrderCommand(event.orderId()));
    }

    @KafkaListener(topics = "payment.failed-event")
    public void onPaymentFailed(PaymentFailedEvent event) {
        CheckoutSaga saga = sagaRepo.findByOrderId(event.orderId());
        saga.compensate();
        sagaRepo.save(saga);

        // Orquestrador comanda a compensação explicitamente
        kafka.send("inventory.release-command",
            new ReleaseInventoryCommand(event.orderId())
        );
        kafka.send("orders.cancel-command",
            new CancelOrderCommand(event.orderId(), "Pagamento falhou")
        );
    }
}

Idempotência — Idempotency Keys e Deduplicação

Em sistemas distribuídos, mensagens podem ser entregues mais de uma vez. Handlers devem ser idempotentes.

// ❌ Não-idempotente — duas entregas = dois débitos
@KafkaListener(topics = "payment.charge-command")
public void onChargePayment(ChargePaymentCommand cmd) {
    paymentAccount.debit(cmd.amount());  // débita a cada entrega
    kafka.send("payment.charged-event", new PaymentChargedEvent(cmd.orderId()));
}

// ✅ Idempotente com tabela de processados
@Entity
@Table(name = "processed_messages")
public class ProcessedMessage {
    @Id
    private String messageId;  // chave de idempotência (offset Kafka ou ID do evento)
    private String consumerId;
    private Instant processedAt;
}

@KafkaListener(topics = "payment.charge-command")
@Transactional
public void onChargePayment(
        ChargePaymentCommand cmd,
        @Header(KafkaHeaders.OFFSET) long offset,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {

    // Chave de idempotência: topic + partition + offset = único
    String messageId = "payment.charge-command:" + partition + ":" + offset;

    if (processedMessages.existsById(messageId)) {
        log.info("Mensagem {} já foi processada. Ignorando.", messageId);
        return;
    }

    // Processa
    paymentAccount.debit(cmd.amount());
    kafka.send("payment.charged-event", new PaymentChargedEvent(cmd.orderId()));

    // Registra como processada na mesma transação
    processedMessages.save(new ProcessedMessage(messageId, "payment-service", Instant.now()));
}

// ✅ Idempotência em API REST — Idempotency-Key header
@PostMapping("/api/payments")
public ResponseEntity<PaymentResponse> processPayment(
        @RequestBody PaymentRequest body,
        @RequestHeader("Idempotency-Key") String idempotencyKey) {

    // Verifica se já processou esta requisição
    Optional<PaymentResponse> cached = idempotencyCache.get(idempotencyKey);
    if (cached.isPresent()) {
        return ResponseEntity.ok(cached.get());  // retorna resultado anterior
    }

    PaymentResponse response = paymentService.process(body);

    // Armazena resultado para futuras chamadas com a mesma chave
    idempotencyCache.store(idempotencyKey, response, Duration.ofHours(24));

    return ResponseEntity.ok(response);
}

Dead Letter Queue — Configuração e Tratamento

Mensagens que falham repetidamente vão para a DLQ para análise e reprocessamento manual.

// SPRING BOOT — configuração de retry e DLT com @RetryableTopic
@Component
public class OrderEventConsumer {

    @RetryableTopic(
        attempts = "4",                    // 1 tentativa original + 3 retries
        backoff = @Backoff(
            delay = 1000,                  // 1s inicial
            multiplier = 2.0,              // duplica a cada retry: 1s, 2s, 4s
            maxDelay = 10000               // máximo 10s
        ),
        dltTopicSuffix = ".DLT",          // tópico de DLT: orders.placed.DLT
        include = {RetryableException.class}  // apenas erros transientes fazem retry
    )
    @KafkaListener(topics = "orders.placed")
    public void onOrderPlaced(OrderPlacedEvent event) {
        // Se lançar RetryableException, vai para retry topic
        // Se lançar outra exceção (DomainException), vai direto para DLT
        inventoryService.reserve(event.orderId(), event.items());
    }

    // Handler da Dead Letter Topic — para análise e reprocessamento manual
    @DltHandler
    public void handleDlt(
            OrderPlacedEvent event,
            @Header(KafkaHeaders.EXCEPTION_MESSAGE) String exceptionMessage) {

        log.error("Mensagem na DLT | orderId={} | erro={}",
            event.orderId(), exceptionMessage
        );

        // Salva para análise
        dltRecordRepository.save(new DltRecord(
            event.orderId(),
            "orders.placed",
            serialize(event),
            exceptionMessage,
            Instant.now()
        ));

        // Pode enviar alerta para Slack/PagerDuty
        alertService.sendDltAlert("orders.placed", event.orderId(), exceptionMessage);
    }
}

Exemplo Completo — Checkout Event Flow

Fluxo completo de um checkout com todos os conceitos integrados:

Cliente faz checkout


[OrderService] — salva Order + OutboxEvent (mesma TX)

        ▼ (OutboxPublisher lê e publica)
[Kafka: orders.placed]

        ├──→ [InventoryService] — reserva estoque
        │           │ OK → [Kafka: inventory.reserved]
        │           │ Falha → [Kafka: inventory.failed]

        │    [inventory.reserved]
        │           │
        │           ▼
        │    [PaymentService] — cobra cartão
        │           │ OK → [Kafka: payment.processed]
        │           │ Falha → [Kafka: payment.failed]

        │    [payment.processed]
        │           │
        │           ▼
        └──→ [OrderService] — confirma pedido


             [Kafka: orders.confirmed]

                    ├──→ [NotificationService] — e-mail de confirmação
                    └──→ [ShippingService] — gera etiqueta de envio

COMPENSAÇÕES:
  [inventory.failed] → [OrderService] cancela pedido
  [payment.failed] → [InventoryService] libera reserva
                   → [OrderService] cancela pedido