Observabilidade

Elastic Stack

Referência completa do Elastic Stack: Elasticsearch, Kibana, Logstash e Beats — desde conceitos até produção

Arquitetura do Elastic Stack (ELK)

┌──────────────────────────────────────────────────────────────────┐
│                        ELASTIC STACK                             │
│                                                                  │
│  ┌─────────┐  ┌─────────┐  ┌──────────┐  ┌──────────────────┐  │
│  │Filebeat │  │Metricbt │  │Packetbeat│  │   Logstash       │  │
│  │(logs)   │  │(metrics)│  │(network) │  │ (transformações) │  │
│  └────┬────┘  └────┬────┘  └────┬─────┘  └───────┬──────────┘  │
│       │            │            │                 │             │
│       └────────────┴────────────┴─────────────────┘             │
│                                │                                 │
│                                ▼                                 │
│                    ┌───────────────────────┐                     │
│                    │    Elasticsearch      │                     │
│                    │  (armazenamento +     │                     │
│                    │   busca + análise)    │                     │
│                    └───────────┬───────────┘                     │
│                                │                                 │
│                    ┌───────────▼───────────┐                     │
│                    │       Kibana          │                     │
│                    │  (visualização +      │                     │
│                    │   dashboards + APM)   │                     │
│                    └───────────────────────┘                     │
└──────────────────────────────────────────────────────────────────┘

Componentes

ComponenteFunçãoQuando usar
ElasticsearchArmazenamento, busca e análiseSempre — núcleo do stack
KibanaVisualização, dashboards, Dev ToolsSempre — interface web
LogstashPipeline ETL pesado (filtros complexos)Quando precisar de transformações sofisticadas
FilebeatColeta de logs de arquivos (leve)Coletar logs de arquivos e containers
MetricbeatColeta de métricas do sistemaMonitorar servidores e serviços
PacketbeatAnálise de tráfego de redeMonitorar protocolos de rede
HeartbeatMonitoramento de uptime (ping)Verificar disponibilidade de endpoints
APM ServerRastreamento de aplicaçãoTracing distribuído

Conceitos do Elasticsearch

Estrutura de Dados

Elasticsearch          Equivalente Relacional (analogia)
─────────────────      ──────────────────────────────────
Índice (Index)    →    Tabela (com schema flexível)
Documento (Doc)   →    Linha / Registro
Campo (Field)     →    Coluna
Mapping           →    Schema / DDL
Shard             →    Partição física do índice
Réplica           →    Cópia redundante de shard
Node              →    Servidor do cluster
Cluster           →    Conjunto de nodes

Shards e Réplicas

Índice "logs-2024.01" com 3 primary shards e 1 réplica:

Node 1          Node 2          Node 3
────────────    ────────────    ────────────
P0 (primary)    P1 (primary)    P2 (primary)
R1 (replica)    R2 (replica)    R0 (replica)

P = Primary shard  R = Replica shard

Regras práticas:

  • Número de primary shards é fixo na criação (não pode mudar depois)
  • Réplicas podem ser alteradas a qualquer momento
  • Shard ideal: entre 10 GB e 50 GB
  • Número de shards: total_data_size_gb / 30

Tipos de Nós (ES 8.x)

# Roles dos nós
master:      true   # elege master, gerencia cluster
data:        true   # armazena dados, executa queries
data_hot:    true   # dados quentes (ILM)
data_warm:   true   # dados mornos (ILM)
data_cold:   true   # dados frios (ILM)
ingest:      true   # pré-processa documentos antes de indexar
ml:          true   # machine learning
remote_cluster_client: true  # cross-cluster search

CRUD REST API

Indexar Documentos

# PUT — indexar com ID específico
PUT /produtos/_doc/1
{
  "nome": "Notebook Dell XPS",
  "preco": 8999.90,
  "categoria": "informatica",
  "estoque": 15,
  "created_at": "2024-01-15T10:00:00Z",
  "tags": ["notebook", "dell", "xps"]
}

# POST — indexar com ID gerado automaticamente
POST /produtos/_doc
{
  "nome": "Mouse Logitech MX",
  "preco": 459.90,
  "categoria": "perifericos"
}

# Resposta de indexação
{
  "_index": "produtos",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": { "total": 2, "successful": 1, "failed": 0 },
  "_seq_no": 0,
  "_primary_term": 1
}

# Indexação em lote (Bulk API — muito mais eficiente)
POST /_bulk
{ "index": { "_index": "produtos", "_id": "2" } }
{ "nome": "Teclado Mecânico", "preco": 350.00, "categoria": "perifericos" }
{ "index": { "_index": "produtos", "_id": "3" } }
{ "nome": "Monitor 4K", "preco": 2500.00, "categoria": "monitores" }
{ "delete": { "_index": "produtos", "_id": "99" } }
{ "update": { "_index": "produtos", "_id": "1" } }
{ "doc": { "estoque": 10 } }

Obter Documentos

# GET por ID
GET /produtos/_doc/1

# GET com campos específicos (source filtering)
GET /produtos/_doc/1?_source=nome,preco

# GET múltiplos (mget)
GET /_mget
{
  "docs": [
    { "_index": "produtos", "_id": "1" },
    { "_index": "produtos", "_id": "2", "_source": ["nome"] }
  ]
}

# Verificar se existe (HEAD — sem retornar o documento)
HEAD /produtos/_doc/1
# 200 = existe, 404 = não existe

Atualizar Documentos

# Update parcial (somente campos especificados)
POST /produtos/_update/1
{
  "doc": {
    "preco": 8499.90,
    "estoque": 12
  }
}

# Update com script (lógica mais complexa)
POST /produtos/_update/1
{
  "script": {
    "source": "ctx._source.estoque -= params.quantidade",
    "lang": "painless",
    "params": {
      "quantidade": 3
    }
  }
}

# Upsert — criar se não existir, atualizar se existir
POST /produtos/_update/99
{
  "doc": { "preco": 100 },
  "upsert": { "nome": "Produto Novo", "preco": 100 }
}

# Update by query — atualizar múltiplos documentos
POST /produtos/_update_by_query
{
  "query": {
    "term": { "categoria": "informatica" }
  },
  "script": {
    "source": "ctx._source.desconto = 0.1"
  }
}

Deletar Documentos

# DELETE por ID
DELETE /produtos/_doc/1

# Delete by query — deletar múltiplos
POST /produtos/_delete_by_query
{
  "query": {
    "range": {
      "created_at": {
        "lt": "now-1y"    # mais antigos que 1 ano
      }
    }
  }
}

# Deletar índice inteiro
DELETE /logs-2023.*
DELETE /produtos

# Deletar com wildcard
DELETE /logs-2023.01.*

Query DSL

Contexto: Query vs Filter

// Query context: calcula relevância (_score)
// Filter context: SIM/NÃO, sem score, cacheável

{
  "query": {
    "bool": {
      "must": [            // query context — afeta relevância
        { "match": { "title": "elasticsearch" } }
      ],
      "filter": [          // filter context — NÃO afeta relevância, mais rápido
        { "term": { "status": "published" } },
        { "range": { "date": { "gte": "2024-01-01" } } }
      ]
    }
  }
}

Match Query (full-text)

// match — busca full-text com análise (tokenização, stemming, etc.)
{
  "query": {
    "match": {
      "descricao": {
        "query": "notebook dell xps",
        "operator": "and",        // todos os termos devem estar presentes
        "fuzziness": "AUTO",      // permite erros de digitação
        "minimum_should_match": "75%"  // ou porcentagem mínima
      }
    }
  }
}

// match_phrase — frase exata, preserva ordem
{
  "query": {
    "match_phrase": {
      "descricao": {
        "query": "memória RAM DDR4",
        "slop": 2    // permite até 2 palavras entre os termos
      }
    }
  }
}

// multi_match — buscar em múltiplos campos
{
  "query": {
    "multi_match": {
      "query": "notebook performático",
      "fields": ["titulo^3", "descricao", "tags"],  // ^3 = boost no título
      "type": "best_fields",   // best_fields | most_fields | cross_fields | phrase
      "fuzziness": "AUTO"
    }
  }
}

Term e Terms Query (exact match)

// term — match exato (não passa pelo analyzer) — use para keyword fields
{
  "query": {
    "term": {
      "status.keyword": {
        "value": "ativo",
        "boost": 1.5
      }
    }
  }
}

// terms — match em lista de valores
{
  "query": {
    "terms": {
      "categoria.keyword": ["informatica", "perifericos", "monitores"],
      "boost": 1.0
    }
  }
}

// terms lookup — buscar valores de outro documento
{
  "query": {
    "terms": {
      "id_produto": {
        "index": "carrinho_usuario",
        "id": "usuario_123",
        "path": "produtos"
      }
    }
  }
}

Range Query

// Numérico
{
  "query": {
    "range": {
      "preco": {
        "gte": 100,       // >=
        "lte": 1000,      // <=
        "gt": 50,         // >
        "lt": 2000        // <
      }
    }
  }
}

// Data/hora
{
  "query": {
    "range": {
      "created_at": {
        "gte": "2024-01-01",
        "lte": "2024-12-31",
        "format": "yyyy-MM-dd",
        "time_zone": "America/Sao_Paulo"
      }
    }
  }
}

// Data relativa (math expressions)
{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "now-7d/d",    // 7 dias atrás, arredondado ao dia
        "lte": "now/d"        // hoje, arredondado ao dia
      }
    }
  }
}

Bool Query

// A query mais importante — combina múltiplas condições
{
  "query": {
    "bool": {
      // must: AND — todos devem ser verdadeiros, afeta _score
      "must": [
        { "match": { "nome": "notebook" } },
        { "term": { "disponivel": true } }
      ],

      // filter: AND — todos devem ser verdadeiros, NÃO afeta _score (mais rápido)
      "filter": [
        { "range": { "preco": { "lte": 5000 } } },
        { "term": { "categoria.keyword": "informatica" } }
      ],

      // should: OR — ao menos um deve ser verdadeiro (ou boost de score)
      // minimum_should_match controla quantos são obrigatórios
      "should": [
        { "term": { "marca.keyword": "Dell" } },
        { "term": { "marca.keyword": "Apple" } },
        { "match": { "tags": "premium" } }
      ],
      "minimum_should_match": 1,

      // must_not: AND NOT — não deve ser verdadeiro
      "must_not": [
        { "term": { "status.keyword": "descontinuado" } },
        { "range": { "estoque": { "lte": 0 } } }
      ],

      // Boost de relevância
      "boost": 1.5
    }
  }
}

Nested e Other Queries

// exists — campo tem valor (não null, não ausente)
{ "query": { "exists": { "field": "email" } } }

// wildcard — padrão com * e ?
{
  "query": {
    "wildcard": {
      "email": {
        "value": "*@empresa.com",
        "case_insensitive": true
      }
    }
  }
}

// fuzzy — tolerância a erros de digitação
{
  "query": {
    "fuzzy": {
      "nome": {
        "value": "elsticsearch",    // typo
        "fuzziness": 2,             // distância de edição máxima
        "prefix_length": 3          // primeiros 3 chars devem ser exatos
      }
    }
  }
}

// ids — buscar por IDs
{
  "query": {
    "ids": { "values": ["1", "2", "3"] }
  }
}

// nested — buscar em objetos aninhados
{
  "query": {
    "nested": {
      "path": "itens",
      "query": {
        "bool": {
          "must": [
            { "term": { "itens.produto_id": "123" } },
            { "range": { "itens.quantidade": { "gte": 2 } } }
          ]
        }
      },
      "inner_hits": {}    // retornar os nested docs que fizeram match
    }
  }
}

Sorting e Pagination

{
  "query": { "match_all": {} },

  // Ordenação
  "sort": [
    { "preco": { "order": "asc" } },
    { "_score": { "order": "desc" } },
    { "nome.keyword": "asc" },
    {
      "_geo_distance": {
        "localizacao": { "lat": -23.5505, "lon": -46.6333 },
        "order": "asc",
        "unit": "km"
      }
    }
  ],

  // Paginação clássica (evitar para offset > 10000)
  "from": 0,
  "size": 20,

  // Source filtering
  "_source": ["nome", "preco", "categoria"],
  // ou exclusão:
  "_source": { "excludes": ["descricao_longa", "html_content"] },

  // Highlighting
  "highlight": {
    "fields": {
      "nome": {},
      "descricao": {
        "pre_tags": ["<strong>"],
        "post_tags": ["</strong>"],
        "fragment_size": 150
      }
    }
  }
}

Search After (paginação profunda)

// Primeira página
{
  "size": 20,
  "sort": [
    { "created_at": "desc" },
    { "_id": "asc" }           // desempate consistente
  ]
}

// Próximas páginas — usar search_after com valores da última linha
{
  "size": 20,
  "sort": [
    { "created_at": "desc" },
    { "_id": "asc" }
  ],
  "search_after": ["2024-01-15T10:00:00Z", "abc123"]   // valores da última linha anterior
}

Aggregations

Bucket Aggregations

// terms — agrupar por valor de campo
{
  "aggs": {
    "por_categoria": {
      "terms": {
        "field": "categoria.keyword",
        "size": 10,             // top 10 categorias
        "order": { "_count": "desc" },
        "min_doc_count": 5      // só incluir se tem ao menos 5 docs
      }
    }
  }
}

// date_histogram — agrupar por intervalo de tempo
{
  "aggs": {
    "por_dia": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1d",    // day | week | month | quarter | year
        // ou fixed_interval para intervalos fixos:
        "fixed_interval": "6h",
        "time_zone": "America/Sao_Paulo",
        "format": "yyyy-MM-dd",
        "min_doc_count": 0,           // incluir buckets vazios
        "extended_bounds": {
          "min": "2024-01-01",
          "max": "2024-01-31"
        }
      }
    }
  }
}

// histogram — agrupar por intervalo numérico
{
  "aggs": {
    "por_faixa_preco": {
      "histogram": {
        "field": "preco",
        "interval": 500,     // buckets: 0-500, 500-1000, 1000-1500, ...
        "min_doc_count": 0
      }
    }
  }
}

// range — buckets com ranges específicos
{
  "aggs": {
    "faixas_preco": {
      "range": {
        "field": "preco",
        "ranges": [
          { "key": "barato", "to": 100 },
          { "key": "médio", "from": 100, "to": 500 },
          { "key": "premium", "from": 500 }
        ]
      }
    }
  }
}

// filter — bucket com condição específica
{
  "aggs": {
    "apenas_eletronicos": {
      "filter": {
        "term": { "categoria.keyword": "eletronicos" }
      },
      "aggs": {
        "preco_medio": { "avg": { "field": "preco" } }
      }
    }
  }
}

Metric Aggregations

{
  "aggs": {
    // Estatísticas básicas
    "preco_medio": { "avg": { "field": "preco" } },
    "total_vendas": { "sum": { "field": "valor_venda" } },
    "preco_min": { "min": { "field": "preco" } },
    "preco_max": { "max": { "field": "preco" } },
    "total_produtos": { "value_count": { "field": "preco" } },
    "produtos_unicos": { "cardinality": { "field": "produto_id" } },

    // Estatísticas em uma única agg
    "stats_preco": {
      "stats": { "field": "preco" }
      // retorna: count, min, max, avg, sum
    },

    // Estatísticas estendidas
    "stats_ext_preco": {
      "extended_stats": { "field": "preco" }
      // retorna: + sum_of_squares, variance, std_deviation, std_deviation_bounds
    },

    // Percentis
    "percentis_latencia": {
      "percentiles": {
        "field": "latencia_ms",
        "percents": [50, 75, 90, 95, 99, 99.9]
      }
    },

    // Percentil ranks (inverso — qual % de docs tem valor <= X)
    "rank_latencia": {
      "percentile_ranks": {
        "field": "latencia_ms",
        "values": [100, 200, 500]   // % de requests com latência <= 100ms, 200ms, 500ms
      }
    }
  }
}

Top Hits e Pipeline Aggregations

{
  "aggs": {
    "por_usuario": {
      "terms": { "field": "usuario_id" },
      "aggs": {
        // top_hits — documentos de exemplo de cada bucket
        "ultimo_acesso": {
          "top_hits": {
            "size": 1,
            "sort": [{ "@timestamp": "desc" }],
            "_source": ["@timestamp", "acao", "ip"]
          }
        }
      }
    },

    // Pipeline aggregations — operam em resultados de outras aggs
    "vendas_por_mes": {
      "date_histogram": {
        "field": "data_venda",
        "calendar_interval": "1M"
      },
      "aggs": {
        "total_mes": { "sum": { "field": "valor" } },
        // moving_avg — média móvel
        "media_movel": {
          "moving_avg": {
            "buckets_path": "total_mes",
            "window": 3
          }
        },
        // derivative — taxa de variação
        "variacao": {
          "derivative": { "buckets_path": "total_mes" }
        }
      }
    },

    // cumulative_sum — soma acumulada
    "acumulado": {
      "cumulative_sum": { "buckets_path": "vendas_por_mes>total_mes" }
    }
  }
}

Aggs Aninhadas (exemplo real: relatório)

// "Qual a receita total e ticket médio por categoria, nos últimos 30 dias?"
{
  "query": {
    "range": {
      "data_venda": { "gte": "now-30d" }
    }
  },
  "aggs": {
    "por_categoria": {
      "terms": {
        "field": "categoria.keyword",
        "size": 20
      },
      "aggs": {
        "receita_total": { "sum": { "field": "valor" } },
        "ticket_medio": { "avg": { "field": "valor" } },
        "total_pedidos": { "value_count": { "field": "pedido_id" } },
        "produtos_top": {
          "terms": {
            "field": "produto.keyword",
            "size": 5
          },
          "aggs": {
            "receita": { "sum": { "field": "valor" } }
          }
        }
      }
    }
  },
  "size": 0   // não retornar documentos, apenas aggs
}

Mappings

Tipos de Campo

PUT /produtos
{
  "mappings": {
    "properties": {
      // Text — full-text search (passa pelo analyzer)
      "descricao": {
        "type": "text",
        "analyzer": "portuguese",
        "fields": {
          "keyword": {
            "type": "keyword",    // versão não-analisada para sort/aggs
            "ignore_above": 256
          }
        }
      },

      // Keyword — exact match, sort, aggs (não passa pelo analyzer)
      "sku": { "type": "keyword" },
      "categoria": { "type": "keyword" },
      "status": { "type": "keyword" },

      // Numéricos
      "preco": { "type": "double" },
      "estoque": { "type": "integer" },
      "codigo": { "type": "long" },
      "desconto": { "type": "float" },

      // Boolean
      "disponivel": { "type": "boolean" },

      // Data/hora
      "created_at": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      },

      // Nested — array de objetos onde cada objeto é independente
      "avaliacoes": {
        "type": "nested",
        "properties": {
          "usuario": { "type": "keyword" },
          "nota": { "type": "integer" },
          "comentario": { "type": "text", "analyzer": "portuguese" },
          "data": { "type": "date" }
        }
      },

      // Geo point — coordenadas geográficas
      "localizacao": { "type": "geo_point" },

      // IP address
      "client_ip": { "type": "ip" },

      // Binary — base64 encoded
      "thumbnail": { "type": "binary" },

      // Dense vector — para busca semântica/kNN
      "embedding": {
        "type": "dense_vector",
        "dims": 1536,
        "index": true,
        "similarity": "cosine"
      }
    }
  }
}

Analyzers

PUT /artigos
{
  "settings": {
    "analysis": {
      "analyzer": {
        // Analyzer customizado para português
        "portuguese_custom": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": [
            "lowercase",
            "portuguese_stop",       // stopwords em português
            "portuguese_stemmer",    // radical das palavras
            "asciifolding"           // acentos → sem acento
          ]
        },
        // Analyzer para autocomplete
        "autocomplete": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase", "autocomplete_filter"]
        },
        "autocomplete_search": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase"]
        }
      },
      "filter": {
        "portuguese_stop": {
          "type": "stop",
          "stopwords": "_portuguese_"
        },
        "portuguese_stemmer": {
          "type": "stemmer",
          "language": "portuguese"
        },
        "autocomplete_filter": {
          "type": "edge_ngram",
          "min_gram": 2,
          "max_gram": 20
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "titulo": {
        "type": "text",
        "analyzer": "portuguese_custom"
      },
      "nome": {
        "type": "text",
        "analyzer": "autocomplete",
        "search_analyzer": "autocomplete_search"
      }
    }
  }
}

// Testar analyzer
POST /_analyze
{
  "analyzer": "portuguese",
  "text": "Elasticsearch é uma ferramenta poderosa para buscas"
}

Index Lifecycle Management (ILM)

// Criar política ILM
PUT /_ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      // HOT — dados ativos, muitas gravações e leituras
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_primary_shard_size": "50gb",
            "max_age": "1d"          // criar novo índice após 1 dia
          },
          "set_priority": { "priority": 100 }
        }
      },

      // WARM — dados menos acessados, otimizar para leitura
      "warm": {
        "min_age": "7d",              // após 7 dias no hot
        "actions": {
          "shrink": { "number_of_shards": 1 },  // reduzir shards
          "forcemerge": { "max_num_segments": 1 },  // otimizar segmentos
          "set_priority": { "priority": 50 }
        }
      },

      // COLD — dados raros, acesso eventual
      "cold": {
        "min_age": "30d",
        "actions": {
          "freeze": {},               // libera memória heap
          "set_priority": { "priority": 0 }
        }
      },

      // DELETE — expirar dados antigos
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {
            "delete_searchable_snapshot": true
          }
        }
      }
    }
  }
}

// Index template com ILM
PUT /_index_template/logs-template
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 2,
      "number_of_replicas": 1,
      "index.lifecycle.name": "logs-policy",
      "index.lifecycle.rollover_alias": "logs-current"
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "level": { "type": "keyword" },
        "message": { "type": "text" },
        "service": { "type": "keyword" }
      }
    }
  },
  "priority": 100,
  "data_stream": {}    // usar data streams (ES 7.9+)
}

Filebeat

Configuração Básica

# filebeat.yml

filebeat.inputs:
  # Arquivos de log simples
  - type: log
    enabled: true
    paths:
      - /var/log/app/*.log
      - /var/log/nginx/access.log
    fields:
      service: api-backend
      environment: production
    fields_under_root: true
    multiline:
      # Logs multi-linha (ex: Java stack traces)
      pattern: '^[[:space:]]'
      negate: false
      match: after
      max_lines: 100
      timeout: 5s

  # Logs de containers Docker
  - type: container
    paths:
      - /var/lib/docker/containers/*/*.log
    processors:
      - add_docker_metadata:
          host: "unix:///var/run/docker.sock"

  # Logs do Kubernetes
  - type: filestream
    id: kubernetes-pods
    paths:
      - /var/log/pods/*/*.log
    parsers:
      - container:
          stream: all
          format: auto
    prospector.scanner.symlinks: true
    processors:
      - add_kubernetes_metadata:
          host: ${NODE_NAME}
          matchers:
            - logs_path:
                logs_path: "/var/log/pods/"

# Processors globais
processors:
  # Remover campos desnecessários
  - drop_fields:
      fields: ["host.containerized", "host.os.codename"]
      ignore_missing: true

  # Renomear campos
  - rename:
      fields:
        - from: "log.level"
          to: "level"

  # Adicionar geo IP (requer GeoLite2 database)
  - geoip:
      input_field: client.ip
      target_field: client.geo

# Output para Elasticsearch diretamente
output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  username: "filebeat_user"
  password: "${ES_PASSWORD}"
  # Usar data streams
  index: "logs-%{[fields.service]}-%{+yyyy.MM.dd}"
  # ILM habilitado
  ilm.enabled: auto
  ilm.rollover_alias: "filebeat-logs"
  ilm.pattern: "{now/d}-000001"
  # Pipeline de ingestão
  pipeline: "filebeat-log-pipeline"

# Output para Logstash (recomendado para processamento pesado)
# output.logstash:
#   hosts: ["logstash:5044"]
#   loadbalance: true

# Monitoramento do Filebeat
monitoring.enabled: true
monitoring.elasticsearch:
  hosts: ["elasticsearch:9200"]

logging.level: warning
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7

Filebeat para Spring Boot (JSON logs)

filebeat.inputs:
  - type: log
    paths:
      - /var/log/spring-boot/*.json
    json.keys_under_root: true      # campos JSON no nível raiz
    json.add_error_key: true        # campo json.error se parse falhar
    json.message_key: message       # campo principal da linha de log
    fields:
      service: api-backend
    fields_under_root: true

Logstash

Pipeline Básico

# logstash.conf

# INPUT — como os dados chegam ao Logstash
input {
  # Receber do Filebeat
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
  }

  # Receber via TCP (logs simples)
  tcp {
    port => 5000
    codec => json_lines
  }

  # Ler de arquivo diretamente
  file {
    path => "/var/log/app/*.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"   # reprocessar sempre
  }
}

# FILTER — transformar e enriquecer os dados
filter {
  # Parsear logs com grok (regex com nomes)
  if [type] == "nginx" {
    grok {
      match => {
        "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:bytes_sent} "%{DATA:referrer}" "%{DATA:user_agent}"'
      }
      tag_on_failure => ["_grokparsefailure"]
    }
  }

  # Parsear JSON
  if [type] == "spring-boot" {
    json {
      source => "message"
      target => "log"
      remove_field => ["message"]
    }
  }

  # Converter timestamp
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "ISO8601"]
    target => "@timestamp"
    timezone => "America/Sao_Paulo"
    remove_field => ["timestamp"]
  }

  # Modificar campos
  mutate {
    # Renomear
    rename => { "response_code" => "http.response.status_code" }

    # Converter tipos
    convert => {
      "http.response.status_code" => "integer"
      "bytes_sent" => "integer"
    }

    # Adicionar campos
    add_field => {
      "environment" => "production"
      "datacenter" => "us-east-1"
    }

    # Remover campos
    remove_field => ["host", "beat", "input", "@version"]

    # Uppercase/lowercase
    uppercase => ["method"]
    lowercase => ["level"]
  }

  # GeoIP lookup
  geoip {
    source => "client_ip"
    target => "geoip"
    database => "/usr/share/logstash/GeoLite2-City.mmdb"
    fields => ["city_name", "country_code2", "location"]
  }

  # User agent parsing
  useragent {
    source => "user_agent"
    target => "ua"
  }

  # Condicional para enriquecer por tipo de log
  if [http.response.status_code] >= 400 {
    mutate {
      add_tag => ["error"]
    }
  }

  if [http.response.status_code] >= 500 {
    mutate {
      add_tag => ["critical"]
    }
  }

  # Drop de logs desnecessários
  if [request] =~ "/health" or [request] =~ "/metrics" {
    drop {}
  }

  # Fingerprint para deduplicação
  fingerprint {
    source => ["client_ip", "request", "@timestamp"]
    target => "[@metadata][fingerprint]"
    method => "SHA1"
  }
}

# OUTPUT — destino dos dados processados
output {
  # Elasticsearch
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    user => "logstash_writer"
    password => "${ES_PASSWORD}"

    # Nome do índice dinâmico
    index => "logs-%{[service]}-%{+YYYY.MM.dd}"

    # Usar o fingerprint como ID (deduplicação)
    document_id => "%{[@metadata][fingerprint]}"

    # ILM
    ilm_enabled => true
    ilm_rollover_alias => "logs-nginx"
    ilm_pattern => "{now/d}-000001"
    ilm_policy => "logs-policy"

    # Pipeline de ingestão no ES
    pipeline => "logs-enrichment"

    # Retry
    retry_max_interval => 64
    retry_initial_interval => 2
  }

  # Debug (apenas desenvolvimento)
  if "debug" in [tags] {
    stdout { codec => rubydebug }
  }
}

Múltiplos Pipelines

# config/pipelines.yml
- pipeline.id: nginx-logs
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 2
  pipeline.batch.size: 500

- pipeline.id: app-logs
  path.config: "/etc/logstash/conf.d/app.conf"
  pipeline.workers: 4
  pipeline.batch.size: 1000
  queue.type: persisted    # fila persistente no disco
  queue.max_bytes: 1gb

Kibana

Dev Tools / Console

# Atalhos úteis no Dev Tools
Ctrl+Enter       # executar query selecionada
Ctrl+/           # comentar/descomentar linha
Ctrl+Space       # autocomplete

# Formato geral das queries no console
METODO /endpoint
{
  "corpo": "da query"
}
// Verificar saúde do cluster
GET /_cluster/health?pretty

// Informações do cluster
GET /_cluster/stats

// Listar índices (com tamanho e contagem)
GET /_cat/indices?v&s=index

// Informações de shards
GET /_cat/shards?v

// Aliases
GET /_cat/aliases?v

// Nodes
GET /_cat/nodes?v&h=name,ip,heap.percent,ram.percent,cpu,load_1m,disk.used_percent

// Verificar mapeamento de índice
GET /produtos/_mapping

// Verificar configurações de índice
GET /produtos/_settings

// Estatísticas de um índice
GET /produtos/_stats

// Explicar por que um documento não aparece em uma query
GET /produtos/_explain/1
{
  "query": {
    "term": { "categoria.keyword": "informatica" }
  }
}

Discover (Exploração de Logs)

Data View (Index Pattern):

  • Criar em: Management → Stack Management → Data Views
  • Pattern: logs-* para múltiplos índices
  • Time field: @timestamp

KQL — Kibana Query Language:

# Exemplos de KQL
level: ERROR
level: ERROR AND service: api-backend
response.status_code >= 500
message: "connection refused"
request: /api/*
tags: (error OR critical)
NOT tags: healthcheck
@timestamp >= "2024-01-15"

# Com wildcards
service: api-*
message: "timeout*"

ECS — Elastic Common Schema

ECS padroniza os nomes dos campos entre diferentes tipos de dados.

// Campos ECS obrigatórios/recomendados
{
  "@timestamp": "2024-01-15T14:30:00.000Z",  // timestamp do evento

  "ecs": { "version": "8.11.0" },

  "event": {
    "kind": "event",           // event | alert | metric | state
    "category": ["web"],       // web | database | network | process | ...
    "type": ["access"],        // access | error | info | creation | ...
    "outcome": "success",      // success | failure | unknown
    "duration": 45000000,      // nanosegundos
    "module": "nginx",
    "dataset": "nginx.access"
  },

  "host": {
    "name": "api-server-01",
    "hostname": "api-server-01.prod",
    "ip": ["10.0.1.10"],
    "os": { "name": "Ubuntu", "version": "22.04" }
  },

  "service": {
    "name": "api-backend",
    "version": "1.8.13",
    "environment": "production",
    "type": "http"
  },

  "http": {
    "request": {
      "method": "GET",
      "url": { "path": "/api/users", "full": "https://api.empresa.com/api/users" },
      "bytes": 256
    },
    "response": {
      "status_code": 200,
      "bytes": 4096
    }
  },

  "client": {
    "ip": "200.100.50.25",
    "geo": {
      "city_name": "São Paulo",
      "country_iso_code": "BR",
      "location": { "lat": -23.5505, "lon": -46.6333 }
    }
  },

  "user": {
    "id": "user_123",
    "name": "joao.silva",
    "email": "joao@empresa.com",
    "roles": ["admin"]
  },

  "trace": { "id": "abc123def456" },
  "transaction": { "id": "789xyz" },
  "span": { "id": "span123" },

  "log": {
    "level": "info",
    "logger": "com.empresa.api.UserController"
  },

  "error": {
    "type": "java.lang.NullPointerException",
    "message": "Cannot invoke method on null object",
    "stack_trace": "java.lang.NullPointerException\n\tat com.empresa..."
  }
}

Segurança

# elasticsearch.yml — habilitar segurança (ES 8.x por padrão)
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: http.p12
# Gerar certificados
./bin/elasticsearch-certutil ca
./bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12

# Configurar senha do usuário elastic
./bin/elasticsearch-setup-passwords interactive

# Criar usuário com roles
POST /_security/user/grafana_reader
{
  "password": "senha-segura-aqui",
  "roles": ["monitoring_user", "kibana_user"],
  "full_name": "Grafana Reader",
  "email": "grafana@empresa.com"
}

# Criar role customizada
POST /_security/role/app_reader
{
  "indices": [
    {
      "names": ["logs-*", "metrics-*"],
      "privileges": ["read", "view_index_metadata"],
      "query": {
        "term": { "environment": "production" }   // filtro por documento
      },
      "field_security": {
        "grant": ["@timestamp", "level", "message", "service"],
        "except": ["user.password"]               // nunca retornar esses campos
      }
    }
  ],
  "cluster": ["monitor"]
}

# Criar API key
POST /_security/api_key
{
  "name": "filebeat-key",
  "role_descriptors": {
    "filebeat_writer": {
      "indices": [
        {
          "names": ["logs-*"],
          "privileges": ["create_index", "create", "write"]
        }
      ]
    }
  },
  "expiration": "365d"
}

Elastic com Spring Boot

<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
# application.yml
spring:
  elasticsearch:
    uris: https://elasticsearch:9200
    username: app_user
    password: ${ES_PASSWORD}
    socket-timeout: 10s
    connection-timeout: 5s
// Entidade mapeada para Elasticsearch
@Document(indexName = "produtos", createIndex = true)
@Setting(settingPath = "elasticsearch/settings.json")
@Mapping(mappingPath = "elasticsearch/mappings.json")
public class Produto {

    @Id
    private String id;

    @Field(type = FieldType.Text, analyzer = "portuguese", searchAnalyzer = "portuguese")
    private String nome;

    @MultiField(
        mainField = @Field(type = FieldType.Text, analyzer = "portuguese"),
        otherFields = @InnerField(suffix = "keyword", type = FieldType.Keyword)
    )
    private String categoria;

    @Field(type = FieldType.Double)
    private BigDecimal preco;

    @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second)
    private LocalDateTime criadoEm;

    @Field(type = FieldType.Nested)
    private List<Avaliacao> avaliacoes;

    @Field(type = FieldType.Keyword, index = false)
    private List<String> tags;
}

// Repository com queries customizadas
public interface ProdutoRepository extends ElasticsearchRepository<Produto, String> {

    // Query derivada
    List<Produto> findByCategoria(String categoria);
    List<Produto> findByPrecoBetween(BigDecimal min, BigDecimal max);

    // Query com @Query (Query DSL)
    @Query("""
        {
          "bool": {
            "must": [
              { "match": { "nome": "?0" } }
            ],
            "filter": [
              { "term": { "categoria.keyword": "?1" } },
              { "range": { "preco": { "lte": ?2 } } }
            ]
          }
        }
        """)
    List<Produto> buscarPorNomeCategoriaEPrecoMaximo(String nome, String categoria, BigDecimal precoMax);
}

// Uso do ElasticsearchOperations para queries mais complexas
@Service
public class ProdutoSearchService {

    private final ElasticsearchOperations operations;

    public SearchPage<Produto> buscarComFacetas(String termo, Pageable pageable) {
        Query query = NativeQuery.builder()
            .withQuery(q -> q
                .bool(b -> b
                    .must(m -> m
                        .multiMatch(mm -> mm
                            .query(termo)
                            .fields("nome^3", "descricao", "tags")
                            .fuzziness(Fuzziness.AUTO)
                        )
                    )
                    .filter(f -> f
                        .term(t -> t
                            .field("disponivel")
                            .value(true)
                        )
                    )
                )
            )
            .withAggregation("por_categoria",
                Aggregation.of(a -> a
                    .terms(t -> t.field("categoria.keyword").size(10))
                )
            )
            .withPageable(pageable)
            .withHighlightQuery(
                HighlightQuery.of(hq -> hq
                    .withHighlightFields(Map.of(
                        "nome", new HighlightFieldParameters.HighlightFieldParametersBuilder().build(),
                        "descricao", new HighlightFieldParameters.HighlightFieldParametersBuilder()
                            .withFragmentSize(200).build()
                    ))
                ),
                Produto.class
            )
            .build();

        return operations.searchForPage(query, Produto.class);
    }
}

Elastic com Python

# pip install elasticsearch==8.11.0

from elasticsearch import Elasticsearch, helpers
from datetime import datetime
import json

# Conectar
es = Elasticsearch(
    ["https://elasticsearch:9200"],
    basic_auth=("user", "password"),
    verify_certs=True,
    ca_certs="/path/to/ca.crt",
    # Ou com API key:
    # api_key=("key_id", "api_key_value")
)

# Verificar conexão
print(es.info())

# Indexar documento
doc = {
    "produto": "Notebook Dell XPS",
    "preco": 8999.90,
    "disponivel": True,
    "@timestamp": datetime.now()
}
es.index(index="produtos", id="1", document=doc)

# Buscar documentos
resp = es.search(
    index="logs-*",
    body={
        "query": {
            "bool": {
                "must": [
                    {"match": {"message": "ERROR"}}
                ],
                "filter": [
                    {"term": {"service.keyword": "api-backend"}},
                    {"range": {"@timestamp": {"gte": "now-1h"}}}
                ]
            }
        },
        "aggs": {
            "por_nivel": {
                "terms": {"field": "level.keyword", "size": 10}
            }
        },
        "size": 100,
        "sort": [{"@timestamp": "desc"}]
    }
)

# Iterar hits
for hit in resp["hits"]["hits"]:
    print(hit["_source"])

# Total de resultados
print(f"Total: {resp['hits']['total']['value']}")

# Acessar aggregations
for bucket in resp["aggregations"]["por_nivel"]["buckets"]:
    print(f"{bucket['key']}: {bucket['doc_count']}")

# Bulk indexing (muito mais eficiente para grandes volumes)
def gerar_documentos(dados):
    for item in dados:
        yield {
            "_index": f"logs-{datetime.now().strftime('%Y.%m.%d')}",
            "_source": {
                "@timestamp": datetime.now().isoformat(),
                **item
            }
        }

dados = [
    {"level": "INFO", "message": "Request processada", "service": "api"},
    {"level": "ERROR", "message": "Timeout na query", "service": "api"},
]

success, errors = helpers.bulk(
    es,
    gerar_documentos(dados),
    chunk_size=1000,      # enviar em lotes de 1000
    request_timeout=30
)
print(f"Indexados: {success}, Erros: {len(errors)}")

# Scroll para grandes volumes de dados
def scroll_all(es, index, query, page_size=1000):
    resp = es.search(
        index=index,
        body=query,
        scroll="2m",      # manter contexto por 2 minutos
        size=page_size
    )
    scroll_id = resp["_scroll_id"]

    while True:
        hits = resp["hits"]["hits"]
        if not hits:
            break
        yield from hits

        resp = es.scroll(scroll_id=scroll_id, scroll="2m")

    es.clear_scroll(scroll_id=scroll_id)

# Usar scroll
for doc in scroll_all(es, "logs-*", {"query": {"match_all": {}}}, page_size=500):
    process(doc["_source"])

Referência Rápida

Comandos _cat API

# Saúde do cluster
GET /_cat/health?v

# Índices
GET /_cat/indices?v&s=store.size:desc

# Shards com detalhes
GET /_cat/shards?v&h=index,shard,prirep,state,docs,store,node

# Nodes
GET /_cat/nodes?v&h=name,ip,cpu,heap.percent,disk.used_percent

# Templates
GET /_cat/templates?v

# Aliases
GET /_cat/aliases?v

# Tarefas em execução
GET /_cat/tasks?v

# Segmentos (para debug de performance)
GET /_cat/segments?v

Operações de Cluster

# Alterar número de réplicas de um índice
PUT /meu-indice/_settings
{
  "number_of_replicas": 0
}

# Refresh manual (tornar documentos pesquisáveis imediatamente)
POST /meu-indice/_refresh

# Flush (forçar escrita no disco)
POST /meu-indice/_flush

# Force merge (reduzir segmentos — use com cuidado em produção)
POST /meu-indice/_forcemerge?max_num_segments=1

# Reindex (copiar dados entre índices)
POST /_reindex
{
  "source": {
    "index": "logs-old",
    "query": { "range": { "@timestamp": { "gte": "2024-01-01" } } }
  },
  "dest": { "index": "logs-new" }
}

# Aliases — swap atômico (zero downtime)
POST /_aliases
{
  "actions": [
    { "remove": { "index": "logs-v1", "alias": "logs" } },
    { "add": { "index": "logs-v2", "alias": "logs" } }
  ]
}

Métricas de Performance

# Verificar slow queries (threshold: 2s)
PUT /meu-indice/_settings
{
  "index.search.slowlog.threshold.query.warn": "2s",
  "index.search.slowlog.threshold.query.info": "1s",
  "index.indexing.slowlog.threshold.index.warn": "5s"
}

# Profile de query (para otimização)
GET /produtos/_search
{
  "profile": true,
  "query": { "match": { "nome": "notebook" } }
}

# Thread pool status
GET /_nodes/stats/thread_pool

# Hot threads (debug de CPU alta)
GET /_nodes/hot_threads