Normal SortShuffle: source code analysis
The write method
override def write(records: Iterator[Product2[K, V]]): Unit = {
  // Choose the sorter: with map-side combine, pass the aggregator and key ordering through.
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  // Insert the map task's output records into the sorter's buffer (spilling to disk if needed).
  sorter.insertAll(records)
  // The data file that will hold this map task's shuffle output.
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  // Write into a temporary file first, then commit it together with the index file.
  val tmp = Utils.tempFileWith(output)
  try {
    // Write the map-side buffered/spilled data to disk and build the index file for the block file.
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    // partitionLengths records the byte length of each partition's data in the file.
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    // Generate the index file for the block file. It records each partition's offset within the
    // block file, so that reduce tasks can locate the data they need to fetch.
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    // Clean up the temporary file if it was not renamed to the final data file.
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
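To make the role of partitionLengths concrete, here is a minimal sketch (an illustration of the index layout, not the actual IndexShuffleBlockResolver code): the index file stores numPartitions + 1 cumulative byte offsets, so the reduce task for partition i reads the byte range [offsets(i), offsets(i + 1)) from the single data file. The object and method names below are made up for the example.
object IndexFileSketch {
  // Turn per-partition byte lengths into cumulative offsets, e.g. (10, 0, 25) -> (0, 10, 10, 35).
  def cumulativeOffsets(partitionLengths: Array[Long]): Array[Long] =
    partitionLengths.scanLeft(0L)(_ + _)

  def main(args: Array[String]): Unit = {
    val lengths = Array(10L, 0L, 25L)
    // Prints: 0, 10, 10, 35 -- partition 1 is empty, so its range is [10, 10).
    println(cumulativeOffsets(lengths).mkString(", "))
  }
}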
Bypass SortShuffle: source code analysis
Sometimes the map side does not need to sort (or otherwise pre-process) the data before persisting it. In that case one of the ShuffleWriter implementations, BypassMergeSortShuffleWriter, comes into play.
Triggering BypassMergeSort
private[spark] object SortShuffleWriter {
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      // The partition count must not exceed spark.shuffle.sort.bypassMergeThreshold (default 200).
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
}
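For illustration, the sketch below shows the kind of job that satisfies both conditions: an operation without map-side combine (groupByKey does not combine on the map side) together with a partition count at or below spark.shuffle.sort.bypassMergeThreshold. The app name, master URL, partition counts and the threshold value 400 are example choices, not anything prescribed by Spark.
import org.apache.spark.{SparkConf, SparkContext}

object BypassExample {
  def main(args: Array[String]): Unit = {
    // Example only: raise the bypass threshold from the default 200 to 400.
    val conf = new SparkConf()
      .setAppName("bypass-merge-sort-example")
      .setMaster("local[2]")
      .set("spark.shuffle.sort.bypassMergeThreshold", "400")
    val sc = new SparkContext(conf)

    val pairs = sc.parallelize(1 to 1000).map(i => (i % 100, i))

    // groupByKey performs no map-side combine and 100 partitions <= 400,
    // so shouldBypassMergeSort returns true and BypassMergeSortShuffleWriter is used.
    pairs.groupByKey(100).count()

    // reduceByKey sets mapSideCombine = true, so the normal SortShuffleWriter path
    // is taken regardless of the partition count.
    pairs.reduceByKey(_ + _, 100).count()

    sc.stop()
  }
}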