Spark Job Optimisation

Optimising Spark jobs involves improving resource utilisation and execution efficiency while reducing overall runtime.

Here are steps and best practices to optimise Spark jobs:

1. Analyse and Debug Existing Jobs

  • Enable Spark UI and Logs: Use the Spark UI to identify bottlenecks, such as skewed data, long-running tasks, or shuffle issues.
  • Inspect Stages and Tasks: Look for stages taking too long or tasks with imbalanced execution times.
  • Enable Job Metrics: Set up metrics sinks like Graphite, Ganglia, or Prometheus for better observability.
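
As a starting point, event logging (which feeds the Spark UI and history server) and a metrics sink can be enabled straight from configuration. The sketch below assumes a Graphite sink; the log directory, host, and port are placeholders.

import org.apache.spark.sql.SparkSession

// Placeholder log directory and Graphite endpoint; adjust for your environment.
val spark = SparkSession.builder()
  .appName("observable-job")
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "hdfs:///spark-event-logs")
  .config("spark.metrics.conf.*.sink.graphite.class", "org.apache.spark.metrics.sink.GraphiteSink")
  .config("spark.metrics.conf.*.sink.graphite.host", "graphite.internal")
  .config("spark.metrics.conf.*.sink.graphite.port", "2003")
  .getOrCreate()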


2. Optimise Data Operations

  • Repartition and Coalesce:

Use repartition() to distribute data evenly across partitions and increase parallelism.

Use coalesce() to reduce the number of partitions without a full shuffle when less parallelism is needed.

  • Avoid Skewed Data:

Use salting techniques or partition your data based on more balanced keys.

Alternatively, pre-aggregate data before joining so that skewed keys carry less data through the shuffle, or isolate the heaviest keys and process them separately.

  • Optimise Shuffles:

Minimise the use of wide transformations like join, groupBy, or distinct.

Use broadcast() joins when one dataset is small enough to fit in each executor's memory (see the combined sketch after this list).
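
The sketch below pulls the repartitioning, salting, and broadcast-join ideas together. It assumes a large events table keyed by user_id and a small users dimension table; the paths, column names, partition counts, and salt range are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, col, concat_ws, floor, rand}

val spark = SparkSession.builder().appName("shuffle-and-skew-sketch").getOrCreate()

val events = spark.read.parquet("path/to/events")   // large, potentially skewed fact table
val users  = spark.read.parquet("path/to/users")    // small dimension table

// Repartition the large side by the join/grouping key to spread work evenly.
val balanced = events.repartition(400, col("user_id"))

// Salting: append a random suffix so a single hot key no longer lands in one
// partition. The other side of a join must be exploded over the same salt
// range (0-9 here); that step is omitted for brevity.
val salted = events.withColumn(
  "salted_key",
  concat_ws("_", col("user_id").cast("string"), floor(rand() * 10).cast("string"))
)

// Broadcast join: ship the small table to every executor instead of shuffling
// the large one.
val joined = events.join(broadcast(users), Seq("user_id"))

// Coalesce before writing to avoid producing thousands of tiny output files.
joined.coalesce(50).write.mode("overwrite").parquet("path/to/output")

Note that repartitioning and salting each add a shuffle or extra column handling of their own, so they only pay off when the skew or imbalance they remove costs more than that overhead.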


3. Tune Resource Allocation

  • Adjust Executors and Cores:

Balance the number of executors and cores per executor.

Each task uses one core by default (spark.task.cpus=1), so the total core count bounds parallelism. Over-provisioning cores or memory can lead to resource contention or task failures.

  • Memory Settings:

Increase executor memory (spark.executor.memory) if you encounter memory errors.

Tune related memory settings, such as spark.executor.memoryOverhead (off-heap and container overhead) and spark.driver.memory.

  • Dynamic Allocation:

Enable dynamic allocation (spark.dynamicAllocation.enabled=true) so Spark adds and removes executors automatically based on workload.
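
As a rough sketch, these settings can also be expressed programmatically, although they are more commonly passed to spark-submit. Every figure below is a placeholder that has to be sized against the actual cluster and workload, and they must be in place before the SparkContext starts.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("resource-tuned-job")
  .config("spark.executor.cores", "5")
  .config("spark.executor.memory", "8g")
  .config("spark.executor.memoryOverhead", "1g")
  .config("spark.driver.memory", "4g")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "20")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") // Spark 3.x; needed when no external shuffle service is available
  .getOrCreate()

Dynamic allocation needs either the external shuffle service or shuffle tracking so executors can be removed without losing shuffle data.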


4. Use Efficient Data Formats

File Formats:

  • Use columnar formats like Parquet or ORC for analytical queries.
  • Compress data (e.g., with Snappy, the Parquet default, or Gzip) to reduce I/O.

Partitioning:

  • Partition large datasets by frequently queried columns.
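
A small sketch of a partitioned, compressed Parquet write; the paths and the event_date partition column are placeholders.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("storage-format-sketch").getOrCreate()

val df = spark.read.json("path/to/raw_input")   // e.g. a row-oriented source

df.write
  .mode("overwrite")
  .option("compression", "snappy")   // Parquet's default codec, shown explicitly
  .partitionBy("event_date")         // partition by a frequently filtered column
  .parquet("path/to/curated")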

Caching and Persistence:

  • Cache hot data using persist() with an appropriate storage level (e.g., MEMORY_AND_DISK), as sketched below.
  • Avoid caching unnecessary data to free up memory.
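
A sketch of caching a reused intermediate result with a storage level that spills to disk; the path and the status column are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("caching-sketch").getOrCreate()

val hot = spark.read.parquet("path/to/curated")
  .filter("status = 'active'")
  .persist(StorageLevel.MEMORY_AND_DISK)

hot.count()       // first action materialises the cache
// ... several queries reuse `hot` here ...
hot.unpersist()   // release memory once the data is no longer needed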


5. Optimise Configurations

  • Serialisation: Use Kryo serialisation (spark.serializer=org.apache.spark.serializer.KryoSerializer) for better performance, and register custom classes with Kryo where applicable.
  • Parallelism: Set spark.sql.shuffle.partitions and spark.default.parallelism appropriately for the size of the dataset and the cluster resources.
  • Broadcast Joins: Configure spark.sql.autoBroadcastJoinThreshold to control when Spark uses broadcast joins.
  • Adaptive Query Execution (AQE): Enable AQE (spark.sql.adaptive.enabled=true) so Spark can re-optimise query plans at runtime, for example by coalescing small shuffle partitions and handling skewed joins.
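
A sketch that pulls these settings together in one place; the values are illustrative, and the two class names registered with Kryo are hypothetical stand-ins for your own types.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("configured-job")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.classesToRegister", "com.example.MyKeyClass,com.example.MyValueClass") // hypothetical classes
  .config("spark.sql.shuffle.partitions", "400")      // shuffle partitions for DataFrame/SQL operations
  .config("spark.default.parallelism", "400")         // default partitions for RDD operations
  .config("spark.sql.autoBroadcastJoinThreshold", (64 * 1024 * 1024).toString) // 64 MB, in bytes
  .config("spark.sql.adaptive.enabled", "true")                    // Adaptive Query Execution
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true") // let AQE merge small shuffle partitions
  .getOrCreate()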


Example Tuning Scenario

For a Spark job that reads a large Parquet file, groups data by a key, and writes it back:

import org.apache.spark.sql.functions.sum

// Assumes an existing SparkSession named `spark`
val df = spark.read.parquet("path/to/data")

// Wide transformation: grouping by key triggers a shuffle
val groupedDF = df.groupBy("key").agg(sum("value").as("total"))

// Cache only if the result is reused later in the job
groupedDF.cache()

// Write the aggregated result back as Parquet
groupedDF.write.mode("overwrite").parquet("path/to/output")

Optimisation Steps:

  1. Set spark.sql.shuffle.partitions to a reasonable value (e.g., 200 for small clusters, 1000+ for large).
  2. Use a broadcast join if joining with a small dataset, as sketched below.
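
As a sketch, both steps could be bolted onto the snippet above. Here groupedDF and spark come from that snippet, while the dims table, its path, and the shared key column are placeholders.

import org.apache.spark.sql.functions.broadcast

// Configure the shuffle partition count before any shuffle-producing action runs.
spark.conf.set("spark.sql.shuffle.partitions", "200")

// Broadcast the small side so the large aggregated result is not shuffled again.
val dims   = spark.read.parquet("path/to/small_dims")
val result = groupedDF.join(broadcast(dims), Seq("key"))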
