def partition(key: Any, numPartitions: Int): Int = { Utils.abs(key.hashCode) % numPartitions} 這就保證了相同key的消息一定會被路由到相同的分區。如果你沒有指定key,那么Kafka是如何確定這條消息去往哪個分區的呢?

if(key == null) { // 如果沒有指定key val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有沒有緩存的現成的分區Id id match { case Some(partitionId) => partitionId // 如果有的話直接使用這個分區Id就好了 case None => // 如果沒有的話, val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分區的leader所在的broker if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(Random.nextInt) % availablePartitions.size // 從中隨機挑一個 val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) // 更新緩存以備下一次直接使用 partitionId } }
可以看出,Kafka幾乎就是隨機找一個分區發送無key的消息,然后把這個分區號加入到緩存中以備后面直接使用——當然了,Kafka本身也會清空該緩存(默認每10分鐘或每次請求topic元數據時)
如何設定consumer線程數 我個人的觀點,如果你的分區數是N,那么最好線程數也保持為N,這樣通常能夠達到最大的吞吐量。超過N的配置只是浪費系統資源,因為多出的線程不會被分配到任何分區。讓我們來看看具體Kafka是如何分配的。 topic下的一個分區只能被同一個consumer group下的一個consumer線程來消費,但反之并不成立,即一個consumer線程可以消費多個分區的數據,比如Kafka提供的ConsoleConsumer,默認就只是一個線程來消費所有分區的數據。——其實ConsoleConsumer可以使用通配符的功能實現同時消費多個topic數據,但這和本文無關。 再討論分配策略之前,先說說KafkaStream——它是consumer的關鍵類,提供了遍歷方法用于consumer程序調用實現數據的消費。其底層維護了一個阻塞隊列,所以在沒有新消息到來時,consumer是處于阻塞狀態的,表現出來的狀態就是consumer程序一直在等待新消息的到來。——你當然可以配置成帶超時的consumer,具體參看參數consumer.timeout.ms的用法。 下面說說Kafka提供的兩種分配策略: range和roundrobin,由參數partition.assignment.strategy指定,默認是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。舉個例子就明白了,假設你有10個分區,P0 ~ P9,consumer線程數是3, C0 ~ C2,那么每個線程都分配哪些分區呢? C0 消費分區 0, 1, 2, 3C1 消費分區 4, 5, 6C2 消費分區 7, 8, 9 具體算法就是:
val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每個consumer至少保證消費的分區數val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 還剩下多少個分區需要單獨分配給開頭的線程們...for (consumerThreadId <- consumerThreadIdSet) { // 對于每一個consumer線程 val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //算出該線程在所有線程中的位置,介于[0, n-1] assert(myConsumerPosition >= 0)// startPart 就是這個線程要消費的起始分區數 val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)// nParts 就是這個線程總共要消費多少個分區 val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)...}
針對于這個例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart為10%3=1,說明每個線程至少保證3個分區,還剩下1個分區需要單獨分配給開頭的若干個線程。這就是為什么C0消費4個分區,后面的2個線程每個消費3個分區,具體過程詳見下面的Debug截圖信息:
ctx.myTopicThreadIds

nPartsPerConsumer = 10 / 3 = 3nConsumersWithExtraPart = 10 % 3 = 1
第一次:myConsumerPosition = 1startPart = 1 * 3 + min(1, 1) = 4 ---也就是從分區4開始讀nParts = 3 + (if (1 + 1 > 1) 0 else 1) = 3 讀取3個分區, 即4,5,6第二次:myConsumerPosition = 0startPart = 3 * 0 + min(1, 0) =0 --- 從分區0開始讀nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 4 讀取4個分區,即0,1,2,3第三次:myConsumerPosition = 2startPart = 3 * 2 + min(2, 1) = 7 --- 從分區7開始讀nParts = 3 + if (2 + 1 > 1) 0 else 1) = 3 讀取3個分區,即7, 8, 9至此10個分區都已經分配完畢
說到這里,經常有個需求就是我想讓某個consumer線程消費指定的分區而不消費其他的分區。坦率來說,目前Kafka并沒有提供自定義分配策略。做到這點很難,但仔細想一想,也許我們期望Kafka做的事情太多了,畢竟它只是個消息引擎,在Kafka中加入消息消費的邏輯也許并不是Kafka該做的事情。
新聞熱點
疑難解答