Spark Shuffle comes in two types: Hash-based Shuffle and Sort-based Shuffle. First, we will walk through their development history, which helps in understanding Shuffle:
Before Spark 1.1, Spark implemented only one Shuffle mode: hash-based Shuffle. Spark 1.1 introduced Sort-based Shuffle. Starting with Spark 1.2, the default Shuffle implementation changed from hash-based to sort-based, that is, the default ShuffleManager changed from hash to sort. As of Spark 2.0, Hash Shuffle is no longer used.
One of the main reasons Spark initially implemented Shuffle based on hashing was to avoid unnecessary sorting. Recall Hadoop MapReduce, where sorting is a fixed step: many tasks do not need their output sorted, yet MapReduce sorts it anyway, incurring a lot of unnecessary overhead.
In the hash-based Shuffle implementation, each Task in the Mapper phase generates one file for each Task in the Reduce phase, which usually produces a huge number of files: M * R intermediate files, where M is the number of tasks in the Mapper phase and R is the number of tasks in the Reduce phase. This incurs a large number of random disk I/O operations and significant memory overhead.
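To make the file-count growth concrete, here is a quick back-of-the-envelope calculation of the M * R formula above (the task counts are hypothetical, chosen only to illustrate the scale):

```python
def hash_shuffle_files(m: int, r: int) -> int:
    """Unconsolidated hash shuffle: every map task writes one file
    per reduce task, so the total is M * R intermediate files."""
    return m * r

# A modest job with 1,000 map tasks and 1,000 reduce tasks
# already produces one million intermediate files.
print(hash_shuffle_files(1_000, 1_000))  # 1000000
```

Even moderate parallelism on both sides multiplies into a file count that overwhelms the filesystem with small, randomly accessed files.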
To alleviate this problem, Spark 0.8.1 introduced a file consolidation mechanism (Shuffle Consolidate) for the hash-based Shuffle implementation, which merges the intermediate files generated on the Mapper side. Setting the property spark.shuffle.consolidateFiles = true reduces the number of intermediate files: with consolidation enabled, intermediate files are generated per execution unit rather than per map task, and each execution unit generates one file for each Task in the Reduce phase.
An execution unit corresponds to (the number of Cores on each Executor) / (the number of Cores allocated to each Task, 1 by default). Consolidation therefore changes the number of files from M * R to E * C/T * R, where E is the number of Executors, C is the number of Cores available per Executor, and T is the number of Cores allocated to each Task.
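The savings from consolidation can be sketched with the two formulas side by side (the cluster sizes below are hypothetical):

```python
def unconsolidated_files(m: int, r: int) -> int:
    """Hash shuffle without consolidation: M * R files."""
    return m * r

def consolidated_files(e: int, c: int, t: int, r: int) -> int:
    """Hash shuffle with consolidation: one file per (execution unit,
    reduce partition), i.e. E * (C / T) * R files. Execution units per
    executor = cores per executor / cores per task."""
    return e * (c // t) * r

# Hypothetical cluster: 100 executors with 2 cores each, 1 core per task,
# running 1,000 map tasks that shuffle to 1,000 reduce tasks.
print(unconsolidated_files(1_000, 1_000))       # 1000000
print(consolidated_files(100, 2, 1, 1_000))     # 200000
```

The consolidated count no longer depends on M at all: however many map tasks run, each core slot keeps reusing the same set of R output files.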
Spark 1.1 introduced Sort-based Shuffle:
In the hash-based Shuffle implementation, the number of intermediate files still depends on the number of tasks in the Reduce phase, that is, the degree of parallelism on the Reduce side; the file count therefore remains uncontrollable, and the problem cannot be fully solved. To address it, Spark 1.1 introduced a sort-based Shuffle implementation, and from Spark 1.2 onward the default implementation changed from hash-based Shuffle to sort-based Shuffle, that is, the default ShuffleManager changed from hash to sort.
In sort-based Shuffle, a task in the Mapper phase no longer generates a separate file for every task in the Reduce phase. Instead, each Mapper-phase task writes all of its output to a single data file and generates an accompanying index file, through which Reduce-phase tasks can locate their data. The immediate benefit of avoiding many small files is reduced random disk I/O and memory overhead. The number of generated files drops to 2 * M, where M is the number of tasks in the Mapper phase: each Mapper-phase task produces one data file and one index file, for M data files and M index files in total.
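The data-file/index-file layout can be sketched as follows. This is a simplified in-memory illustration of the idea, not Spark's actual resolver code; the function names and byte format are invented for the example. The index records cumulative byte offsets, so reduce task r reads exactly the byte range [offsets[r], offsets[r+1]) from the data file:

```python
import io

def write_map_output(partitions: list[bytes]) -> tuple[bytes, list[int]]:
    """One map task writes all of its reduce partitions into a single
    data blob and records cumulative byte offsets (the 'index file')."""
    data = io.BytesIO()
    offsets = [0]
    for block in partitions:
        data.write(block)
        offsets.append(offsets[-1] + len(block))
    return data.getvalue(), offsets

def read_partition(data: bytes, offsets: list[int], reduce_id: int) -> bytes:
    """A reduce task uses the index to seek straight to its slice:
    bytes [offsets[reduce_id], offsets[reduce_id + 1]) of the data."""
    return data[offsets[reduce_id]:offsets[reduce_id + 1]]

# Three reduce partitions produced by one map task:
data, index = write_map_output([b"alpha", b"bravo!", b"c"])
print(index)                           # [0, 5, 11, 12]
print(read_partition(data, index, 1))  # b'bravo!'
```

The key point is that the reducer never scans the data file; one small index per map task replaces R separate output files.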
Starting from Spark 1.4, a Tungsten-sort-based Shuffle implementation was introduced into the Shuffle process. The optimizations of the Tungsten project can greatly improve Spark's data-processing performance.
Note: In some specific application scenarios, the Shuffle mechanism based on Hash may outperform the Shuffle mechanism based on Sort.