Data

Apache Spark

Referência completa de Apache Spark — RDD, DataFrame, Spark SQL, streaming, otimização, PySpark e integração com Delta Lake

Fundamentos Spark

Arquitetura: Driver (orquestra), Executors (processam dados), Cluster Manager (YARN/k8s/Standalone).

ConceitoDescrição
DriverJVM principal; mantém SparkContext, envia tasks
ExecutorProcesso em cada worker node; memória + CPU
SparkContextPonto de entrada legado (RDD)
SparkSessionPonto de entrada moderno (DataFrame, SQL)

Modos de execução

# Local — tudo no mesmo processo
spark = SparkSession.builder.master("local[*]").getOrCreate()

# YARN
spark-submit --master yarn --deploy-mode cluster app.py

# Kubernetes
spark-submit --master k8s://https://<api-server>:443 \
  --deploy-mode cluster app.py

Lazy Evaluation: transformações constroem um DAG; nada executa até uma ação ser chamada.

DAG → Stage 1 (map, filter) → shuffle boundary → Stage 2 (reduce) → Action

RDD vs DataFrame vs Dataset

RDDDataFrameDataset
LinguagemJava/Scala/PythonTodasJava/Scala
SchemaNãoSimSim
Otimizador CatalystNãoSimSim
Type-safeNão (Python)NãoSim
Uso idealcontrole fino, ML customETL, SQLScala/Java tipado

Jobs / Stages / Tasks: 1 ação = 1 job → stages separadas por shuffles → 1 task por partição por stage.

Partições: unidade de paralelismo. repartition(n) → shuffle completo. coalesce(n) → sem shuffle (só reduz).


SparkSession e Configuração

from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf() \
    .setAppName("MyApp") \
    .set("spark.executor.memory", "4g") \
    .set("spark.executor.cores", "2") \
    .set("spark.sql.shuffle.partitions", "200")

spark = SparkSession.builder \
    .master("local[*]") \
    .config(conf=conf) \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .enableHiveSupport() \
    .getOrCreate()

# Ajustar em runtime
spark.conf.set("spark.sql.shuffle.partitions", "100")

# Acessar configuração
print(spark.conf.get("spark.executor.memory"))

sc = spark.sparkContext  # acesso ao SparkContext

Parâmetros críticos

ParâmetroValor típicoDescrição
executor.memory4–16gHeap do executor
executor.cores2–5Threads por executor
shuffle.partitions200 (padrão)Partições após shuffle
driver.memory2–4gHeap do driver
memory.fraction0.6% heap para Spark
sql.adaptive.enabledtrueAtiva AQE

RDD

sc = spark.sparkContext

# Criação
rdd_mem = sc.parallelize([1, 2, 3, 4, 5], numSlices=4)
rdd_file = sc.textFile("hdfs:///data/orders.csv")

# Transformações (lazy)
mapped    = rdd_mem.map(lambda x: x * 2)
filtered  = rdd_mem.filter(lambda x: x > 2)
flat      = sc.parallelize(["a b", "c d"]).flatMap(lambda x: x.split())
pairs     = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
reduced   = pairs.reduceByKey(lambda a, b: a + b)   # ("a",4), ("b",2)
grouped   = pairs.groupByKey()                        # evitar em produção
sorted_r  = pairs.sortByKey(ascending=True)
joined    = pairs.join(sc.parallelize([("a", 10)]))  # inner join por chave

# Ações (executam o DAG)
result    = mapped.collect()
count     = rdd_mem.count()
top3      = rdd_mem.take(3)
rdd_file.saveAsTextFile("hdfs:///output/orders")

# Cache e Persist
from pyspark import StorageLevel
rdd_mem.cache()                                       # MEMORY_ONLY
rdd_mem.persist(StorageLevel.MEMORY_AND_DISK)
rdd_mem.unpersist()

# Broadcast variables — enviar dado grande para todos executors sem re-serializar
lookup = sc.broadcast({"NY": "New York", "CA": "California"})
rdd_mem.map(lambda x: lookup.value.get(x, "Unknown"))

# Accumulators — contador/somador thread-safe no driver
counter = sc.accumulator(0)
def count_errors(line):
    global counter
    if "ERROR" in line:
        counter += 1
rdd_file.foreach(count_errors)
print(counter.value)

Quando usar RDD vs DataFrame

Use RDD quando…Use DataFrame quando…
Dados não estruturados (texto, binário)Dados tabulares
Controle preciso de particionamentoETL, análise, SQL
Algoritmos ML customizadosInterop com Spark SQL
API Python pura com objetos complexosOtimização automática necessária

DataFrame — Criação e Leitura

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Schema explícito
schema = StructType([
    StructField("order_id",  IntegerType(), nullable=False),
    StructField("customer",  StringType(),  nullable=True),
    StructField("amount",    DoubleType(),  nullable=True),
    StructField("ts",        TimestampType(), nullable=True),
])

# De lista Python
data = [(1, "Alice", 99.9), (2, "Bob", 150.0)]
df = spark.createDataFrame(data, schema=["order_id", "customer", "amount"])

# CSV
df_csv = spark.read \
    .option("header", "true") \
    .option("sep", ",") \
    .option("mode", "DROPMALFORMED") \
    .option("inferSchema", "true") \
    .csv("s3a://bucket/orders/*.csv")

# CSV com schema explícito (mais eficiente)
df_csv = spark.read.schema(schema).csv("s3a://bucket/orders/")

# JSON
df_json = spark.read.option("multiLine", True).json("data/events/*.json")

# Parquet (schema embutido, mais rápido)
df_parquet = spark.read.parquet("data/warehouse/orders/")

# JDBC
df_jdbc = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "orders") \
    .option("user", "usr") \
    .option("password", "pwd") \
    .option("numPartitions", "10") \
    .option("partitionColumn", "order_id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .load()

# Inspecionar
df.printSchema()
df.show(5, truncate=False)
df.dtypes          # lista de (nome, tipo)
df.count()

DataFrame — Transformações

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# select e alias
df.select("order_id", F.col("amount").alias("valor"))

# filter / where (equivalentes)
df.filter(F.col("amount") > 100)
df.where("amount > 100 AND customer IS NOT NULL")

# withColumn / withColumnRenamed / drop
df = df.withColumn("amount_brl", F.col("amount") * 5.0)
df = df.withColumnRenamed("customer", "cliente")
df = df.drop("coluna_inutil")

# cast e lit
df = df.withColumn("amount", F.col("amount").cast(DoubleType()))
df = df.withColumn("source", F.lit("orders_v2"))

# when / otherwise
df = df.withColumn("tier",
    F.when(F.col("amount") >= 500, "gold")
     .when(F.col("amount") >= 100, "silver")
     .otherwise("bronze")
)

# expr() — SQL inline
df = df.withColumn("year", F.expr("year(ts)"))
df = df.selectExpr("order_id", "amount * 1.1 as amount_with_tax")

# Null handling
df.filter(F.col("amount").isNull())
df.filter(F.col("amount").isNotNull())
df2 = df.fillna({"amount": 0.0, "customer": "unknown"})
df3 = df.dropna(subset=["order_id", "amount"])
df4 = df.withColumn("amt", F.coalesce(F.col("amount"), F.lit(0.0)))

# UDF Python (serialização Python — mais lento)
from pyspark.sql.functions import udf
@udf(returnType=DoubleType())
def apply_discount(amount: float) -> float:
    return amount * 0.9 if amount and amount > 200 else amount

df = df.withColumn("discounted", apply_discount(F.col("amount")))

# Pandas UDF / Vectorized UDF (Arrow — muito mais rápido)
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(DoubleType())
def apply_discount_vec(s: pd.Series) -> pd.Series:
    return s.where(s <= 200, s * 0.9)

df = df.withColumn("discounted", apply_discount_vec(F.col("amount")))

Aggregações e GroupBy

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# groupBy + agg
df_agg = df.groupBy("customer").agg(
    F.sum("amount").alias("total"),
    F.count("order_id").alias("num_orders"),
    F.avg("amount").alias("avg_amount"),
    F.countDistinct("product_id").alias("unique_products"),
    F.collect_list("order_id").alias("order_ids"),
    F.collect_set("status").alias("statuses"),
    F.max("ts").alias("last_order"),
)

# pivot — clientes por status de pedido
df.groupBy("customer").pivot("status", ["pending", "done", "cancelled"]) \
  .agg(F.count("order_id")).fillna(0)

# cube e rollup
df.cube("customer", "product_id").agg(F.sum("amount"))
df.rollup("customer", "product_id").agg(F.sum("amount"))

# Window Functions
w_customer = Window.partitionBy("customer").orderBy(F.col("ts").asc())
w_all_time  = Window.partitionBy("customer").orderBy("ts") \
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df = df.withColumn("row_num",   F.row_number().over(w_customer))
df = df.withColumn("rank",      F.rank().over(w_customer))
df = df.withColumn("dense_rk",  F.dense_rank().over(w_customer))
df = df.withColumn("prev_amt",  F.lag("amount", 1).over(w_customer))
df = df.withColumn("next_amt",  F.lead("amount", 1).over(w_customer))
df = df.withColumn("run_total", F.sum("amount").over(w_all_time))

# Percentil como window (approx)
df = df.withColumn("pct50", F.percentile_approx("amount", 0.5).over(
    Window.partitionBy("customer")
))

Joins

orders   = spark.read.parquet("data/orders/")
products = spark.read.parquet("data/products/")
events   = spark.read.parquet("data/events/")

# Tipos de join
orders.join(products, "product_id", "inner")
orders.join(products, "product_id", "left")
orders.join(products, "product_id", "right")
orders.join(products, "product_id", "full")
orders.crossJoin(products)                        # produto cartesiano

# Semi e Anti join
orders.join(products, "product_id", "left_semi")  # apenas ordens com produto
orders.join(products, "product_id", "left_anti")  # apenas ordens sem produto

# Múltiplas colunas
orders.join(products,
    (orders.product_id == products.id) & (orders.region == products.region),
    "inner"
)

# Broadcast hint — forçar broadcast de tabela pequena
from pyspark.sql.functions import broadcast
orders.join(broadcast(products), "product_id", "inner")

# Spark faz broadcast automático se tabela < spark.sql.autoBroadcastJoinThreshold (10MB padrão)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))  # 50MB

# Sort-Merge Join (padrão para tabelas grandes — eficiente com shuffle)
# Verificar com explain()
orders.join(products, "product_id").explain(mode="formatted")

# Skew Join — AQE automático
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Skew Join manual — salting
import pyspark.sql.functions as F
salt_factor = 10
orders_s = orders.withColumn("salt", (F.rand() * salt_factor).cast("int")) \
                 .withColumn("key_salted", F.concat_ws("_", "product_id", "salt"))
products_s = products.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(salt_factor)]))) \
                      .withColumn("key_salted", F.concat_ws("_", "id", "salt"))
orders_s.join(products_s, "key_salted", "inner")

Spark SQL

# Registrar view e consultar
orders.createOrReplaceTempView("orders")
products.createOrReplaceTempView("products")

result = spark.sql("""
    SELECT
        o.customer,
        p.category,
        SUM(o.amount) AS total,
        COUNT(*)      AS num_orders
    FROM orders o
    JOIN products p ON o.product_id = p.id
    WHERE o.status = 'done'
      AND o.ts >= DATE_TRUNC('month', CURRENT_DATE)
    GROUP BY 1, 2
    ORDER BY total DESC
""")

# CTE
spark.sql("""
    WITH monthly AS (
        SELECT customer, DATE_FORMAT(ts, 'yyyy-MM') AS month, SUM(amount) AS total
        FROM orders
        GROUP BY 1, 2
    ),
    ranked AS (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY customer ORDER BY total DESC) AS rn
        FROM monthly
    )
    SELECT * FROM ranked WHERE rn = 1
""")

# LATERAL VIEW EXPLODE
spark.sql("""
    SELECT order_id, tag
    FROM orders
    LATERAL VIEW EXPLODE(tags) t AS tag
""")

# QUALIFY (filtra após window function — Spark 3.4+)
spark.sql("""
    SELECT *
    FROM orders
    QUALIFY ROW_NUMBER() OVER (PARTITION BY customer ORDER BY ts DESC) = 1
""")

# CASE WHEN
spark.sql("""
    SELECT order_id,
           CASE WHEN amount >= 500 THEN 'gold'
                WHEN amount >= 100 THEN 'silver'
                ELSE 'bronze' END AS tier
    FROM orders
""")

# Funções de data
spark.sql("""
    SELECT
        DATE_TRUNC('week', ts)         AS week_start,
        DATE_FORMAT(ts, 'yyyy-MM-dd')  AS date_str,
        REGEXP_REPLACE(customer, '\\s+', '_') AS customer_slug
    FROM orders
""")

Funções Built-in

from pyspark.sql import functions as F

# --- Strings ---
df.withColumn("full", F.concat(F.col("first"), F.lit(" "), F.col("last")))
df.withColumn("parts", F.split(F.col("tags"), ","))
df.withColumn("clean", F.regexp_replace(F.col("email"), r"\s+", ""))
df.withColumn("name", F.trim(F.col("name")))
df.withColumn("sub", F.substring(F.col("code"), 1, 3))
df.withColumn("upper", F.upper(F.col("name")))
df.withColumn("len", F.length(F.col("name")))

# --- Datas ---
df.withColumn("date",    F.to_date(F.col("date_str"), "yyyy-MM-dd"))
df.withColumn("ts",      F.to_timestamp(F.col("ts_str"), "yyyy-MM-dd HH:mm:ss"))
df.withColumn("d7",      F.date_add(F.col("date"), 7))
df.withColumn("diff",    F.datediff(F.col("end_date"), F.col("start_date")))
df.withColumn("fmt",     F.date_format(F.col("ts"), "yyyy/MM/dd"))
df.withColumn("yr",      F.year(F.col("ts")))
df.withColumn("mo",      F.month(F.col("ts")))
df.withColumn("dy",      F.dayofmonth(F.col("ts")))
df.withColumn("trunc",   F.date_trunc("month", F.col("ts")))

# --- Arrays ---
df.withColumn("exploded", F.explode(F.col("tags")))           # 1 row per element
df.withColumn("outer",    F.explode_outer(F.col("tags")))     # mantém nulls
df.withColumn("has_vip",  F.array_contains(F.col("tags"), "vip"))
df.withColumn("sz",       F.size(F.col("tags")))
df.withColumn("flat",     F.flatten(F.col("nested_arrays")))
# transform (Spark 3.1+)
df.withColumn("upper_tags", F.transform(F.col("tags"), lambda x: F.upper(x)))
df.withColumn("long_tags",  F.filter(F.col("tags"), lambda x: F.length(x) > 3))
df.withColumn("total",      F.aggregate(F.col("amounts"), F.lit(0.0),
                                        lambda acc, x: acc + x))

# --- JSON ---
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
payload_schema = StructType([
    StructField("product_id", StringType()),
    StructField("price", DoubleType()),
])
df.withColumn("parsed",   F.from_json(F.col("payload"), payload_schema))
df.withColumn("json_str", F.to_json(F.struct("order_id", "amount")))
df.withColumn("pid",      F.get_json_object(F.col("payload"), "$.product_id"))

Leitura e Escrita

# Parquet (compressão recomendada)
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \   # ou "zstd" (melhor ratio)
    .parquet("s3a://bucket/orders/")

# Particionado por coluna
df.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("s3a://bucket/orders_partitioned/")

# Bucketing (para joins frequentes — requer tabela Hive)
df.write \
    .mode("overwrite") \
    .bucketBy(64, "customer_id") \
    .sortBy("customer_id") \
    .saveAsTable("orders_bucketed")

# ORC
df.write.mode("append").orc("data/orc/orders/")

# CSV
df.write.mode("overwrite").option("header", True).csv("output/orders_csv/")

# JSON
df.write.mode("overwrite").json("output/orders_json/")

# JDBC
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "orders_staging") \
    .option("user", "usr").option("password", "pwd") \
    .mode("append").save()

# repartition vs coalesce
df.repartition(50).write.parquet("...")           # shuffle, redistribuição uniforme
df.coalesce(10).write.parquet("...")              # sem shuffle, só reduz partições
df.repartition(50, "customer_id").write.parquet("...")  # partições por coluna (hash)

Structured Streaming

# Source — Kafka
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "orders-topic") \
    .option("startingOffsets", "earliest") \
    .load()

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql import functions as F

order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("amount",   DoubleType()),
    StructField("ts",       TimestampType()),
])

orders_stream = stream_df \
    .select(F.from_json(F.col("value").cast("string"), order_schema).alias("data")) \
    .select("data.*")

# Source — arquivo (monitorar pasta)
file_stream = spark.readStream \
    .schema(order_schema) \
    .option("maxFilesPerTrigger", "10") \
    .parquet("s3a://bucket/landing/")

# Watermark + window aggregation
windowed = orders_stream \
    .withWatermark("ts", "10 minutes") \
    .groupBy(F.window("ts", "5 minutes"), "order_id") \
    .agg(F.sum("amount").alias("total"))

# Output modes
# append   — apenas novas linhas (sem agregação ou com watermark)
# complete — toda a tabela a cada micro-batch
# update   — apenas linhas alteradas

# Sink — console (debug)
query = windowed.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="30 seconds") \
    .start()

# Sink — Parquet com checkpoint
query = orders_stream.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3a://bucket/streaming/orders/") \
    .option("checkpointLocation", "s3a://bucket/checkpoints/orders/") \
    .trigger(processingTime="1 minute") \
    .start()

# Trigger once / availableNow (batch incremental)
query = file_stream.writeStream \
    .trigger(availableNow=True) \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/orders/") \
    .start("data/delta/orders/")

# foreachBatch — lógica customizada por micro-batch
def process_batch(batch_df, batch_id):
    batch_df.persist()
    batch_df.write.mode("append").parquet("output/orders/")
    batch_df.unpersist()

orders_stream.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/checkpoints/") \
    .start()

query.awaitTermination()
query.stop()

Performance e Otimização

Catalyst + Tungsten

  • Catalyst: otimizador lógico/físico — predicate pushdown, column pruning, constant folding, join reorder.
  • Tungsten: execução off-heap, codegen, operações binárias sem serialização Java.
# Verificar plano de execução
df.explain()                        # plano físico simples
df.explain(mode="extended")         # lógico + físico
df.explain(mode="formatted")        # mais legível (Spark 3+)
df.explain(mode="cost")             # com estimativas de custo

# Predicate pushdown — filtrar antes do join
# RUIM:
big.join(small, "id").filter("amount > 100")
# BOM:
big.filter("amount > 100").join(small, "id")

# Column pruning — selecionar apenas o necessário antes de operações caras
df.select("order_id", "amount", "customer").groupBy("customer").sum("amount")

# AQE — Adaptive Query Execution (Spark 3+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE ajusta shuffle.partitions, converte sort-merge em broadcast, trata skew automaticamente

# shuffle.partitions tuning
# Regra: ~128MB por partição após shuffle
# Para 10GB de dados: 10*1024 / 128 ≈ 80 partições
spark.conf.set("spark.sql.shuffle.partitions", "80")

# Cache vs Persist
df_heavy = df.join(products, "product_id").filter("amount > 0")
df_heavy.cache()         # MEMORY_ONLY — perde se não couber
df_heavy.persist()       # MEMORY_AND_DISK — mais seguro

# Usar depois:
df_heavy.count()         # força materialização
df_heavy.unpersist()     # liberar quando não precisar mais

# Evitar UDFs Python sempre que possível — quebram Catalyst
# Preferir F.when, F.expr, funções nativas do módulo functions

# Detectar skew
df.groupBy("customer_id").count().orderBy(F.col("count").desc()).show(20)

Delta Lake

# Requer: pip install delta-spark
# spark = SparkSession.builder.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
#           .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
#           .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
#           .getOrCreate()

from delta.tables import DeltaTable

# Criar tabela Delta
df.write.format("delta").mode("overwrite").save("data/delta/orders/")

# Criar via SQL
spark.sql("""
    CREATE TABLE IF NOT EXISTS orders (
        order_id  INT,
        customer  STRING,
        amount    DOUBLE,
        status    STRING,
        ts        TIMESTAMP
    )
    USING delta
    LOCATION 'data/delta/orders/'
    PARTITIONED BY (status)
""")

# MERGE INTO (upsert) — SQL
spark.sql("""
    MERGE INTO orders AS t
    USING orders_staging AS s
    ON t.order_id = s.order_id
    WHEN MATCHED THEN
        UPDATE SET t.amount = s.amount, t.status = s.status
    WHEN NOT MATCHED THEN
        INSERT *
""")

# MERGE — PySpark API
dt = DeltaTable.forPath(spark, "data/delta/orders/")
dt.alias("t").merge(
    orders_staging.alias("s"),
    "t.order_id = s.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# UPDATE e DELETE
dt.update(
    condition = F.col("status") == "pending",
    set       = {"status": F.lit("cancelled")}
)
dt.delete(F.col("ts") < F.lit("2020-01-01").cast("timestamp"))

# Time Travel
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("data/delta/orders/")
df_ts  = spark.read.format("delta").option("timestampAsOf", "2024-01-01").load("data/delta/orders/")

spark.sql("SELECT * FROM orders VERSION AS OF 3")
spark.sql("DESCRIBE HISTORY orders")

# VACUUM — remover arquivos antigos (padrão: 7 dias de retenção)
spark.sql("VACUUM orders RETAIN 168 HOURS")
dt.vacuum(retentionHours=168)

# OPTIMIZE + ZORDER — compactar e ordenar arquivos
spark.sql("OPTIMIZE orders ZORDER BY (customer, ts)")

# Change Data Feed — capturar mudanças incrementais
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")
spark.sql("ALTER TABLE orders SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')")

cdf = spark.read.format("delta") \
    .option("readChangeData", "true") \
    .option("startingVersion", 5) \
    .load("data/delta/orders/")
# Colunas extras: _change_type (insert/update_preimage/update_postimage/delete)

Delta vs Parquet puro

RecursoDeltaParquet
ACID transactionsSimNão
Schema enforcementSimNão
Time travelSimNão
MERGE/UPDATE/DELETESimNão
Change Data FeedSimNão
Leitura pura (sem escritas)Ligeiramente mais lentoMais rápido

PySpark Boas Práticas

# Type hints em todas as funções
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from typing import List

def filter_active_orders(df: DataFrame, min_amount: float = 0.0) -> DataFrame:
    """Retorna apenas pedidos ativos acima de min_amount."""
    return df.filter(
        (F.col("status") == "done") & (F.col("amount") > min_amount)
    )

def enrich_with_products(
    orders: DataFrame,
    products: DataFrame,
    join_cols: List[str] = ["product_id"]
) -> DataFrame:
    return orders.join(F.broadcast(products), join_cols, "left")

# Nunca collect() em dados grandes — estoura memória do driver
# RUIM:
all_orders = df.collect()              # traz TUDO para o driver
# BOM:
df.write.parquet("output/")           # processa distribuído
sample = df.limit(1000).collect()      # amostra pequena

# Schema enforcement — nunca inferSchema em produção
ORDERS_SCHEMA = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer", StringType(),  True),
    StructField("amount",   DoubleType(),  True),
])
df = spark.read.schema(ORDERS_SCHEMA).csv("data/orders/")

# Logging — no driver (print/logging padrão); nos executors usar accumulators
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Lendo %d registros", df.count())

# Estrutura de projeto
# projeto/
# ├── jobs/
# │   ├── orders_etl.py          # entry-point spark-submit
# │   └── products_sync.py
# ├── transformations/
# │   ├── orders.py              # funções puras DataFrame → DataFrame
# │   └── products.py
# ├── schemas/
# │   └── orders.py              # StructType definitions
# ├── tests/
# │   ├── conftest.py            # fixture SparkSession
# │   └── test_orders.py
# └── requirements.txt

# pytest + SparkSession local (conftest.py)
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark() -> SparkSession:
    return SparkSession.builder \
        .master("local[2]") \
        .appName("tests") \
        .config("spark.sql.shuffle.partitions", "2") \
        .getOrCreate()

# test_orders.py
def test_filter_active_orders(spark: SparkSession):
    data = [(1, "done", 200.0), (2, "pending", 50.0), (3, "done", 10.0)]
    df = spark.createDataFrame(data, ["order_id", "status", "amount"])
    result = filter_active_orders(df, min_amount=100.0)
    assert result.count() == 1
    assert result.collect()[0]["order_id"] == 1

# spark-submit — empacotar dependências
# spark-submit \
#   --master yarn \
#   --deploy-mode cluster \
#   --py-files dist/libs.zip \
#   --files config/prod.yaml \
#   jobs/orders_etl.py \
#   --config prod.yaml

# Evitar
# - groupByKey em RDD (prefira reduceByKey)
# - UDFs Python sem necessidade (use funções nativas)
# - collect() em DataFrames grandes
# - inferSchema=true em produção
# - shuffle.partitions padrão (200) sem ajuste
# - joins sem verificar broadcast eligibility

---

## MLlib — Pipelines de Machine Learning

O MLlib é a biblioteca de Machine Learning do Spark. A abstração central é o `Pipeline`, que encadeia `Transformers` (transformam DataFrames) e `Estimators` (treinam modelos) em um fluxo reproduzível. Hyperparameter tuning é feito com `ParamGridBuilder` + `CrossValidator` ou `TrainValidationSplit`.

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# ── Dados de exemplo ─────────────────────────────────────────────
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("MLlib-Demo").getOrCreate()

data = spark.createDataFrame([
    (0, 1.0, 0.5, 2.1, "cat_a"),
    (1, 2.0, 1.5, 0.8, "cat_b"),
    (1, 3.0, 2.0, 1.2, "cat_a"),
    (0, 0.5, 0.1, 3.0, "cat_b"),
    (1, 2.5, 1.8, 1.5, "cat_a"),
], ["label", "feat1", "feat2", "feat3", "category"])

train, test = data.randomSplit([0.8, 0.2], seed=42)

# ── Etapa 1: StringIndexer — categoria → índice numérico ─────────
# Transformer: transforma coluna sem treinar
indexer = StringIndexer(inputCol="category", outputCol="category_idx")

# ── Etapa 2: VectorAssembler — colunas → vetor de features ───────
# Transformer: combina colunas numéricas em um único vetor
assembler = VectorAssembler(
    inputCols=["feat1", "feat2", "feat3", "category_idx"],
    outputCol="features_raw"
)

# ── Etapa 3: StandardScaler — normalizar features ────────────────
# Estimator: aprende média/desvio no treino, transforma nos dois splits
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,
    withStd=True
)

# ── Etapa 4: LogisticRegression — modelo de classificação ────────
# Estimator: treina e retorna um Transformer (LogisticRegressionModel)
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=100,
    regParam=0.01
)

# ── Pipeline — encadeia todas as etapas ──────────────────────────
pipeline = Pipeline(stages=[indexer, assembler, scaler, lr])

# Sem tuning: treinar diretamente
model = pipeline.fit(train)
predictions = model.transform(test)
predictions.select("label", "prediction", "probability").show()

# ── ParamGrid + CrossValidator — hyperparameter tuning ───────────
param_grid = (ParamGridBuilder()
    .addGrid(lr.regParam,  [0.01, 0.1, 1.0])   # força de regularização
    .addGrid(lr.maxIter,   [50, 100])
    .addGrid(lr.elasticNetParam, [0.0, 0.5])    # 0=L2, 1=L1
    .build())                                   # gera 3 × 2 × 2 = 12 combinações

evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    metricName="areaUnderROC"
)

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,          # 3-fold cross-validation
    parallelism=4,       # avalia 4 combinações em paralelo
    seed=42
)

cv_model = cv.fit(train)

# Melhor modelo e seus parâmetros
best_model = cv_model.bestModel
best_lr    = best_model.stages[-1]   # último estágio = LR treinado
print("Melhor regParam:", best_lr.getRegParam())
print("Melhor maxIter: ", best_lr.getMaxIter())

# AUC no conjunto de teste
auc = evaluator.evaluate(cv_model.transform(test))
print(f"AUC no teste: {auc:.4f}")

# ── Salvar e carregar pipeline ────────────────────────────────────
cv_model.bestModel.write().overwrite().save("s3://bucket/models/lr_pipeline")

from pyspark.ml import PipelineModel
loaded = PipelineModel.load("s3://bucket/models/lr_pipeline")
loaded.transform(test).show()

SQL Optimizer Hints

Hints permitem instruir o Catalyst Optimizer a usar uma estratégia específica de join ou particionamento, substituindo as decisões automáticas quando você tem conhecimento do domínio que o otimizador não tem.

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Hints-Demo").getOrCreate()

# Dados de exemplo
orders   = spark.range(10_000_000).withColumnRenamed("id", "order_id")
products = spark.range(500).withColumnRenamed("id", "product_id")

# Registrar como views para uso em SQL puro
orders.createOrReplaceTempView("orders")
products.createOrReplaceTempView("products")

# ── BROADCAST — forçar broadcast join ────────────────────────────
# Use quando uma tabela é pequena o suficiente para caber em memória de cada executor
# Elimina o shuffle: a tabela pequena é copiada para todos os nodes
result = spark.sql("""
    SELECT /*+ BROADCAST(p) */ o.order_id, p.product_id
    FROM orders o
    JOIN products p ON o.order_id = p.product_id
""")
# Equivalente via DataFrame API:
from pyspark.sql.functions import broadcast
result_df = orders.join(broadcast(products), "order_id")

# ── MERGE — forçar sort-merge join ────────────────────────────────
# Use quando duas tabelas grandes já estão ordenadas pela chave de join
# Evita hash join que pode causar OOM com dados muito grandes
result2 = spark.sql("""
    SELECT /*+ MERGE(o, p) */ o.order_id, p.product_id
    FROM orders o
    JOIN products p ON o.order_id = p.product_id
""")

# ── SHUFFLE_HASH — forçar shuffle hash join ───────────────────────
# Use quando uma tabela é moderada (não cabe em broadcast, mas é menor que a outra)
# Constrói hash table em memória por partição — mais rápido que sort-merge se couber
result3 = spark.sql("""
    SELECT /*+ SHUFFLE_HASH(p) */ o.order_id, p.product_id
    FROM orders o
    JOIN products p ON o.order_id = p.product_id
""")

# ── REPARTITION — controlar número de partições com shuffle ───────
# Use para aumentar paralelismo antes de operações pesadas
# Diferente de coalesce: faz shuffle, pode aumentar ou diminuir partições
result4 = spark.sql("""
    SELECT /*+ REPARTITION(200) */ * FROM orders WHERE order_id > 1000
""")
# Com coluna específica (útil antes de joins/groupBy pela mesma chave)
result5 = spark.sql("""
    SELECT /*+ REPARTITION(100, order_id) */ * FROM orders
""")

# ── COALESCE — reduzir partições SEM shuffle ─────────────────────
# Use para reduzir o número de partições antes de writes (evita muitos arquivos pequenos)
# Não pode aumentar partições — use REPARTITION para isso
result6 = spark.sql("""
    SELECT /*+ COALESCE(10) */ * FROM orders WHERE order_id < 1000
""")

# ── Quando usar cada hint ─────────────────────────────────────────
# BROADCAST    → tabela pequena (<= spark.sql.autoBroadcastJoinThreshold, padrão 10 MB)
#                força mesmo acima do threshold se você sabe que cabe
# MERGE        → ambas as tabelas grandes e já particionadas/ordenadas pela chave
# SHUFFLE_HASH → tabela do lado menor é moderada; OOM em sort-merge (dados skewed)
# REPARTITION  → antes de joins/groupBy para alinhar particionamento
# COALESCE     → antes de df.write para consolidar arquivos de saída

# ── Verificar no plano de execução ───────────────────────────────
# explain("cost") mostra estimativas de custo e o plano escolhido
spark.sql("""
    SELECT /*+ BROADCAST(p) */ o.order_id, p.product_id
    FROM orders o JOIN products p ON o.order_id = p.product_id
""").explain("cost")
# Procure por: BroadcastHashJoin, SortMergeJoin, ShuffledHashJoin no plano físico
# "Statistics" mostra estimativas de linhas e bytes que guiam o otimizador

Adaptive Query Execution (AQE)

O AQE é um otimizador dinâmico introduzido no Spark 3.0 que reotimiza o plano de execução em tempo de execução, usando estatísticas reais coletadas durante o processamento — diferente do Catalyst que só usa estatísticas estáticas (de catálogo).

O que o AQE resolve

ProblemaSolução AQE
shuffle.partitions=200 gera partições minúsculascoalescePartitions mescla partições pequenas automaticamente
Estimativas ruins de cardinalidade escolhem join erradoReplaneja joins com dados reais
Skew em joins causa straggler tasksskewJoin divide partições grandes automaticamente
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled",                     "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled",  "true") \
    .config("spark.sql.adaptive.skewJoin.enabled",            "true") \
    .config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64mb") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128mb") \
    .getOrCreate()

# ── spark.sql.adaptive.enabled ────────────────────────────────────
# Ativa o AQE. Padrão: true no Spark 3.2+
# Com AQE, o plano é reotimizado em cada "query stage" (após cada shuffle)

# ── coalescePartitions — anti small-files ────────────────────────
# Após shuffle com 200 partições, se os dados resultantes forem pequenos,
# AQE mescla as partições para atingir ~128 MB cada
# advisoryPartitionSizeInBytes: tamanho alvo por partição após coalesce
# minPartitionSize: não mescla abaixo deste tamanho (evitar partições gigantes)

# ── skewJoin — lidar com dados desbalanceados ─────────────────────
# Se uma partição for > spark.sql.adaptive.skewJoin.skewedPartitionFactor (padrão 5x)
# a mediana E > spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (padrão 256 MB),
# o AQE divide essa partição automaticamente e duplica o lado oposto do join

# Configurações avançadas de skewJoin
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor",           "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256mb")

# ── Verificar se AQE está ajudando via Spark UI ───────────────────
# 1. Spark UI → aba "SQL/DataFrame" → clique no job
# 2. No DAG, procure por nós com "(AQE)" no nome:
#    - "AQEShuffleRead" → partições foram mescladas/divididas
#    - "CustomShuffleReader" → AQE customizou a leitura do shuffle
# 3. Aba "Stages" → compare "Input Size" vs "Output Size" por stage
#    → partições muito pequenas ou muito grandes indicam oportunidade de tuning
# 4. Stage com "skewed" na descrição → skewJoin foi acionado

# ── Diagnóstico via explain ───────────────────────────────────────
df = spark.range(10_000_000).groupBy("id").count()
df.explain(True)
# Com AQE, o plano "analyzed" e "optimized" são estáticos;
# o plano "executed" (disponível após a action) mostra as decisões reais do AQE

# ── Desabilitar AQE seletivamente (debug) ────────────────────────
spark.conf.set("spark.sql.adaptive.enabled", "false")
df.explain("cost")  # ver plano sem AQE para comparar
spark.conf.set("spark.sql.adaptive.enabled", "true")