普通 SortShuffle 源码解析

write 方法

override def write(records: Iterator[Product2[K, V]]): Unit = {
    // 排序器
    sorter = if (dep.mapSideCombine) {
        require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
        new ExternalSorter[K, V, C](
            context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
        // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
        // care whether the keys get sorted in each partition; that will be done on the reduce side
        // if the operation being run is sortByKey.
        new ExternalSorter[K, V, V](
            context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    // 将 Map 任务的输出记录插入到缓存中

    // 数据 shuffle 数据文件
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

    try { // 将 map 端缓存的数据写入到磁盘中, 并生成 Block 文件对应的索引文件.
        val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
        // 记录各个分区数据的长度
        val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
        // 生成 Block 文件对应的索引文件. 此索引文件用于记录各个分区在 Block文件中的偏移量, 以便于
        // Reduce 任务拉取时使用
        shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
        mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    } finally {


bypass SortShuffle 源码解析

有时候, map 端不需要在持久化数据之前进行排序等操作, 那么 ShuffleWriter的实现类之一BypassMergeSortShuffleWriter 就可以派上用场了.

触发 BypassMergeSort

private[spark] object SortShuffleWriter {
    def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
        // We cannot bypass sorting if we need to do map-side aggregation.
        // 如果 map 端有聚合, 则不能绕过排序
        if (dep.mapSideCombine) {
            require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
        } else {
            val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
            // 分区数不能超过200 默认值
            dep.partitioner.numPartitions <= bypassMergeThreshold
