Top 5 Hurdles in High-Stakes Big Data Leveraging Distributed Compute
I) Overview
To understand why Read, Map, Reduce, Shuffle, Reduce, and Write will always be the most significant tasks, when considering performance, in any distributed environment, we must first set the foundation for what a distributed environment is and how it works. In this blog, I provide a brief overview of distributed compute; the focus, however, is the five most common performance problems encountered when doing distributed compute. Distributed computing in the cloud represents the frontier of modern technology, a space where the vastness of computational power meets the agility of cloud-based services. Picture this: an intricate web of servers scattered across the globe, each contributing a thread of processing power to weave a tapestry of unparalleled functionality. This is the essence of distributed computing in the Cloud – a symphony of interconnected nodes harmoniously working together to shoulder complex, data-intensive tasks with astounding efficiency and reliability.
Distributed Systems Explained
Imagine you're opening a quaint little bistro called "Memo's Café" that's renowned for its unique service. Patrons call in, and share their favorite memories or messages, and these are meticulously penned into personalized journals. When they ring back, a friendly voice recites their cherished words, reviving the moment with delightful precision.
As word spreads about the warmth of your service, calls flood in. To keep up with demand, you install a second phone line and bring in a second scribe. Now, with two phones and two scribes, Memo's Café is a distributed operation, akin to distributed computing where tasks are spread across multiple processors.
However, as your café grows, it begins to face challenges similar to those in distributed computing, particularly those outlined by the CAP Theorem. In computing, CAP stands for Consistency, Availability, and Partition Tolerance, and the theorem posits that in a distributed system, you can only guarantee two out of these three aspects at any given time.
Here's how it plays out in Memo's Café:
As Memo's Café flourishes, ensuring that each scribe's journal is a perfect mirror of the other (Consistency) while answering every call that comes (Availability) becomes more complex. If a scribe is sick one day (a partition), and a patron calls on their line, how do you ensure the memory is accurately recounted from the other scribe's journal? You could slow down service to make sure the journals are always identical, but then you might not answer every call (sacrificing Availability). Or you could answer every call promptly, but risk occasional inaccuracies if the scribes' journals aren't perfectly synced (sacrificing Consistency).
In essence, Memo's Café, in its charming simplicity, encapsulates the delicate balance of distributed computing - the art of ensuring that, even as the café scales up, each patron's call is as rewarding as the one before, without a moment lost in waiting or a memory compromised in the telling.
In this blog, we embark on an exploratory journey through the dynamic realm of distributed computing. We'll demystify how cloud services harness the collective strength of remote servers, allowing for scalable, resilient, and highly available systems. Whether it’s accelerating data analysis, powering extensive machine learning models, or simply hosting a multitude of applications, distributed compute in the Cloud is the backbone that supports these heavy lifters without breaking a sweat.
From the foundational principles that guide distributed systems to the intricate dance of load balancing and fault tolerance, we will navigate the nuances that make this technology a cornerstone of IT infrastructure. So buckle up, as we dive into the vast ocean of distributed compute, unraveling its complexities and understanding how it's reshaping the way we interact with technology in our inexorably connected world.
II) Distributed Compute Introduction
Big data platforms often encounter a variety of performance problems, some of the most common being Spill, Skew, Shuffle, Storage, and Serialization. Here's a brief description of each:
- Spill: data that cannot fit in RAM is temporarily written to disk and read back, trading memory safety for slow disk I/O.
- Skew: data is unevenly distributed across partitions, so a few tasks do far more work than the rest.
- Shuffle: data is redistributed across the network between stages, an expensive side effect of wide transformations.
- Storage: poor storage layout and configuration (tiny files, slow media, schema inference) drags down read and write performance.
- Serialization: converting data and code (especially UDFs) between formats adds overhead and blocks optimizations.
Addressing these issues typically involves a combination of system tuning, code optimization, and sometimes architectural changes to ensure that the big data platform can handle the scale and complexity of the tasks efficiently.
III) The 5 Most Common Performance Problems with Distributed Compute (The 5 Ss)
Common Problems
Distributed compute environments are complex to monitor, measure, and manage. Monitoring one is like observing every individual in a crowded, dynamic city in real time. Measuring its productivity parallels assessing the varied tasks of numerous office workers, requiring many complex metrics. Managing one is like conducting an orchestra, ensuring each part is in harmony and adjusting as necessary. Just as a conductor addresses the performance of each musician, IT managers must manage resources, maintain communication, and ensure data consistency across the system, all while addressing failures and keeping the system efficient and responsive to user needs. This illustrates the complex yet critical role of IT professionals in maintaining distributed compute environments. The most egregious problems fall into one of five categories: Spill, Skew, Shuffle, Storage, and Serialization.
Why It's So Hard
Identifying problems in a distributed compute environment is challenging because of the symbiotic relationship between the 5 Ss: one problem can cause another. Skew can induce Spill. Storage issues can induce excess Shuffle. Incorrectly addressing Shuffle can exacerbate Skew. And many of these problems can be present simultaneously.
Distributed compute environments, much like a bustling city during peak hours, present a complexity in their operation that is multifaceted and continuous. Imagine trying to observe every individual in a crowd, tracking their movements, understanding their interactions, and predicting their next steps—all in real-time. This is akin to the challenge of monitoring a distributed system, where countless processes and tasks are executing simultaneously across various nodes in the cloud.
When we talk about measuring such an environment, think of attempting to assess the productivity of each individual in a large office. Some tasks are short and quick, others long and involved. Some processes depend on the completion of others, and some can be carried out in parallel. The metrics needed to gauge efficiency, resource usage, and throughput are diverse and complex, just as in distributed computing, where you must consider factors like network latency, CPU and memory utilization, and input/output operations.
Managing these environments, then, is akin to being the conductor of a grand orchestra. Each musician plays a different instrument, contributing their unique sound to the symphony. The conductor must ensure that each musician is in sync, entering and exiting the composition at the precise moment, maintaining the right tempo, and adjusting the volume. Similarly, in distributed compute, managing entails deploying resources efficiently, orchestrating communication and data consistency across the network, and ensuring that all components interact harmoniously to achieve the desired output.
Each node in a distributed system can be thought of as an individual musician. Some may perform flawlessly, while others might falter. The conductor, or the IT manager in this case, must not only address these individual inconsistencies but also understand how they impact the overall performance. They must deal with the possibility of system failures, data inconsistencies, and the challenge of maintaining the balance as per the CAP theorem, all while ensuring the ensemble performs without interruption to the end-user experience.
This analogy underscores the intricate dance that IT professionals perform daily to ensure that distributed compute environments are not just operational, but also efficient, resilient, and able to scale according to demand—all without missing a beat.
Benchmarking
Benchmarking Apache Spark alongside its peers in the distributed systems arena is a meticulous endeavor, much like evaluating the performance of elite athletes. Each system, whether it's Hadoop, Flink, or Storm, brings its unique strengths to the race. Spark may shine in iterative processing, leveraging in-memory computations, while others might excel in stream processing or fault tolerance. The crux of benchmarking lies in setting up a series of standardized tests that measure data processing speed, scalability, and resilience under varied workloads. [1] [2] By simulating real-world scenarios, from batch processing to real-time analytics, we can gauge how each framework performs, not just in isolation but more importantly, in concert with the complex demands of big data workflows. This meticulous assessment helps in identifying the right tool for the right job, ensuring that the chosen framework aligns with the specific performance criteria and business goals at hand. There are generally three common approaches to benchmarking:
IV) Spill
When your system's memory is drowning in data, "spill" becomes the lifeboat that saves it from crashing—yet this very solution could be silently sinking your distributed system's performance. In distributed systems, "spill" refers to the event where data that cannot be held in a system's primary memory (RAM) due to insufficient space is temporarily written to secondary storage, such as a disk. This is often a mechanism to prevent an Out of Memory (OOM) error when dealing with large volumes of data that exceed the memory capacity. While it allows the system to continue processing, spilling to disk is significantly slower than processing in RAM, leading to decreased performance. Therefore, frequent spilling is a common bottleneck in distributed systems and can be indicative of suboptimal resource allocation or data partitioning strategies.
The term "spill" in the context of Apache Spark refers to the process where a Resilient Distributed Dataset (RDD) is transferred from the computer's RAM to disk storage and then later reloaded into RAM. This spillage happens when a partition of data is too large to be accommodated by the available RAM. To manage this, Spark resorts to potentially costly disk operations to read and write data, thereby freeing up RAM. As in distributed systems, these operations are undertaken to prevent an OOM error, which can halt the execution of a program.
Spilling in distributed systems, especially within Apache Spark, is a necessary but performance-impacting process where data overflows from RAM to disk and back again to avoid OOM errors. This often signals deeper issues with resource allocation and can severely slow down data processing. To maintain efficiency and speed in your distributed computing tasks, it's crucial to optimize your data partitioning and resource management strategies. Don't let spillage be your bottleneck—take action to fine-tune your system's performance today. There are a number of ways to induce this problem:
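To make this concrete, here is a minimal, hypothetical PySpark sketch (the table and column names are invented) of one common way to induce spill: exploding an array column multiplies the row count, and deliberately forcing very few shuffle partitions concentrates more data into each task than memory can hold.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("spill-demo").getOrCreate()

# Deliberately bad setting: very few (and therefore very large) shuffle partitions.
spark.conf.set("spark.sql.shuffle.partitions", "8")

# 'events' and its 'tags' array column are placeholder names; explode() multiplies
# the row count before the data ever reaches the aggregation.
exploded = spark.table("events").withColumn("tag", F.explode("tags"))

# The groupBy forces a shuffle; with only 8 partitions, each reduce task may receive
# more data than fits in executor memory and will spill the overflow to disk.
counts = exploded.groupBy("tag").count()
counts.write.format("noop").mode("overwrite").save()  # 'noop' sink just forces execution
```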
In the Spark UI, spill is represented by two values:
- Spill (Memory): the size of the spilled data as it existed, deserialized, in memory.
- Spill (Disk): the size of that same data after it was serialized and written to disk.
The two values are always presented together; the size on disk will always be smaller due to the natural compression gained in the act of serializing that data before writing it to disk. Within the Spark UI's stage details, spill appears in several places:
- Summary Metrics
- Aggregated Metrics by Executor
- The Tasks table
The Spark Spill Listener is a component that monitors spill events within Apache Spark jobs. It's designed to track when spills occur by extending the functionality of SparkListener. The Spill Listener provides methods like numSpilledStages(), which returns the number of stages that have experienced a spill, and onStageCompleted(), which is called when a stage is finished, providing details on whether a spill occurred during that stage. Similarly, onTaskEnd(SparkListenerTaskEnd taskEnd) is invoked when a task ends, which can also provide information regarding spills during the task's execution.
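The SpillListener runs on the JVM side. As a hedged, Python-based alternative for the same kind of monitoring, Spark's REST API exposes per-stage metrics that include memoryBytesSpilled and diskBytesSpilled; the host, port, and application index below are assumptions for a local driver.

```python
import requests

# Assumed local driver UI; adjust the host/port for your cluster.
BASE = "http://localhost:4040/api/v1"

app_id = requests.get(f"{BASE}/applications").json()[0]["id"]
stages = requests.get(f"{BASE}/applications/{app_id}/stages").json()

# Report every stage that spilled, mirroring what the SpillListener tracks on the JVM side.
for stage in stages:
    mem = stage.get("memoryBytesSpilled", 0)
    disk = stage.get("diskBytesSpilled", 0)
    if mem or disk:
        print(f"Stage {stage['stageId']}: spilled {mem} bytes from memory, {disk} bytes to disk")
```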
V) Skew
Data skew is a common issue when processing large datasets, particularly in distributed computing environments like Apache Spark. Here's an elaboration on the concept and its implications:
Understanding Data Skew
1. Definition of Skew: Data skew happens when the data distribution is uneven across different partitions or nodes in a distributed system. In simple terms, some partitions end up with much more data than others.
2. Partitioning in Spark: Apache Spark typically reads data into partitions, which are chunks of data of a certain size (commonly around 128 MB). This partitioning allows Spark to distribute the workload across multiple nodes in a cluster.
3. Causes of Skew:
- Key-based Operations: Skew often arises during operations that involve grouping or aggregating data based on keys. For instance, if a large number of records share the same key, they will all be processed in the same partition, leading to an imbalance.
- Data Characteristics: Inherent characteristics of the data, such as uneven distribution of values, can also lead to skew.
Impacts of Skew
1. Performance Degradation: A heavily skewed partition can significantly slow down processing since it takes much longer to process than other partitions. This leads to inefficient use of cluster resources, as some nodes are overburdened while others are underutilized.
2. Resource Overloads: In extreme cases, the node handling the large partition might run out of memory (OOM errors) or disk space (spill issues). This can happen because the node might not have enough resources to handle the disproportionately large amount of data.
3. Difficulties in Diagnosis: Diagnosing and resolving skew-related issues can be challenging. It's often not apparent until a job runs longer than expected or fails due to resource constraints.
Managing Data Skew
1. Detecting Skew: Monitoring tools and Spark's UI can help identify skew by showing the distribution of data across partitions.
2. Repartitioning: One common approach to address skew is to repartition the data. This can be done by increasing the number of partitions or by repartitioning based on a better-chosen key.
3. Salting Keys: In cases where skew is caused by key-based operations, 'salting' the keys (modifying them to distribute the data more evenly) can be effective.
4. Custom Partitioning: Implementing custom partitioners that are aware of the data distribution can help in distributing the data more evenly across the nodes.
5. Resource Allocation: Adjusting the resource allocation for skewed tasks can sometimes mitigate the issue, though it's more of a workaround than a solution.
In summary, data skew is a significant issue in distributed data processing that can lead to performance bottlenecks and resource overloads. Identifying and addressing skew is crucial for efficient and reliable data processing in environments like Apache Spark.
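To make the salting approach concrete, here is a minimal PySpark sketch; the DataFrame names, the region_id key, and the salt count are assumptions, not a prescribed implementation. The skewed side gets a random salt appended to its key, the small side is replicated once per salt value, and the hot key is thereby spread across many partitions instead of one.

```python
from pyspark.sql import functions as F

# `large` and `small` are assumed DataFrames joined on a skewed 'region_id' column.
NUM_SALTS = 16  # assumed value; tune to the severity of the skew

# Large (skewed) side: append a random salt so one hot key becomes NUM_SALTS keys.
large_salted = large.withColumn(
    "salted_key",
    F.concat_ws("_", F.col("region_id").cast("string"),
                (F.rand() * NUM_SALTS).cast("int").cast("string")),
)

# Small side: replicate each row once per salt value so every salted key can still match.
small_salted = (
    small.withColumn("salt", F.explode(F.array([F.lit(str(i)) for i in range(NUM_SALTS)])))
         .withColumn("salted_key", F.concat_ws("_", F.col("region_id").cast("string"), F.col("salt")))
)

# The join key is now spread across NUM_SALTS partitions instead of one hot partition.
joined = large_salted.join(small_salted, "salted_key")
```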
Source Data Before Aggregation
Data After Aggregation
In this example, we have 4 partitions, which should be evenly distributed after the initial ingest by Spark. Regions A, B, and C might be similar IF the data is correlated with something like population; the assumption here is that Regions A, B, and C are the same size. But Region D has twice the population, and thus twice as many records. Who knows what Regions E-ZZZ might look like - it doesn't matter for this illustration.
If Region D is 2x larger than A, B or C:
The ramifications of that are:
In this example, Region D has a larger amount of data compared to Regions A, B, or C. As a result, the partition of data for Region D is larger, containing more records. This size discrepancy means that it will take Spark longer to process the partition for Region D than the partitions for the other regions - potentially up to twice as long as Regions A, B, or C. This illustrates the challenge of data skew, where uneven data distribution leads to inefficiencies in processing times within distributed computing systems like Spark.
There are three "common" solutions to Skew:
2. Databricks’ Skew Hint
3. Adaptive Skew Join (enabled by default in Spark 3.1)
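As a hedged sketch of options 2 and 3 (the table and column names are placeholders, and the skew hint syntax should be verified against your Databricks Runtime documentation):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Option 2: Databricks' skew hint, applied to the skewed relation and column.
orders = spark.table("orders").hint("skew", "customer_id")
hinted_join = orders.join(spark.table("customers"), "customer_id")

# Option 3: Adaptive Query Execution's skew-join handling (Spark 3.x settings).
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Optional knobs controlling what AQE treats as a skewed partition:
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
```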
Skew Mitigation Roundup
See Experiment #1596, Step C, Step D, Step E, and Step F. Especially the SQL diagram for Step E showing skew=true.
VI) Storage
The choice and implementation of data structures are critically important in computer programming because they fundamentally affect the performance and efficiency of applications. Different data structures are optimized for specific types of operations; for example, arrays allow fast access to elements but can be slow for insertion and deletion, whereas linked lists facilitate quick insertion and deletion but have slower access times. Efficient data structure selection can drastically improve the performance of algorithms, impacting how quickly and effectively a program can process large amounts of data. According to Cormen et al. in "Introduction to Algorithms," understanding data structures is key to designing efficient algorithms, as the choice of an appropriate data structure can enhance the efficiency of an algorithm significantly (Cormen, Leiserson, Rivest, and Stein, 2009).
Furthermore, as Big O notation illustrates, different data structures have varying complexities (time and space complexity) for different operations (searching, insertion, deletion, etc.). The Big O notation is a mathematical representation used to describe the efficiency of an algorithm or data structure, which remains critical in evaluating performance, especially in large-scale and resource-intensive applications. Selecting the wrong data structure or algorithm can lead to performance bottlenecks, as evidenced in the work of Thomas H. Cormen and others. For instance, using an array instead of a hash table for search operations can increase the time complexity from O(1) (constant time in a hash table) to O(n) (linear time in an array), significantly impacting performance as data size grows (Cormen et al., 2009).
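A small illustration of that complexity gap in plain Python: membership tests against a list must scan elements one by one (O(n)), while a hash-based set resolves the same lookup in roughly constant time (O(1)).

```python
import timeit

n = 1_000_000
as_list = list(range(n))
as_set = set(as_list)

# Searching for a value near the end forces the list to scan ~n elements,
# while the set resolves the lookup via hashing.
list_time = timeit.timeit(lambda: (n - 1) in as_list, number=100)
set_time = timeit.timeit(lambda: (n - 1) in as_set, number=100)

print(f"list membership: {list_time:.4f}s  set membership: {set_time:.6f}s")
```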
If you had only 60 seconds to pick up as many coins as you can, one coin at a time, which pile do you want to work from?
The tiny file problem.
See Experiment #8923
Choosing the right storage and configuration in a distributed computing environment is crucial for several reasons:
Performance Optimization: Different storage solutions offer varying performance characteristics. For example, solid-state drives (SSDs) are faster than hard disk drives (HDDs), and in-memory storage is faster than both. The choice of storage directly impacts the speed at which data can be read and written, which is critical in distributed computing where large volumes of data are processed.
Scalability: Distributed computing environments often need to scale up or down based on demand. The storage solution and configuration must be scalable to handle increasing data volumes and user load without degrading performance.
Data Accessibility and Availability: In a distributed environment, data needs to be readily accessible from different nodes. The storage configuration should ensure that data is available where and when it is needed, with minimal latency. This involves strategies like data replication and effective data distribution across the network.
Fault Tolerance and Reliability: Storage systems in distributed environments must be reliable and resilient to faults. This can involve redundancy, regular backups, and data recovery mechanisms to prevent data loss in case of hardware failures or other issues.
Cost Efficiency: Storage costs can be significant, especially at scale. It's important to choose a storage solution that is cost-effective without compromising on the necessary performance and reliability. This often involves a trade-off analysis between different types of storage media and configurations.
Data Management and Security: Efficient storage configurations help in better data management, including data lifecycle management, compliance with data regulations, and security measures. Proper configuration ensures that sensitive data is adequately protected and managed according to organizational and legal requirements.
In summary, the right storage and configuration in a distributed computing environment are essential for ensuring high performance, scalability, data availability, fault tolerance, cost efficiency, and secure data management. These factors collectively determine the overall efficiency and effectiveness of distributed computing systems.
See Experiment #8973, contrast Step B, Step C, and Step D
Step B: 1 directory, 345K files clocks in at ~1 minute
Step C: 12 directories, 6K files clocks in at ~5 seconds
Step D: 8K directories, 6k files clocks in at ~14 minutes
Inferring Schemas & Merging Schemas
- JSON and CSV: inferring the schema requires a full read of 100% of the data, even when filtered - SLOWEST.
- Parquet: the schema is read from a single part-file - FAST.
- Parquet with schema merging: the schema must be read from all part-files - SLOW.
- Delta: the schema is read from the transaction log - FASTER.
- Tables: the schema is read from the metastore - FASTEST.
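A hedged PySpark sketch of that trade-off (the file path, table name, and columns are placeholders): inferring a CSV schema forces a scan of the data, supplying an explicit schema avoids it, and reading a registered table resolves the schema from the metastore.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.getOrCreate()

# SLOW: schema inference forces a pass over the raw files (path is a placeholder).
inferred = (spark.read.option("header", "true")
                      .option("inferSchema", "true")
                      .csv("/data/events_csv"))

# FASTER: an explicit schema skips the inference pass entirely.
schema = StructType([
    StructField("event_id", LongType()),
    StructField("region", StringType()),
])
explicit = (spark.read.option("header", "true")
                      .schema(schema)
                      .csv("/data/events_csv"))

# FASTEST: a registered (Delta) table resolves its schema from the transaction log / metastore.
from_table = spark.read.table("analytics.events")
```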
NVMe & and SSD are up to 10x faster than Magnetic Drives
NVMe, which stands for Non-Volatile Memory Express, is a storage protocol designed to capitalize on the high-speed potential of solid-state drives (SSDs) in a computer or data system. Unlike traditional storage interfaces like SATA, NVMe provides a much faster connection between the storage device and the system's CPU via high-speed PCIe (Peripheral Component Interconnect Express) bus lanes. This enables significantly higher data transfer rates and lower latency compared to older storage technologies. In the context of big data systems, NVMe is increasingly used to accelerate data access and processing, enhancing overall system performance and efficiency in handling large volumes of data. This does not apply to traditional data reads - those come from cloud storage. This applies to cluster-local disk operations like reading and writing shuffle files and the Delta/Parquet IO Cache.
Other Options To Improve Performance, Stability, and Recovery
- Use Delta + Unity Catalog - Delta provides its own optimizations, and reading the table schema from the catalog always outperforms reading it from disk.
- Use NVMe & SSD for faster disk IO - this further helps to mitigate shuffle.
- Employ AQE and spark.sql.adaptive.coalescePartitions.enabled to control your Spark partitions and minimize the production of tiny files.
- Employ spark.databricks.adaptive.autoOptimizeShuffle.enabled.
- Use Delta's OPTIMIZE operation to compact tiny files.
- Consider enabling Delta's Auto Compaction and Optimized Writes.
- Consider employing Databricks' Auto Loader.
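A hedged configuration sketch of those recommendations follows; the table name is a placeholder, and the Databricks-specific flags and Delta table properties should be checked against your runtime version.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Let AQE coalesce small shuffle partitions instead of writing them out as tiny files.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Databricks-specific: let the platform auto-tune the shuffle partition count.
spark.conf.set("spark.databricks.adaptive.autoOptimizeShuffle.enabled", "true")

# Delta: compact existing tiny files, then opt the table in to Auto Compaction
# and Optimized Writes ('analytics.events' is a placeholder table name).
spark.sql("OPTIMIZE analytics.events")
spark.sql("""
    ALTER TABLE analytics.events SET TBLPROPERTIES (
        'delta.autoOptimize.autoCompact'   = 'true',
        'delta.autoOptimize.optimizeWrite' = 'true'
    )
""")
```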
VII) Shuffle
Overview
Minimizing shuffle in distributed data systems is crucial because shuffle operations, which involve redistributing data across different nodes, can be highly resource-intensive and a major bottleneck in system performance. Shuffles require significant network I/O and can drastically slow down data processing, especially in large-scale environments. They also increase the complexity of data management and can lead to inefficiencies in resource utilization. By minimizing shuffle, you can reduce network traffic, lower latency, and improve overall system efficiency, leading to faster processing times and more optimal use of computational resources. This is particularly important in big data applications where the volume and velocity of data are very high. Shuffling is a side effect of wide transformations, for example:
- join()
- groupBy() / groupByKey()
- distinct()
- orderBy() / sort()
- repartition()
And technically some actions, e.g. count()
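A short sketch of the distinction (the table name is a placeholder): narrow transformations such as filter and select operate within a partition, while wide transformations such as groupBy introduce an Exchange, the shuffle boundary visible in the physical plan.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.table("sales")  # placeholder table with 'region' and 'amount' columns

# Narrow: filter/select are applied partition-by-partition; no data moves between nodes.
narrow = df.filter(df.amount > 100).select("region", "amount")

# Wide: groupBy must co-locate matching keys, which produces an Exchange (shuffle).
wide = narrow.groupBy("region").sum("amount")

wide.explain()  # look for 'Exchange' operators marking the shuffle boundaries
```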
Don’t get hung up on trying to remove every shuffle. Shuffles are often a necessary evil. Focus on the more expensive operations instead; many shuffle operations are quite fast. Targeting skew, spill, tiny files, etc. often yields better payoffs.
Excessive Shuffle can, however, be mitigated. Here are some examples of how:
It is important to optimize joins across the data landscape. We have several different options we can explore here:
Reordering the join
If we join three tables, it logically makes more sense to order the joins such that we reduce the number of records involved in each shuffle. See Figure 10 below.
Scenario A
If we join Table A and Table B first, we shuffle the full recordset and do not reduce the number of records, giving us Table AB. Then we join Table AB to Table C; Table C can be broadcast, and the result of the join is Table ABC with only 1K records.
Scenario B
If we join Table A and Table C first, we get the immediate benefit of the broadcast, and we also reduce the number of records to 1K in Table AC. Because Table AC is only 1K records, it can be broadcast as it is joined to Table B. This reordering is about 90% automatic, but you may want to help AQE and the CBO by capturing statistics for join keys and filtered columns.
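A hedged PySpark sketch of Scenario B (the table names and join key are placeholders): joining the small, filtering table first and broadcasting it means the larger shuffle never materializes, and gathering statistics gives AQE and the CBO what they need to make this choice on their own.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

table_a = spark.table("table_a")   # large fact table
table_b = spark.table("table_b")   # second large table
table_c = spark.table("table_c")   # small table that filters A down to ~1K rows

# Scenario B: join A to C first. C is broadcast, so A is never shuffled for this join,
# and the result (AC) is small enough to broadcast into the join with B.
table_ac = table_a.join(broadcast(table_c), "join_key")
table_abc = table_b.join(broadcast(table_ac), "join_key")

# Capturing statistics on join keys and filtered columns helps AQE and the CBO
# make these ordering/broadcast decisions automatically.
spark.sql("ANALYZE TABLE table_a COMPUTE STATISTICS FOR COLUMNS join_key")
```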
Bucketing
If you are bucketing datasets, you are very likely doing it wrong. Bucketing is hard to get right and is an expensive operation, especially if you are bucketing a periodically changing dataset. Bucketing eliminates the sort in the Sort-Merge Join by pre-sorting partitions; the cost is paid in the production of the dataset, on the assumption that savings will be made by frequent joins of both tables. It is not worth considering for datasets smaller than 1-5 terabytes.
If you must bucket, consider the following when performing bucketing. To properly implement bucketing in a distributed data cloud landscape, several key elements are needed:
Effective Partitioning Strategy: Properly defining the criteria for partitioning data into buckets is crucial. This often involves understanding the data distribution and selecting the right key or attribute to partition by.
Scalable Storage Infrastructure: The storage system must be able to efficiently handle the created buckets, ensuring that they can scale and are accessible across the distributed environment.
Data Management Tools: Tools that can manage and maintain the integrity of the buckets, ensuring data consistency and reliability across the distributed system.
Resource Allocation: Adequate computational and memory resources must be allocated to handle the processing of data within these buckets efficiently.
Load Balancing: Mechanisms to ensure that the workload is evenly distributed across the buckets, preventing data skew and bottlenecks.
Security and Compliance: Ensuring that data within each bucket is secure and that the bucketing strategy complies with any relevant data governance or privacy regulations.
These elements collectively ensure that bucketing is implemented effectively, leading to improved data organization, processing efficiency, and overall system performance in a distributed data cloud landscape.
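If you have weighed those considerations and still need to bucket, a hedged PySpark sketch looks like the following; the table names, join key, and bucket count are placeholders, and both sides must use the same key and bucket count for the sort-merge join to skip its shuffle and sort.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Write both sides bucketed and pre-sorted on the join key (an expensive, one-time cost).
(spark.table("transactions")
      .write
      .bucketBy(64, "account_id")
      .sortBy("account_id")
      .mode("overwrite")
      .saveAsTable("transactions_bucketed"))

(spark.table("accounts")
      .write
      .bucketBy(64, "account_id")
      .sortBy("account_id")
      .mode("overwrite")
      .saveAsTable("accounts_bucketed"))

# Subsequent joins on account_id can avoid both the shuffle and the sort phases.
joined = spark.table("transactions_bucketed").join(
    spark.table("accounts_bucketed"), "account_id"
)
```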
Optimize Joins
VIII) Serialization
Data serialization in a big data environment refers to the process of converting data structures or objects into a format that can be easily stored, transferred, and reconstructed. In big data contexts, this process is critical because of the need to efficiently handle large volumes of data that may be distributed across different systems and platforms. Managing the serialization of data in a big-data cloud environment is vital for several reasons:
Efficient Data Transfer: Proper serialization ensures that data can be efficiently transferred over the network. Inefficient serialization can lead to larger data sizes, increasing the time and bandwidth required for data transmission.
Interoperability: Serialization formats need to be standardized or well-understood to enable different systems within the cloud environment to exchange data seamlessly.
Performance Optimization: Efficient serialization and deserialization processes minimize the computational overhead, thereby improving the overall performance of big data applications.
Data Integrity: Correct serialization preserves the integrity of data during transfer, ensuring that it remains accurate and reliable.
Scalability: In cloud environments, where scalability is key, efficient serialization allows for the effective handling of large volumes of data without performance degradation. Properly managing serialization is, therefore, essential for optimizing data flow, maintaining data integrity, and ensuring high performance in big data cloud environments.
Why Serialization is Bad: Spark SQL and DataFrame instructions are highly optimized. All UDFs must be serialized and distributed to each executor, and the parameters and return value of each UDF must be converted for each row of data. Python UDFs take an even harder hit: the Python code has to be pickled, Spark must instantiate a Python interpreter in every Executor, and the conversion of each row between Python and the DataFrame representation costs even more.
- Step E uses Scala UDFs & clocks in at ~36 minutes
- Step F uses Scala’s Typed Transformations & clocks in at ~26 minutes
- Step D uses higher-order functions & clocks in at ~23 min. (same as Scala)
- Step E uses Python UDFs & clocks in at ~1 hour & 45 min.
- Step F uses Python and Vectorized UDFs & clocks in at ~1 hour & 20 min.
Serialization - UDFs vs Catalyst Optimizer: UDFs create an analysis barrier for the Catalyst Optimizer, which cannot connect the code before and after a UDF. The UDF is a black box, so optimizations are limited to the code on either side of it; the optimizer cannot reason about the UDF itself or about how all of the code works together.
Mitigating Serialization Issues
- Don’t use UDFs. I challenge you to find a set of transformations that cannot be done with the built-in, continuously optimized, community-supported, higher-order functions.
- If you have to use UDFs in Python (common for Data Scientists), use Vectorized UDFs as opposed to the stock Python UDFs.
- If you have to use UDFs in Scala, use Typed Transformations as opposed to the stock Scala UDFs.
- Resist the temptation to use UDFs to integrate Spark code with existing business logic - porting that logic to Spark almost always pays off.
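A hedged comparison sketch of those recommendations (the table and column names are placeholders): the same trivial transformation written as a stock Python UDF, as a built-in expression that Catalyst can fully optimize, and as a vectorized (pandas) UDF for cases where custom Python is unavoidable.

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()
df = spark.table("sales")  # placeholder table with an 'amount' column

# Worst: a stock Python UDF -- the function is pickled, every row is shipped to a
# Python worker, and every parameter/return value is converted per row.
@udf(DoubleType())
def add_tax_udf(amount):
    return amount * 1.07

with_udf = df.withColumn("with_tax", add_tax_udf("amount"))

# Best: built-in expressions stay in the JVM and remain fully visible to Catalyst.
with_builtin = df.withColumn("with_tax", F.col("amount") * 1.07)

# Compromise: a vectorized (pandas) UDF processes whole Arrow batches instead of single rows.
@pandas_udf(DoubleType())
def add_tax_vectorized(amount: pd.Series) -> pd.Series:
    return amount * 1.07

with_vectorized = df.withColumn("with_tax", add_tax_vectorized("amount"))
```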
IX) Conclusions
Understanding the core tasks of Read, Map, Reduce, Shuffle, Reduce, and Write is essential in any distributed computing environment, as they are pivotal in performance optimization. This blog delved into the intricacies of distributed computing, highlighting the criticality of managing common performance issues like Spill, Skew, Shuffle, Storage, and Serialization. These challenges, akin to orchestrating a symphony or managing a bustling city, require meticulous monitoring, measurement, and management. By addressing these issues effectively, IT professionals can ensure that distributed systems not only function seamlessly but also maintain efficiency, resilience, and scalability in the ever-evolving landscape of cloud technology.
References