Source Code Analysis of the Kafka Producer Message Sending Flow

The Kafka client provides its message sending service through the wrapper class kafka.producer.Producer, so the sending logic is implemented mainly in kafka.producer.Producer. The Producer code is as follows:

class Producer[K,V](val config: ProducerConfig,
                    private val eventHandler: EventHandler[K,V])  // only for unit testing
  extends Logging {

  private val hasShutdown = new AtomicBoolean(false)
  // For async sends, queue buffers the user's messages; the send thread drains it and sends them to the kafka broker.
  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

  private var sync: Boolean = true
  // the message send thread
  private var producerSendThread: ProducerSendThread[K,V] = null
  private val lock = new Object()

  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                       queue,
                                                       eventHandler,
                                                       config.queueBufferingMaxMs,
                                                       config.batchNumMessages,
                                                       config.clientId)
      producerSendThread.start()
  }

  // Builds a Producer from a ProducerConfig; ProducerPool is the pool of broker connections.
  def this(config: ProducerConfig) =
    this(config,
         new DefaultEventHandler[K,V](config,
                                      Utils.createObject[Partitioner](config.partitionerClass, config.props),
                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                      new ProducerPool(config)))

  // the message send function
  def send(messages: KeyedMessage[K,V]*) {
    lock synchronized {
      if (hasShutdown.get)
        throw new ProducerClosedException
      recordStats(messages)
      sync match {
        // synchronous send
        case true => eventHandler.handle(messages)
        // asynchronous send
        case false => asyncSend(messages)
      }
    }
  }
}
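Before digging into the internals, a minimal usage sketch of this client may help; the broker address, topic name, and payloads below are illustrative only:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object ProducerExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")  // illustrative broker address
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("producer.type", "sync")                   // "sync" (default) or "async"

    val producer = new Producer[String, String](new ProducerConfig(props))
    // send() takes one or more KeyedMessages: (topic, key, message)
    producer.send(new KeyedMessage[String, String]("test-topic", "key1", "hello kafka"))
    producer.close()
  }
}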

As the code above shows, the client sends messages through the send function. Internally, Producer is built from a few main modules:

1) ProducerSendThread: the message send thread. When messages are sent asynchronously, ProducerSendThread buffers the client's KeyedMessages until the configured count accumulates, or until a configured interval (queue.buffering.max.ms) passes without a new message arriving, and then calls DefaultEventHandler to send the batch.

2) ProducerPool: caches the client's connections to the individual brokers. DefaultEventHandler fetches the SyncProducer for a given broker from the ProducerPool and sends the KeyedMessages to that broker through it.

3) DefaultEventHandler: applies the partitioning rules to work out which portion of the KeyedMessages each Broker Server should receive, then sends each portion to its broker through a SyncProducer. Internally, DefaultEventHandler also provides a retry mechanism for failed SyncProducer sends and a mechanism for smoothly absorbing broker expansion.

ProducerSendThread

When producer.type is set to async, messages are sent asynchronously: the Producer client starts a ProducerSendThread, which takes messages out of the BlockingQueue and, once enough messages have accumulated or enough time has passed with no new message, hands the accumulated batch to DefaultEventHandler to send.

class ProducerSendThread[K,V](val threadName: String,
                              val queue: BlockingQueue[KeyedMessage[K,V]], // the queue holding pending messages
                              val handler: EventHandler[K,V],
                              val queueTime: Long,   // max wait between flushes (queue.buffering.max.ms, default 5000 ms)
                              val batchSize: Int,    // message count that triggers a flush (batch.num.messages, default 200)
                              val clientId: String) extends Thread(threadName) with Logging with KafkaMetricsGroup {

  private val shutdownLatch = new CountDownLatch(1)
  private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])

  newGauge("ProducerQueueSize",
           new Gauge[Int] {
             def value = queue.size
           },
           Map("clientId" -> clientId))

  override def run {
    try {
      processEvents
    } catch {
      case e: Throwable => error("Error in sending events: ", e)
    } finally {
      shutdownLatch.countDown
    }
  }

  private def processEvents() {
    var lastSend = SystemTime.milliseconds
    var events = new ArrayBuffer[KeyedMessage[K,V]]
    var full: Boolean = false

    // pull messages from the blocking queue
    Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
          .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
      currentQueueItem =>
        val elapsed = (SystemTime.milliseconds - lastSend)
        // check whether the queue time expired (poll returned null)
        val expired = currentQueueItem == null
        if(currentQueueItem != null) {
          // accumulate the message
          events += currentQueueItem
        }

        // check whether the configured batch size has been reached
        full = events.size >= batchSize

        if(full || expired) {
          if(expired)
            debug(elapsed + " ms elapsed. Queue time reached. Sending..")
          if(full)
            debug("Batch full. Sending..")
          // send the batch
          tryToHandle(events)
          lastSend = SystemTime.milliseconds
          // clear the messages just sent
          events = new ArrayBuffer[KeyedMessage[K,V]]
        }
    }

    // send the last batch of events
    tryToHandle(events)
    if(queue.size > 0)
      throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue".format(queue.size))
  }

  def tryToHandle(events: Seq[KeyedMessage[K,V]]) {
    val size = events.size
    try {
      debug("Handling " + size + " events")
      if(size > 0)
        handler.handle(events)
    } catch {
      case e: Throwable => error("Error in handling batch of " + size + " events", e)
    }
  }
}
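The queueTime and batchSize constructor arguments above come from producer configuration properties; the property names below match the fields used in the code (config.queueBufferingMaxMs, config.batchNumMessages, config.queueBufferingMaxMessages), while the values are illustrative:

import java.util.Properties

val props = new Properties()
props.put("producer.type", "async")
// queueTime: flush a batch when this many ms pass without reaching batchSize
props.put("queue.buffering.max.ms", "5000")
// batchSize: flush once this many messages have accumulated
props.put("batch.num.messages", "200")
// capacity of the LinkedBlockingQueue that buffers unsent messages
props.put("queue.buffering.max.messages", "10000")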

ProducerPool

ProducerPool caches the communication links to the different Broker Servers. Each link is represented by a SyncProducer object, which sends ProducerRequests and TopicMetadataRequests through its doSend method: a ProducerRequest carries the messages the producer client wants written, and a TopicMetadataRequest asks for a topic's metadata. The doSend code is as follows:

class SyncProducer(val config: SyncProducerConfig) extends Logging {

  private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = {
    lock synchronized {
      // validate the request
      verifyRequest(request)
      // create a connection to the broker if none exists yet
      getOrMakeConnection()
      var response: Receive = null
      try {
        // send the request over the blocking channel
        blockingChannel.send(request)
        if(readResponse)
          // read the response from the blocking channel
          response = blockingChannel.receive()
        else
          trace("Skipping reading response")
      } catch {
        case e: java.io.IOException =>
          // the send failed; tear down the connection
          disconnect()
          throw e
        case e: Throwable => throw e
      }
      // return the response
      response
    }
  }
}

ProducerPool keeps a one-to-one mapping from BrokerId to SyncProducer, and refreshes this internal connection pool through updateProducer.

class ProducerPool(val config: ProducerConfig) extends Logging {
  // syncProducers maps BrokerId -> SyncProducer
  private val syncProducers = new HashMap[Int, SyncProducer]
  private val lock = new Object()

  def updateProducer(topicMetadata: Seq[TopicMetadata]) {
    val newBrokers = new collection.mutable.HashSet[Broker]
    // collect the Leader Replica of every partition of every topic
    topicMetadata.foreach(tmd => {
      tmd.partitionsMetadata.foreach(pmd => {
        if(pmd.leader.isDefined)
          newBrokers += (pmd.leader.get)
      })
    })
    lock synchronized {
      newBrokers.foreach(b => {
        if(syncProducers.contains(b.id)){
          // a SyncProducer already exists: close it and recreate the connection
          syncProducers(b.id).close()
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
        } else
          // no connection yet: create one
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
      })
    }
  }

  def getProducer(brokerId: Int): SyncProducer = {
    lock.synchronized {
      val producer = syncProducers.get(brokerId)
      producer match {
        case Some(p) => p
        case None => throw new UnavailableProducerException("Sync producer for broker id %d does not exist".format(brokerId))
      }
    }
  }
}
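Putting the two methods together, the calling pattern inside DefaultEventHandler looks roughly like the sketch below; topicsMetadata, leaderBrokerId, and producerRequest stand in for values produced elsewhere and are not literal excerpts from the source:

// refresh the pooled connections from freshly fetched metadata...
producerPool.updateProducer(topicsMetadata)
// ...then look up the link to a partition's leader and send through it
val syncProducer = producerPool.getProducer(leaderBrokerId) // throws UnavailableProducerException if absent
val response = syncProducer.send(producerRequest)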

DefaultEventHandler

DefaultEventHandler implements the concrete sending logic, which breaks down into the following steps:

1) First group the KeyedMessages by the partitioning rules, so that each KeyedMessage falls into its topic partition; then group them by the Broker Server hosting each partition's Leader Replica, giving each Broker Server its own set of KeyedMessages.

2) Then take the SyncProducer object corresponding to each Broker Server out of the ProducerPool and send that broker's messages through it.

First, the message handling logic of DefaultEventHandler:

class DefaultEventHandler[K,V](config: ProducerConfig,
                               private val partitioner: Partitioner,
                               private val encoder: Encoder[V],
                               private val keyEncoder: Encoder[K],
                               private val producerPool: ProducerPool,
                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
  extends EventHandler[K,V] with Logging {

  // whether sends are synchronous
  val isSync = ("sync" == config.producerType)

  val correlationId = new AtomicInteger(0)
  val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)

  private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
  private var lastTopicMetadataRefreshTime = 0L
  private val topicMetadataToRefresh = Set.empty[String]
  private val sendPartitionPerTopicCache = HashMap.empty[String, Int]

  private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)

  def handle(events: Seq[KeyedMessage[K,V]]) {
    // serialize the events
    val serializedData = serialize(events)
    serializedData.foreach {
      keyed =>
        val dataSize = keyed.message.payloadSize
        producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
        producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
    }
    var outstandingProduceRequests = serializedData
    // number of send attempts: message.send.max.retries (default 3) plus the initial try
    var remainingRetries = config.messageSendMaxRetries + 1
    // the correlationId ties each client request to its response
    val correlationIdStart = correlationId.get()
    // keep going while retries remain and there is still unsent data
    while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
      // collect the set of topics awaiting a send
      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
      // proactively refresh the topic metadata if it has not been refreshed for a while
      if (topicMetadataRefreshInterval >= 0 &&
          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
        // partitions newly added to a broker are discovered here and cached in the ProducerPool
        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        topicMetadataToRefresh.clear
        lastTopicMetadataRefreshTime = SystemTime.milliseconds
      }
      // send the messages; whatever failed is returned and must be resent
      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
      if (outstandingProduceRequests.size > 0) {
        // the send failed; back off for a while
        Thread.sleep(config.retryBackoffMs)
        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        remainingRetries -= 1
        producerStats.resendRate.mark()
      }
    }
    // still unsent after exhausting the retries: give up and throw
    if(outstandingProduceRequests.size > 0) {
      producerStats.failedSendRate.mark()
      val correlationIdEnd = correlationId.get()
      throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
    }
  }
}

DefaultEventHandler periodically refreshes the topic metadata and updates the ProducerPool. Metadata in a Kafka cluster changes over time: new Broker Servers come online, topics gain partitions, and partitions migrate to other Broker Servers. The client has to notice all of these changes, so it periodically sends a metadata request; this is what lets DefaultEventHandler absorb Broker Server expansion smoothly.
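Both the refresh period and the retry behavior are driven by configuration. A sketch of the relevant properties follows; treat the values as assumed defaults for this client rather than recommendations:

// proactively refresh topic metadata after this interval; a negative value disables the periodic refresh
props.put("topic.metadata.refresh.interval.ms", "600000")
// message.send.max.retries: resend attempts after a failed send (the default of 3 is used in handle() above)
props.put("message.send.max.retries", "3")
// retry.backoff.ms: sleep between attempts, giving leader elections time to settle
props.put("retry.backoff.ms", "100")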

The metadata update flow of DefaultEventHandler is as follows:

class BrokerPartitionInfo(producerConfig: ProducerConfig,
                          producerPool: ProducerPool,
                          topicPartitionInfo: HashMap[String, TopicMetadata])
  extends Logging {

  val brokerList = producerConfig.brokerList
  val brokers = ClientUtils.parseBrokerList(brokerList)

  def updateInfo(topics: Set[String], correlationId: Int) {
    var topicsMetadata: Seq[TopicMetadata] = Nil
    // send a TopicMetadataRequest to fetch the metadata
    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
    topicsMetadata = topicMetadataResponse.topicsMetadata
    // throw partition specific exception
    topicsMetadata.foreach(tmd => {
      if(tmd.errorCode == ErrorMapping.NoError) {
        // no error: cache the topic metadata
        topicPartitionInfo.put(tmd.topic, tmd)
      } else
        tmd.partitionsMetadata.foreach(pmd => {
          if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
            warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
              ErrorMapping.exceptionFor(pmd.errorCode).getClass))
          } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
        })
    })
    producerPool.updateProducer(topicsMetadata)
  }
}

As this flow shows, DefaultEventHandler obtains metadata chiefly by sending a TopicMetadataRequest; once the metadata has been updated, the connections in the ProducerPool must be updated to match.

The main path of DefaultEventHandler then enters dispatchSerializedData, which first groups the KeyedMessages by the Broker Server they are bound for and then sends each group over a connection from the ProducerPool. The implementation:

private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
  /*
   * Group the messages: messages bound for the same Broker Server form one group.
   * partitionedDataOpt has type Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]]
   */
  val partitionedDataOpt = partitionAndCollate(messages)
  partitionedDataOpt match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
      try {
        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
          if (logger.isTraceEnabled)
            messagesPerBrokerMap.foreach(partitionAndEvent =>
              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
          // aggregate the messages bound for the same Broker Server across its TopicAndPartitions
          val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
          // send the messages; the TopicAndPartitions that failed are returned
          val failedTopicPartitions = send(brokerid, messageSetPerBroker)
          failedTopicPartitions.foreach(topicPartition => {
            messagesPerBrokerMap.get(topicPartition) match {
              case Some(data) => failedProduceRequests.appendAll(data)
              case None => // nothing
            }
          })
        }
      } catch {
        case t: Throwable => error("Failed to send messages", t)
      }
      // return the messages that failed to send
      failedProduceRequests
    case None => // all produce requests failed
      messages
  }
}
The key step in dispatchSerializedData is thus grouping the KeyedMessages, after which each group is sent to its corresponding Broker Server.
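partitionAndCollate itself is not reproduced in this article. Conceptually it performs a two-level grouping, first by leader broker and then by TopicAndPartition. The sketch below conveys the idea only; it assumes the surrounding DefaultEventHandler members (brokerPartitionInfo, getPartition, correlationId) are in scope and simplifies away the error handling of the real method:

// simplified sketch: Map[leaderBrokerId, Map[TopicAndPartition, messages]]
val grouped = new HashMap[Int, HashMap[TopicAndPartition, ArrayBuffer[KeyedMessage[K,Message]]]]
for (message <- messages) {
  // the partitions of this topic, together with their leaders
  val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(message.topic, correlationId.getAndIncrement)
  // pick a partition for this message (see getPartition below)
  val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
  val brokerPartition = topicPartitionsList(partitionIndex)
  // file the message under its leader broker, then under its TopicAndPartition
  val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1) // -1: no leader available
  val perBroker = grouped.getOrElseUpdate(leaderBrokerId,
    new HashMap[TopicAndPartition, ArrayBuffer[KeyedMessage[K,Message]]])
  perBroker.getOrElseUpdate(TopicAndPartition(message.topic, brokerPartition.partitionId),
    new ArrayBuffer[KeyedMessage[K,Message]]) += message
}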

DefaultEventHandler provides an internal getPartition function that takes a topic, a key, and the topicPartitionList and returns a partition index. Its implementation:

private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
  // number of partitions of the topic
  val numPartitions = topicPartitionList.size
  if(numPartitions <= 0)
    throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
  val partition =
    if(key == null) {
      // no partition key: reuse the cached partition, so each topic keeps sending to one partition
      val id = sendPartitionPerTopicCache.get(topic)
      id match {
        // cache hit: return it directly
        case Some(partitionId) => partitionId
        // cache miss: a partition must be chosen
        case None =>
          // keep only the partitions that currently have a leader
          val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
          if (availablePartitions.isEmpty)
            throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
          // pick one of the live leader replicas at random, cache it, and return it
          val index = Utils.abs(Random.nextInt) % availablePartitions.size
          val partitionId = availablePartitions(index).partitionId
          sendPartitionPerTopicCache.put(topic, partitionId)
          partitionId
      }
    } else
      // a partition key is present: let the partitioning function choose the partition
      partitioner.partition(key, numPartitions)
  if(partition < 0 || partition >= numPartitions)
    throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
      "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
  trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
  partition
}
So when no partition key is passed in, the producer picks one of the online leader replicas at random and keeps sending to that fixed partition for a whole period, until the next topic metadata request refreshes the local metadata and clears the cached mapping, at which point a partition is chosen afresh for the next period. The client therefore observes messages concentrating on one partition for a while and then shifting to another. When the user does pass a partition key, the partitioning function is invoked, so messages with the same key go to the same partition.
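For reference, the partitioner used by default when a key is supplied is kafka.producer.DefaultPartitioner, which simply hashes the key; its logic is roughly:

import kafka.utils.{Utils, VerifiableProperties}

class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
  // the same key always hashes to the same partition
  def partition(key: Any, numPartitions: Int): Int =
    Utils.abs(key.hashCode) % numPartitions
}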

