Spark Internals: Shuffle in Detail

From the previous articles in this series, we know that when a cluster starts in Standalone mode, each Worker registers with the Master, so that the Master is aware of, and can manage, the whole cluster; with the help of ZooKeeper, the Master can implement HA in a simple way. An application interacts with the cluster through SparkContext: creating a SparkContext registers the Application, and the Master allocates Executors for it. After the application has created an RDD and applied a series of transformations on it, an action triggers execution: the DAGScheduler splits the DAG into Stages, converts each Stage into a TaskSet, and hands it to TaskSchedulerImpl; TaskSchedulerImpl, via SparkDeploySchedulerBackend's reviveOffers, eventually sends a LaunchTask message to the ExecutorBackend; on receiving the message, the ExecutorBackend launches the Task, and computation starts on the cluster.

The following sections cover the implementation in more detail.

Shuffle is undoubtedly a focal point of performance tuning. This article dives into the implementation details of Spark Shuffle from the perspective of the source code.

At its upper boundary, each Stage either reads data from external storage or reads the output of the previous Stage; at its lower boundary, it either writes its output to the local file system for the child Stage to read, or it is a ResultTask that produces the final result.
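To make the boundary concrete, here is a minimal sketch (my own illustration, not from the original article) of a job in which reduceByKey introduces a shuffle boundary, splitting the DAG into two Stages:

    import org.apache.spark.{SparkConf, SparkContext}

    object ShuffleBoundaryExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("shuffle-boundary").setMaster("local[2]"))

        // Stage 0: parallelize + map are pipelined in one stage, no shuffle.
        val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map(w => (w, 1))

        // reduceByKey needs all records with the same key on the same partition,
        // so it introduces a ShuffleDependency: Stage 0 writes shuffle output, and
        // Stage 1 (built on a ShuffledRDD) reads it back as its input.
        val counts = words.reduceByKey(_ + _)

        counts.collect().foreach(println)
        sc.stop()
      }
    }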

Let's start with org.apache.spark.rdd.ShuffledRDD, because a ShuffledRDD marks the beginning of a Stage: it needs to fetch the output of the previous Stage and then carry on with the subsequent computation. How is this data fetching implemented? By following the implementation of ShuffledRDD we can trace this thread. First, look at how compute is implemented.

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

It obtains a shuffle reader from the ShuffleManager and then reads the data to perform the computation. Here is how shuffleManager is set up:

  // Let the user specify short names for shuffle managers
  val shortShuffleMgrNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
  val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
  val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
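So the implementation is switchable through configuration. A minimal sketch of selecting it explicitly (the config key and short names are exactly those in the snippet above):

    import org.apache.spark.{SparkConf, SparkContext}

    // Select the sort based shuffle explicitly; "hash" is the default in Spark 1.1.
    // A fully qualified class name of a custom ShuffleManager would also work.
    val conf = new SparkConf()
      .setAppName("shuffle-manager-demo")
      .setMaster("local[2]")
      .set("spark.shuffle.manager", "sort")

    val sc = new SparkContext(conf)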

ShuffleManager comes in two flavors, hash and sort; hash is the default, meaning the shuffle does not sort. Anyone familiar with MapReduce knows that MapReduce always sorts, no matter what: the data arriving at the Reduce side is already sorted, which is also what allows it to process huge volumes of data. Before Spark 1.1, only hash based shuffle was supported; sort based shuffle was added in 1.1 as an experimental feature.
With hash, as the name suggests, the data arriving at the Reduce side is not required to be ordered, so the Reduce side can start processing each record as soon as it arrives instead of waiting until all the data has been received. This will be explained through the source code below. sort, on the other hand, implies sorting; in practice, for transformations such as sortByKey, sort is arguably the more meaningful choice.
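To make the "process as it arrives" point concrete, here is a small self-contained sketch (my own illustration, not Spark source) of hash style reduce-side aggregation: each incoming (key, value) pair is merged into a hash map immediately, so no global sort and no waiting for the full input are needed:

    import scala.collection.mutable

    // Hash style aggregation: fold each record into a map as it streams in.
    def hashAggregate[K, V](records: Iterator[(K, V)], merge: (V, V) => V): Map[K, V] = {
      val combined = mutable.Map.empty[K, V]
      for ((k, v) <- records) {            // one pass, no sorting required
        combined(k) = combined.get(k) match {
          case Some(acc) => merge(acc, v)  // merge with the running value for this key
          case None      => v
        }
      }
      combined.toMap
    }

    // Example: word counting over an unsorted stream of pairs.
    val result = hashAggregate(Iterator("a" -> 1, "b" -> 1, "a" -> 1), (x: Int, y: Int) => x + y)
    // result: Map(a -> 2, b -> 1)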

ShuffledRDD fetches the output of the previous Stage through org.apache.spark.shuffle.hash.HashShuffleReader. HashShuffleReader obtains the results via org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$#fetch, and fetch forwards the request by calling org.apache.spark.storage.BlockManager#getMultiple:

  def getMultiple(
      blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
      serializer: Serializer,
      readMetrics: ShuffleReadMetrics): BlockFetcherIterator = {
    val iter = new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer,
      readMetrics)
    iter.initialize()
    iter
  }

The actual work happens in org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#initialize:

  override def initialize() {
    // Split local and remote blocks.
    // Build the list of blocks that must be fetched remotely; the ids of blocks that are
    // already local are collected into localBlocksToFetch and read locally in
    // org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator.getLocalBlocks
    val remoteRequests = splitLocalRemoteBlocks()
    // Add the remote requests into our queue in a random order
    fetchRequests ++= Utils.randomize(remoteRequests)
    // Send out initial requests for blocks, up to our maxBytesInFlight
    while (!fetchRequests.isEmpty && // keep memory in use below spark.reducer.maxMbInFlight (default: 48MB)
      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
      sendRequest(fetchRequests.dequeue())
    }
    val numFetches = remoteRequests.size - fetchRequests.size
    logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
    // Get Local Blocks
    startTime = System.currentTimeMillis
    getLocalBlocks() // read the blocks that are local
    logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
  }
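The in-flight limit guarding that while loop is configurable. A minimal sketch (spark.reducer.maxMbInFlight is the key mentioned in the code above; raising it trades memory for fetch throughput):

    import org.apache.spark.SparkConf

    // Allow up to 96MB of fetched shuffle blocks to be in flight per reduce task,
    // doubling the 48MB default from which maxBytesInFlight is derived.
    val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "96")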

The strategy for deciding how blocks are fetched lives in org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#splitLocalRemoteBlocks:

    protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
      // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
      // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
      // nodes, rather than blocking on reading output from one node.
      // To fetch data quickly, up to 5 fetches run in parallel against up to 5 nodes,
      // and each request carries at most spark.reducer.maxMbInFlight (default: 48MB) / 5.
      // There are several reasons for this:
      // 1. It avoids hogging the bandwidth of the target machine. With gigabit NICs still
      //    the mainstream, bandwidth matters: a single connection consuming 48MB at once
      //    could make network IO the bottleneck.
      // 2. Requests can proceed in parallel, which greatly reduces the fetch time: the total
      //    time is that of the slowest request, whereas serial requests would take the sum
      //    of all request times.
      // spark.reducer.maxMbInFlight itself exists to bound the memory used for fetched data.
      val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
      logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)

      // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
      // at most maxBytesInFlight in order to limit the amount of data in flight.
      val remoteRequests = new ArrayBuffer[FetchRequest]
      var totalBlocks = 0
      for ((address, blockInfos) <- blocksByAddress) { // address is effectively the executor_id
        totalBlocks += blockInfos.size
        if (address == blockManagerId) { // the data is local, so take the local read path
          // Filter out zero-sized blocks
          localBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1)
          _numBlocksToFetch += localBlocksToFetch.size
        } else {
          val iterator = blockInfos.iterator
          var curRequestSize = 0L
          var curBlocks = new ArrayBuffer[(BlockId, Long)]
          while (iterator.hasNext) {
            // blockId is an org.apache.spark.storage.ShuffleBlockId, formatted as
            // "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
            val (blockId, size) = iterator.next()
            // Skip empty blocks
            if (size > 0) { // filter out zero-sized blocks
              curBlocks += ((blockId, size))
              remoteBlocksToFetch += blockId
              _numBlocksToFetch += 1
              curRequestSize += size
            } else if (size < 0) {
              throw new BlockException(blockId, "Negative block size " + size)
            }
            if (curRequestSize >= targetRequestSize) { // avoid making any single request too large
              // Add this FetchRequest
              remoteRequests += new FetchRequest(address, curBlocks)
              curBlocks = new ArrayBuffer[(BlockId, Long)]
              logDebug(s"Creating fetch request of $curRequestSize at $address")
              curRequestSize = 0
            }
          }
          // Add in the final request
          if (!curBlocks.isEmpty) { // put the remaining blocks into one last request
            remoteRequests += new FetchRequest(address, curBlocks)
          }
        }
      }
      logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
        totalBlocks + " blocks")
      remoteRequests
    }
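The core of this splitting is a simple greedy chunking: accumulate blocks until the running size crosses targetRequestSize, then cut a request. Here is a self-contained sketch of just that algorithm (simplified types of my own, not Spark source):

    import scala.collection.mutable.ArrayBuffer

    // A simplified stand-in for Spark's FetchRequest: all blocks come from one address.
    case class Request(address: String, blocks: Seq[(String, Long)])

    // Greedily pack (blockId, size) pairs into requests of roughly targetRequestSize bytes.
    def packRequests(address: String, blocks: Seq[(String, Long)],
                     targetRequestSize: Long): Seq[Request] = {
      val requests = ArrayBuffer.empty[Request]
      var current = ArrayBuffer.empty[(String, Long)]
      var currentSize = 0L
      for ((blockId, size) <- blocks if size > 0) { // zero-sized blocks are skipped
        current += ((blockId, size))
        currentSize += size
        if (currentSize >= targetRequestSize) { // cut a request once the threshold is crossed
          requests += Request(address, current.toSeq)
          current = ArrayBuffer.empty[(String, Long)]
          currentSize = 0L
        }
      }
      if (current.nonEmpty) requests += Request(address, current.toSeq) // the remainder
      requests.toSeq
    }

    // With a 10-byte target, five 4-byte blocks become two requests:
    // the first carries three blocks (12 bytes), the second the remaining two (8 bytes).
    val reqs = packRequests("executor_1",
      Seq("b0" -> 4L, "b1" -> 4L, "b2" -> 4L, "b3" -> 4L, "b4" -> 4L), 10L)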

Original articles:
[1] https://blog.csdn.net/anzhsoft/article/details/41593807
[2] https://blog.csdn.net/anzhsoft/article/details/41620329

