Fundamentos Spark
Arquitetura: Driver (orquestra), Executors (processam dados), Cluster Manager (YARN/k8s/Standalone).
| Conceito | Descrição |
|---|---|
| Driver | JVM principal; mantém SparkContext, envia tasks |
| Executor | Processo em cada worker node; memória + CPU |
| SparkContext | Ponto de entrada legado (RDD) |
| SparkSession | Ponto de entrada moderno (DataFrame, SQL) |
Modos de execução
# Local — tudo no mesmo processo
spark = SparkSession.builder.master("local[*]").getOrCreate()
# YARN
spark-submit --master yarn --deploy-mode cluster app.py
# Kubernetes
spark-submit --master k8s://https://<api-server>:443 \
--deploy-mode cluster app.pyLazy Evaluation: transformações constroem um DAG; nada executa até uma ação ser chamada.
DAG → Stage 1 (map, filter) → shuffle boundary → Stage 2 (reduce) → ActionRDD vs DataFrame vs Dataset
| RDD | DataFrame | Dataset | |
|---|---|---|---|
| Linguagem | Java/Scala/Python | Todas | Java/Scala |
| Schema | Não | Sim | Sim |
| Otimizador Catalyst | Não | Sim | Sim |
| Type-safe | Não (Python) | Não | Sim |
| Uso ideal | controle fino, ML custom | ETL, SQL | Scala/Java tipado |
Jobs / Stages / Tasks: 1 ação = 1 job → stages separadas por shuffles → 1 task por partição por stage.
Partições: unidade de paralelismo. repartition(n) → shuffle completo. coalesce(n) → sem shuffle (só reduz).
SparkSession e Configuração
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf() \
.setAppName("MyApp") \
.set("spark.executor.memory", "4g") \
.set("spark.executor.cores", "2") \
.set("spark.sql.shuffle.partitions", "200")
spark = SparkSession.builder \
.master("local[*]") \
.config(conf=conf) \
.config("spark.driver.memory", "2g") \
.config("spark.sql.adaptive.enabled", "true") \
.enableHiveSupport() \
.getOrCreate()
# Ajustar em runtime
spark.conf.set("spark.sql.shuffle.partitions", "100")
# Acessar configuração
print(spark.conf.get("spark.executor.memory"))
sc = spark.sparkContext # acesso ao SparkContextParâmetros críticos
| Parâmetro | Valor típico | Descrição |
|---|---|---|
executor.memory | 4–16g | Heap do executor |
executor.cores | 2–5 | Threads por executor |
shuffle.partitions | 200 (padrão) | Partições após shuffle |
driver.memory | 2–4g | Heap do driver |
memory.fraction | 0.6 | % heap para Spark |
sql.adaptive.enabled | true | Ativa AQE |
RDD
sc = spark.sparkContext
# Criação
rdd_mem = sc.parallelize([1, 2, 3, 4, 5], numSlices=4)
rdd_file = sc.textFile("hdfs:///data/orders.csv")
# Transformações (lazy)
mapped = rdd_mem.map(lambda x: x * 2)
filtered = rdd_mem.filter(lambda x: x > 2)
flat = sc.parallelize(["a b", "c d"]).flatMap(lambda x: x.split())
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
reduced = pairs.reduceByKey(lambda a, b: a + b) # ("a",4), ("b",2)
grouped = pairs.groupByKey() # evitar em produção
sorted_r = pairs.sortByKey(ascending=True)
joined = pairs.join(sc.parallelize([("a", 10)])) # inner join por chave
# Ações (executam o DAG)
result = mapped.collect()
count = rdd_mem.count()
top3 = rdd_mem.take(3)
rdd_file.saveAsTextFile("hdfs:///output/orders")
# Cache e Persist
from pyspark import StorageLevel
rdd_mem.cache() # MEMORY_ONLY
rdd_mem.persist(StorageLevel.MEMORY_AND_DISK)
rdd_mem.unpersist()
# Broadcast variables — enviar dado grande para todos executors sem re-serializar
lookup = sc.broadcast({"NY": "New York", "CA": "California"})
rdd_mem.map(lambda x: lookup.value.get(x, "Unknown"))
# Accumulators — contador/somador thread-safe no driver
counter = sc.accumulator(0)
def count_errors(line):
global counter
if "ERROR" in line:
counter += 1
rdd_file.foreach(count_errors)
print(counter.value)Quando usar RDD vs DataFrame
| Use RDD quando… | Use DataFrame quando… |
|---|---|
| Dados não estruturados (texto, binário) | Dados tabulares |
| Controle preciso de particionamento | ETL, análise, SQL |
| Algoritmos ML customizados | Interop com Spark SQL |
| API Python pura com objetos complexos | Otimização automática necessária |
DataFrame — Criação e Leitura
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
spark = SparkSession.builder.getOrCreate()
# Schema explícito
schema = StructType([
StructField("order_id", IntegerType(), nullable=False),
StructField("customer", StringType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("ts", TimestampType(), nullable=True),
])
# De lista Python
data = [(1, "Alice", 99.9), (2, "Bob", 150.0)]
df = spark.createDataFrame(data, schema=["order_id", "customer", "amount"])
# CSV
df_csv = spark.read \
.option("header", "true") \
.option("sep", ",") \
.option("mode", "DROPMALFORMED") \
.option("inferSchema", "true") \
.csv("s3a://bucket/orders/*.csv")
# CSV com schema explícito (mais eficiente)
df_csv = spark.read.schema(schema).csv("s3a://bucket/orders/")
# JSON
df_json = spark.read.option("multiLine", True).json("data/events/*.json")
# Parquet (schema embutido, mais rápido)
df_parquet = spark.read.parquet("data/warehouse/orders/")
# JDBC
df_jdbc = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/db") \
.option("dbtable", "orders") \
.option("user", "usr") \
.option("password", "pwd") \
.option("numPartitions", "10") \
.option("partitionColumn", "order_id") \
.option("lowerBound", "1") \
.option("upperBound", "1000000") \
.load()
# Inspecionar
df.printSchema()
df.show(5, truncate=False)
df.dtypes # lista de (nome, tipo)
df.count()DataFrame — Transformações
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
# select e alias
df.select("order_id", F.col("amount").alias("valor"))
# filter / where (equivalentes)
df.filter(F.col("amount") > 100)
df.where("amount > 100 AND customer IS NOT NULL")
# withColumn / withColumnRenamed / drop
df = df.withColumn("amount_brl", F.col("amount") * 5.0)
df = df.withColumnRenamed("customer", "cliente")
df = df.drop("coluna_inutil")
# cast e lit
df = df.withColumn("amount", F.col("amount").cast(DoubleType()))
df = df.withColumn("source", F.lit("orders_v2"))
# when / otherwise
df = df.withColumn("tier",
F.when(F.col("amount") >= 500, "gold")
.when(F.col("amount") >= 100, "silver")
.otherwise("bronze")
)
# expr() — SQL inline
df = df.withColumn("year", F.expr("year(ts)"))
df = df.selectExpr("order_id", "amount * 1.1 as amount_with_tax")
# Null handling
df.filter(F.col("amount").isNull())
df.filter(F.col("amount").isNotNull())
df2 = df.fillna({"amount": 0.0, "customer": "unknown"})
df3 = df.dropna(subset=["order_id", "amount"])
df4 = df.withColumn("amt", F.coalesce(F.col("amount"), F.lit(0.0)))
# UDF Python (serialização Python — mais lento)
from pyspark.sql.functions import udf
@udf(returnType=DoubleType())
def apply_discount(amount: float) -> float:
return amount * 0.9 if amount and amount > 200 else amount
df = df.withColumn("discounted", apply_discount(F.col("amount")))
# Pandas UDF / Vectorized UDF (Arrow — muito mais rápido)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(DoubleType())
def apply_discount_vec(s: pd.Series) -> pd.Series:
return s.where(s <= 200, s * 0.9)
df = df.withColumn("discounted", apply_discount_vec(F.col("amount")))Aggregações e GroupBy
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# groupBy + agg
df_agg = df.groupBy("customer").agg(
F.sum("amount").alias("total"),
F.count("order_id").alias("num_orders"),
F.avg("amount").alias("avg_amount"),
F.countDistinct("product_id").alias("unique_products"),
F.collect_list("order_id").alias("order_ids"),
F.collect_set("status").alias("statuses"),
F.max("ts").alias("last_order"),
)
# pivot — clientes por status de pedido
df.groupBy("customer").pivot("status", ["pending", "done", "cancelled"]) \
.agg(F.count("order_id")).fillna(0)
# cube e rollup
df.cube("customer", "product_id").agg(F.sum("amount"))
df.rollup("customer", "product_id").agg(F.sum("amount"))
# Window Functions
w_customer = Window.partitionBy("customer").orderBy(F.col("ts").asc())
w_all_time = Window.partitionBy("customer").orderBy("ts") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("row_num", F.row_number().over(w_customer))
df = df.withColumn("rank", F.rank().over(w_customer))
df = df.withColumn("dense_rk", F.dense_rank().over(w_customer))
df = df.withColumn("prev_amt", F.lag("amount", 1).over(w_customer))
df = df.withColumn("next_amt", F.lead("amount", 1).over(w_customer))
df = df.withColumn("run_total", F.sum("amount").over(w_all_time))
# Percentil como window (approx)
df = df.withColumn("pct50", F.percentile_approx("amount", 0.5).over(
Window.partitionBy("customer")
))Joins
orders = spark.read.parquet("data/orders/")
products = spark.read.parquet("data/products/")
events = spark.read.parquet("data/events/")
# Tipos de join
orders.join(products, "product_id", "inner")
orders.join(products, "product_id", "left")
orders.join(products, "product_id", "right")
orders.join(products, "product_id", "full")
orders.crossJoin(products) # produto cartesiano
# Semi e Anti join
orders.join(products, "product_id", "left_semi") # apenas ordens com produto
orders.join(products, "product_id", "left_anti") # apenas ordens sem produto
# Múltiplas colunas
orders.join(products,
(orders.product_id == products.id) & (orders.region == products.region),
"inner"
)
# Broadcast hint — forçar broadcast de tabela pequena
from pyspark.sql.functions import broadcast
orders.join(broadcast(products), "product_id", "inner")
# Spark faz broadcast automático se tabela < spark.sql.autoBroadcastJoinThreshold (10MB padrão)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024)) # 50MB
# Sort-Merge Join (padrão para tabelas grandes — eficiente com shuffle)
# Verificar com explain()
orders.join(products, "product_id").explain(mode="formatted")
# Skew Join — AQE automático
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Skew Join manual — salting
import pyspark.sql.functions as F
salt_factor = 10
orders_s = orders.withColumn("salt", (F.rand() * salt_factor).cast("int")) \
.withColumn("key_salted", F.concat_ws("_", "product_id", "salt"))
products_s = products.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(salt_factor)]))) \
.withColumn("key_salted", F.concat_ws("_", "id", "salt"))
orders_s.join(products_s, "key_salted", "inner")Spark SQL
# Registrar view e consultar
orders.createOrReplaceTempView("orders")
products.createOrReplaceTempView("products")
result = spark.sql("""
SELECT
o.customer,
p.category,
SUM(o.amount) AS total,
COUNT(*) AS num_orders
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.status = 'done'
AND o.ts >= DATE_TRUNC('month', CURRENT_DATE)
GROUP BY 1, 2
ORDER BY total DESC
""")
# CTE
spark.sql("""
WITH monthly AS (
SELECT customer, DATE_FORMAT(ts, 'yyyy-MM') AS month, SUM(amount) AS total
FROM orders
GROUP BY 1, 2
),
ranked AS (
SELECT *, ROW_NUMBER() OVER (PARTITION BY customer ORDER BY total DESC) AS rn
FROM monthly
)
SELECT * FROM ranked WHERE rn = 1
""")
# LATERAL VIEW EXPLODE
spark.sql("""
SELECT order_id, tag
FROM orders
LATERAL VIEW EXPLODE(tags) t AS tag
""")
# QUALIFY (filtra após window function — Spark 3.4+)
spark.sql("""
SELECT *
FROM orders
QUALIFY ROW_NUMBER() OVER (PARTITION BY customer ORDER BY ts DESC) = 1
""")
# CASE WHEN
spark.sql("""
SELECT order_id,
CASE WHEN amount >= 500 THEN 'gold'
WHEN amount >= 100 THEN 'silver'
ELSE 'bronze' END AS tier
FROM orders
""")
# Funções de data
spark.sql("""
SELECT
DATE_TRUNC('week', ts) AS week_start,
DATE_FORMAT(ts, 'yyyy-MM-dd') AS date_str,
REGEXP_REPLACE(customer, '\\s+', '_') AS customer_slug
FROM orders
""")Funções Built-in
from pyspark.sql import functions as F
# --- Strings ---
df.withColumn("full", F.concat(F.col("first"), F.lit(" "), F.col("last")))
df.withColumn("parts", F.split(F.col("tags"), ","))
df.withColumn("clean", F.regexp_replace(F.col("email"), r"\s+", ""))
df.withColumn("name", F.trim(F.col("name")))
df.withColumn("sub", F.substring(F.col("code"), 1, 3))
df.withColumn("upper", F.upper(F.col("name")))
df.withColumn("len", F.length(F.col("name")))
# --- Datas ---
df.withColumn("date", F.to_date(F.col("date_str"), "yyyy-MM-dd"))
df.withColumn("ts", F.to_timestamp(F.col("ts_str"), "yyyy-MM-dd HH:mm:ss"))
df.withColumn("d7", F.date_add(F.col("date"), 7))
df.withColumn("diff", F.datediff(F.col("end_date"), F.col("start_date")))
df.withColumn("fmt", F.date_format(F.col("ts"), "yyyy/MM/dd"))
df.withColumn("yr", F.year(F.col("ts")))
df.withColumn("mo", F.month(F.col("ts")))
df.withColumn("dy", F.dayofmonth(F.col("ts")))
df.withColumn("trunc", F.date_trunc("month", F.col("ts")))
# --- Arrays ---
df.withColumn("exploded", F.explode(F.col("tags"))) # 1 row per element
df.withColumn("outer", F.explode_outer(F.col("tags"))) # mantém nulls
df.withColumn("has_vip", F.array_contains(F.col("tags"), "vip"))
df.withColumn("sz", F.size(F.col("tags")))
df.withColumn("flat", F.flatten(F.col("nested_arrays")))
# transform (Spark 3.1+)
df.withColumn("upper_tags", F.transform(F.col("tags"), lambda x: F.upper(x)))
df.withColumn("long_tags", F.filter(F.col("tags"), lambda x: F.length(x) > 3))
df.withColumn("total", F.aggregate(F.col("amounts"), F.lit(0.0),
lambda acc, x: acc + x))
# --- JSON ---
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
payload_schema = StructType([
StructField("product_id", StringType()),
StructField("price", DoubleType()),
])
df.withColumn("parsed", F.from_json(F.col("payload"), payload_schema))
df.withColumn("json_str", F.to_json(F.struct("order_id", "amount")))
df.withColumn("pid", F.get_json_object(F.col("payload"), "$.product_id"))Leitura e Escrita
# Parquet (compressão recomendada)
df.write \
.mode("overwrite") \
.option("compression", "snappy") \ # ou "zstd" (melhor ratio)
.parquet("s3a://bucket/orders/")
# Particionado por coluna
df.write \
.mode("overwrite") \
.partitionBy("year", "month") \
.parquet("s3a://bucket/orders_partitioned/")
# Bucketing (para joins frequentes — requer tabela Hive)
df.write \
.mode("overwrite") \
.bucketBy(64, "customer_id") \
.sortBy("customer_id") \
.saveAsTable("orders_bucketed")
# ORC
df.write.mode("append").orc("data/orc/orders/")
# CSV
df.write.mode("overwrite").option("header", True).csv("output/orders_csv/")
# JSON
df.write.mode("overwrite").json("output/orders_json/")
# JDBC
df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/db") \
.option("dbtable", "orders_staging") \
.option("user", "usr").option("password", "pwd") \
.mode("append").save()
# repartition vs coalesce
df.repartition(50).write.parquet("...") # shuffle, redistribuição uniforme
df.coalesce(10).write.parquet("...") # sem shuffle, só reduz partições
df.repartition(50, "customer_id").write.parquet("...") # partições por coluna (hash)Structured Streaming
# Source — Kafka
stream_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "orders-topic") \
.option("startingOffsets", "earliest") \
.load()
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql import functions as F
order_schema = StructType([
StructField("order_id", StringType()),
StructField("amount", DoubleType()),
StructField("ts", TimestampType()),
])
orders_stream = stream_df \
.select(F.from_json(F.col("value").cast("string"), order_schema).alias("data")) \
.select("data.*")
# Source — arquivo (monitorar pasta)
file_stream = spark.readStream \
.schema(order_schema) \
.option("maxFilesPerTrigger", "10") \
.parquet("s3a://bucket/landing/")
# Watermark + window aggregation
windowed = orders_stream \
.withWatermark("ts", "10 minutes") \
.groupBy(F.window("ts", "5 minutes"), "order_id") \
.agg(F.sum("amount").alias("total"))
# Output modes
# append — apenas novas linhas (sem agregação ou com watermark)
# complete — toda a tabela a cada micro-batch
# update — apenas linhas alteradas
# Sink — console (debug)
query = windowed.writeStream \
.outputMode("update") \
.format("console") \
.trigger(processingTime="30 seconds") \
.start()
# Sink — Parquet com checkpoint
query = orders_stream.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "s3a://bucket/streaming/orders/") \
.option("checkpointLocation", "s3a://bucket/checkpoints/orders/") \
.trigger(processingTime="1 minute") \
.start()
# Trigger once / availableNow (batch incremental)
query = file_stream.writeStream \
.trigger(availableNow=True) \
.format("delta") \
.option("checkpointLocation", "/checkpoints/orders/") \
.start("data/delta/orders/")
# foreachBatch — lógica customizada por micro-batch
def process_batch(batch_df, batch_id):
batch_df.persist()
batch_df.write.mode("append").parquet("output/orders/")
batch_df.unpersist()
orders_stream.writeStream \
.foreachBatch(process_batch) \
.option("checkpointLocation", "/checkpoints/") \
.start()
query.awaitTermination()
query.stop()Performance e Otimização
Catalyst + Tungsten
- Catalyst: otimizador lógico/físico — predicate pushdown, column pruning, constant folding, join reorder.
- Tungsten: execução off-heap, codegen, operações binárias sem serialização Java.
# Verificar plano de execução
df.explain() # plano físico simples
df.explain(mode="extended") # lógico + físico
df.explain(mode="formatted") # mais legível (Spark 3+)
df.explain(mode="cost") # com estimativas de custo
# Predicate pushdown — filtrar antes do join
# RUIM:
big.join(small, "id").filter("amount > 100")
# BOM:
big.filter("amount > 100").join(small, "id")
# Column pruning — selecionar apenas o necessário antes de operações caras
df.select("order_id", "amount", "customer").groupBy("customer").sum("amount")
# AQE — Adaptive Query Execution (Spark 3+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE ajusta shuffle.partitions, converte sort-merge em broadcast, trata skew automaticamente
# shuffle.partitions tuning
# Regra: ~128MB por partição após shuffle
# Para 10GB de dados: 10*1024 / 128 ≈ 80 partições
spark.conf.set("spark.sql.shuffle.partitions", "80")
# Cache vs Persist
df_heavy = df.join(products, "product_id").filter("amount > 0")
df_heavy.cache() # MEMORY_ONLY — perde se não couber
df_heavy.persist() # MEMORY_AND_DISK — mais seguro
# Usar depois:
df_heavy.count() # força materialização
df_heavy.unpersist() # liberar quando não precisar mais
# Evitar UDFs Python sempre que possível — quebram Catalyst
# Preferir F.when, F.expr, funções nativas do módulo functions
# Detectar skew
df.groupBy("customer_id").count().orderBy(F.col("count").desc()).show(20)Delta Lake
# Requer: pip install delta-spark
# spark = SparkSession.builder.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
# .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
# .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# .getOrCreate()
from delta.tables import DeltaTable
# Criar tabela Delta
df.write.format("delta").mode("overwrite").save("data/delta/orders/")
# Criar via SQL
spark.sql("""
CREATE TABLE IF NOT EXISTS orders (
order_id INT,
customer STRING,
amount DOUBLE,
status STRING,
ts TIMESTAMP
)
USING delta
LOCATION 'data/delta/orders/'
PARTITIONED BY (status)
""")
# MERGE INTO (upsert) — SQL
spark.sql("""
MERGE INTO orders AS t
USING orders_staging AS s
ON t.order_id = s.order_id
WHEN MATCHED THEN
UPDATE SET t.amount = s.amount, t.status = s.status
WHEN NOT MATCHED THEN
INSERT *
""")
# MERGE — PySpark API
dt = DeltaTable.forPath(spark, "data/delta/orders/")
dt.alias("t").merge(
orders_staging.alias("s"),
"t.order_id = s.order_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# UPDATE e DELETE
dt.update(
condition = F.col("status") == "pending",
set = {"status": F.lit("cancelled")}
)
dt.delete(F.col("ts") < F.lit("2020-01-01").cast("timestamp"))
# Time Travel
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("data/delta/orders/")
df_ts = spark.read.format("delta").option("timestampAsOf", "2024-01-01").load("data/delta/orders/")
spark.sql("SELECT * FROM orders VERSION AS OF 3")
spark.sql("DESCRIBE HISTORY orders")
# VACUUM — remover arquivos antigos (padrão: 7 dias de retenção)
spark.sql("VACUUM orders RETAIN 168 HOURS")
dt.vacuum(retentionHours=168)
# OPTIMIZE + ZORDER — compactar e ordenar arquivos
spark.sql("OPTIMIZE orders ZORDER BY (customer, ts)")
# Change Data Feed — capturar mudanças incrementais
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")
spark.sql("ALTER TABLE orders SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')")
cdf = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", 5) \
.load("data/delta/orders/")
# Colunas extras: _change_type (insert/update_preimage/update_postimage/delete)Delta vs Parquet puro
| Recurso | Delta | Parquet |
|---|---|---|
| ACID transactions | Sim | Não |
| Schema enforcement | Sim | Não |
| Time travel | Sim | Não |
| MERGE/UPDATE/DELETE | Sim | Não |
| Change Data Feed | Sim | Não |
| Leitura pura (sem escritas) | Ligeiramente mais lento | Mais rápido |
PySpark Boas Práticas
# Type hints em todas as funções
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from typing import List
def filter_active_orders(df: DataFrame, min_amount: float = 0.0) -> DataFrame:
"""Retorna apenas pedidos ativos acima de min_amount."""
return df.filter(
(F.col("status") == "done") & (F.col("amount") > min_amount)
)
def enrich_with_products(
orders: DataFrame,
products: DataFrame,
join_cols: List[str] = ["product_id"]
) -> DataFrame:
return orders.join(F.broadcast(products), join_cols, "left")
# Nunca collect() em dados grandes — estoura memória do driver
# RUIM:
all_orders = df.collect() # traz TUDO para o driver
# BOM:
df.write.parquet("output/") # processa distribuído
sample = df.limit(1000).collect() # amostra pequena
# Schema enforcement — nunca inferSchema em produção
ORDERS_SCHEMA = StructType([
StructField("order_id", IntegerType(), False),
StructField("customer", StringType(), True),
StructField("amount", DoubleType(), True),
])
df = spark.read.schema(ORDERS_SCHEMA).csv("data/orders/")
# Logging — no driver (print/logging padrão); nos executors usar accumulators
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Lendo %d registros", df.count())
# Estrutura de projeto
# projeto/
# ├── jobs/
# │ ├── orders_etl.py # entry-point spark-submit
# │ └── products_sync.py
# ├── transformations/
# │ ├── orders.py # funções puras DataFrame → DataFrame
# │ └── products.py
# ├── schemas/
# │ └── orders.py # StructType definitions
# ├── tests/
# │ ├── conftest.py # fixture SparkSession
# │ └── test_orders.py
# └── requirements.txt
# pytest + SparkSession local (conftest.py)
import pytest
from pyspark.sql import SparkSession
@pytest.fixture(scope="session")
def spark() -> SparkSession:
return SparkSession.builder \
.master("local[2]") \
.appName("tests") \
.config("spark.sql.shuffle.partitions", "2") \
.getOrCreate()
# test_orders.py
def test_filter_active_orders(spark: SparkSession):
data = [(1, "done", 200.0), (2, "pending", 50.0), (3, "done", 10.0)]
df = spark.createDataFrame(data, ["order_id", "status", "amount"])
result = filter_active_orders(df, min_amount=100.0)
assert result.count() == 1
assert result.collect()[0]["order_id"] == 1
# spark-submit — empacotar dependências
# spark-submit \
# --master yarn \
# --deploy-mode cluster \
# --py-files dist/libs.zip \
# --files config/prod.yaml \
# jobs/orders_etl.py \
# --config prod.yaml
# Evitar
# - groupByKey em RDD (prefira reduceByKey)
# - UDFs Python sem necessidade (use funções nativas)
# - collect() em DataFrames grandes
# - inferSchema=true em produção
# - shuffle.partitions padrão (200) sem ajuste
# - joins sem verificar broadcast eligibility
---
## MLlib — Pipelines de Machine Learning
O MLlib é a biblioteca de Machine Learning do Spark. A abstração central é o `Pipeline`, que encadeia `Transformers` (transformam DataFrames) e `Estimators` (treinam modelos) em um fluxo reproduzível. Hyperparameter tuning é feito com `ParamGridBuilder` + `CrossValidator` ou `TrainValidationSplit`.
```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# ── Dados de exemplo ─────────────────────────────────────────────
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("MLlib-Demo").getOrCreate()
data = spark.createDataFrame([
(0, 1.0, 0.5, 2.1, "cat_a"),
(1, 2.0, 1.5, 0.8, "cat_b"),
(1, 3.0, 2.0, 1.2, "cat_a"),
(0, 0.5, 0.1, 3.0, "cat_b"),
(1, 2.5, 1.8, 1.5, "cat_a"),
], ["label", "feat1", "feat2", "feat3", "category"])
train, test = data.randomSplit([0.8, 0.2], seed=42)
# ── Etapa 1: StringIndexer — categoria → índice numérico ─────────
# Transformer: transforma coluna sem treinar
indexer = StringIndexer(inputCol="category", outputCol="category_idx")
# ── Etapa 2: VectorAssembler — colunas → vetor de features ───────
# Transformer: combina colunas numéricas em um único vetor
assembler = VectorAssembler(
inputCols=["feat1", "feat2", "feat3", "category_idx"],
outputCol="features_raw"
)
# ── Etapa 3: StandardScaler — normalizar features ────────────────
# Estimator: aprende média/desvio no treino, transforma nos dois splits
scaler = StandardScaler(
inputCol="features_raw",
outputCol="features",
withMean=True,
withStd=True
)
# ── Etapa 4: LogisticRegression — modelo de classificação ────────
# Estimator: treina e retorna um Transformer (LogisticRegressionModel)
lr = LogisticRegression(
featuresCol="features",
labelCol="label",
maxIter=100,
regParam=0.01
)
# ── Pipeline — encadeia todas as etapas ──────────────────────────
pipeline = Pipeline(stages=[indexer, assembler, scaler, lr])
# Sem tuning: treinar diretamente
model = pipeline.fit(train)
predictions = model.transform(test)
predictions.select("label", "prediction", "probability").show()
# ── ParamGrid + CrossValidator — hyperparameter tuning ───────────
param_grid = (ParamGridBuilder()
.addGrid(lr.regParam, [0.01, 0.1, 1.0]) # força de regularização
.addGrid(lr.maxIter, [50, 100])
.addGrid(lr.elasticNetParam, [0.0, 0.5]) # 0=L2, 1=L1
.build()) # gera 3 × 2 × 2 = 12 combinações
evaluator = BinaryClassificationEvaluator(
labelCol="label",
metricName="areaUnderROC"
)
cv = CrossValidator(
estimator=pipeline,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=3, # 3-fold cross-validation
parallelism=4, # avalia 4 combinações em paralelo
seed=42
)
cv_model = cv.fit(train)
# Melhor modelo e seus parâmetros
best_model = cv_model.bestModel
best_lr = best_model.stages[-1] # último estágio = LR treinado
print("Melhor regParam:", best_lr.getRegParam())
print("Melhor maxIter: ", best_lr.getMaxIter())
# AUC no conjunto de teste
auc = evaluator.evaluate(cv_model.transform(test))
print(f"AUC no teste: {auc:.4f}")
# ── Salvar e carregar pipeline ────────────────────────────────────
cv_model.bestModel.write().overwrite().save("s3://bucket/models/lr_pipeline")
from pyspark.ml import PipelineModel
loaded = PipelineModel.load("s3://bucket/models/lr_pipeline")
loaded.transform(test).show()SQL Optimizer Hints
Hints permitem instruir o Catalyst Optimizer a usar uma estratégia específica de join ou particionamento, substituindo as decisões automáticas quando você tem conhecimento do domínio que o otimizador não tem.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Hints-Demo").getOrCreate()
# Dados de exemplo
orders = spark.range(10_000_000).withColumnRenamed("id", "order_id")
products = spark.range(500).withColumnRenamed("id", "product_id")
# Registrar como views para uso em SQL puro
orders.createOrReplaceTempView("orders")
products.createOrReplaceTempView("products")
# ── BROADCAST — forçar broadcast join ────────────────────────────
# Use quando uma tabela é pequena o suficiente para caber em memória de cada executor
# Elimina o shuffle: a tabela pequena é copiada para todos os nodes
result = spark.sql("""
SELECT /*+ BROADCAST(p) */ o.order_id, p.product_id
FROM orders o
JOIN products p ON o.order_id = p.product_id
""")
# Equivalente via DataFrame API:
from pyspark.sql.functions import broadcast
result_df = orders.join(broadcast(products), "order_id")
# ── MERGE — forçar sort-merge join ────────────────────────────────
# Use quando duas tabelas grandes já estão ordenadas pela chave de join
# Evita hash join que pode causar OOM com dados muito grandes
result2 = spark.sql("""
SELECT /*+ MERGE(o, p) */ o.order_id, p.product_id
FROM orders o
JOIN products p ON o.order_id = p.product_id
""")
# ── SHUFFLE_HASH — forçar shuffle hash join ───────────────────────
# Use quando uma tabela é moderada (não cabe em broadcast, mas é menor que a outra)
# Constrói hash table em memória por partição — mais rápido que sort-merge se couber
result3 = spark.sql("""
SELECT /*+ SHUFFLE_HASH(p) */ o.order_id, p.product_id
FROM orders o
JOIN products p ON o.order_id = p.product_id
""")
# ── REPARTITION — controlar número de partições com shuffle ───────
# Use para aumentar paralelismo antes de operações pesadas
# Diferente de coalesce: faz shuffle, pode aumentar ou diminuir partições
result4 = spark.sql("""
SELECT /*+ REPARTITION(200) */ * FROM orders WHERE order_id > 1000
""")
# Com coluna específica (útil antes de joins/groupBy pela mesma chave)
result5 = spark.sql("""
SELECT /*+ REPARTITION(100, order_id) */ * FROM orders
""")
# ── COALESCE — reduzir partições SEM shuffle ─────────────────────
# Use para reduzir o número de partições antes de writes (evita muitos arquivos pequenos)
# Não pode aumentar partições — use REPARTITION para isso
result6 = spark.sql("""
SELECT /*+ COALESCE(10) */ * FROM orders WHERE order_id < 1000
""")
# ── Quando usar cada hint ─────────────────────────────────────────
# BROADCAST → tabela pequena (<= spark.sql.autoBroadcastJoinThreshold, padrão 10 MB)
# força mesmo acima do threshold se você sabe que cabe
# MERGE → ambas as tabelas grandes e já particionadas/ordenadas pela chave
# SHUFFLE_HASH → tabela do lado menor é moderada; OOM em sort-merge (dados skewed)
# REPARTITION → antes de joins/groupBy para alinhar particionamento
# COALESCE → antes de df.write para consolidar arquivos de saída
# ── Verificar no plano de execução ───────────────────────────────
# explain("cost") mostra estimativas de custo e o plano escolhido
spark.sql("""
SELECT /*+ BROADCAST(p) */ o.order_id, p.product_id
FROM orders o JOIN products p ON o.order_id = p.product_id
""").explain("cost")
# Procure por: BroadcastHashJoin, SortMergeJoin, ShuffledHashJoin no plano físico
# "Statistics" mostra estimativas de linhas e bytes que guiam o otimizadorAdaptive Query Execution (AQE)
O AQE é um otimizador dinâmico introduzido no Spark 3.0 que reotimiza o plano de execução em tempo de execução, usando estatísticas reais coletadas durante o processamento — diferente do Catalyst que só usa estatísticas estáticas (de catálogo).
O que o AQE resolve
| Problema | Solução AQE |
|---|---|
shuffle.partitions=200 gera partições minúsculas | coalescePartitions mescla partições pequenas automaticamente |
| Estimativas ruins de cardinalidade escolhem join errado | Replaneja joins com dados reais |
| Skew em joins causa straggler tasks | skewJoin divide partições grandes automaticamente |
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[*]") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64mb") \
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128mb") \
.getOrCreate()
# ── spark.sql.adaptive.enabled ────────────────────────────────────
# Ativa o AQE. Padrão: true no Spark 3.2+
# Com AQE, o plano é reotimizado em cada "query stage" (após cada shuffle)
# ── coalescePartitions — anti small-files ────────────────────────
# Após shuffle com 200 partições, se os dados resultantes forem pequenos,
# AQE mescla as partições para atingir ~128 MB cada
# advisoryPartitionSizeInBytes: tamanho alvo por partição após coalesce
# minPartitionSize: não mescla abaixo deste tamanho (evitar partições gigantes)
# ── skewJoin — lidar com dados desbalanceados ─────────────────────
# Se uma partição for > spark.sql.adaptive.skewJoin.skewedPartitionFactor (padrão 5x)
# a mediana E > spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (padrão 256 MB),
# o AQE divide essa partição automaticamente e duplica o lado oposto do join
# Configurações avançadas de skewJoin
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256mb")
# ── Verificar se AQE está ajudando via Spark UI ───────────────────
# 1. Spark UI → aba "SQL/DataFrame" → clique no job
# 2. No DAG, procure por nós com "(AQE)" no nome:
# - "AQEShuffleRead" → partições foram mescladas/divididas
# - "CustomShuffleReader" → AQE customizou a leitura do shuffle
# 3. Aba "Stages" → compare "Input Size" vs "Output Size" por stage
# → partições muito pequenas ou muito grandes indicam oportunidade de tuning
# 4. Stage com "skewed" na descrição → skewJoin foi acionado
# ── Diagnóstico via explain ───────────────────────────────────────
df = spark.range(10_000_000).groupBy("id").count()
df.explain(True)
# Com AQE, o plano "analyzed" e "optimized" são estáticos;
# o plano "executed" (disponível após a action) mostra as decisões reais do AQE
# ── Desabilitar AQE seletivamente (debug) ────────────────────────
spark.conf.set("spark.sql.adaptive.enabled", "false")
df.explain("cost") # ver plano sem AQE para comparar
spark.conf.set("spark.sql.adaptive.enabled", "true")