SpringCloudStream中的消息分區(qū)數(shù)詳解
一、前言
本文僅針對 Kafka 來聊消息分區(qū)數(shù)相關的話題。
SpringCloudStream 中的消息分區(qū)數(shù)如何配置?
或者說消息分區(qū)數(shù)會受到哪些配置的影響。
- SpringCloudStream:Greenwich.SR2
- Kafka:kafka_2.12-2.3.0
二、影響因素
2.1 Kafka服務端
首先應該想到的,Kafka 配置文件 server.properties 中默認每一個 topic 的分區(qū)數(shù) num.partitions=1
# The default number of log partitions per topic. More partitions allow greater num.partitions=1
2.2 生產(chǎn)者端
從SpringCloudStream的配置中可以看到,生產(chǎn)者可以指定分區(qū)數(shù),默認1:
spring.cloud.stream.bindings.<channelName>.partitionCount.producer=n
【說明】:當分區(qū)功能開啟時,使用該參數(shù)來配置消息數(shù)據(jù)的分區(qū)數(shù)。
如果消息生產(chǎn)者已經(jīng)配置了分區(qū)鍵的生成策略,那么它的值必須大于1。
2.3 消費者端
SpringCloudStream 允許通過配置,使得消費者能夠自動創(chuàng)建分區(qū)。
#輸入通道消費者的并發(fā)數(shù),默認1 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2
若想以上配置生效,還需添加如下通用配置:
#Kafka綁定器允許在需要的時候自動創(chuàng)建分區(qū)。默認false spring.cloud.stream.kafka.binder.autoAddPartitions=true
消費者端如此配置以后,將表現(xiàn)為一個消費者服務或進程中,會有2個線程各自消費1個分區(qū),即2個消費者線程同時消費。
以下是該配置的效果驗證步驟:
消費者代碼:
1個 @StreamListener 消費自己的 topic 或自己的輸出channel:
@EnableBinding(SpiderSink.class) @Slf4j public class SpiderSinkReceiver { @Autowired private SpiderMessageService spiderMessageService; @StreamListener(SpiderSink.INPUT) public void receive(Object payload) { log.info("SPIDER-SINK received: {}", payload); } }
方式一:通過日志驗證:
通過在 log4j 日志中,打印線程名稱的方式,驗證 spring.cloud.stream.bindings.<channelName>.consumer.concurrency 的配置確確實實會新增1個消費者線程。
[INFO ] 2020-05-09 01:19:34,700 [thread: [Ljava.lang.String;@5b40de43.container-1-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50) [INFO ] 2020-05-09 01:19:35,888 [thread: [Ljava.lang.String;@5b40de43.container-0-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)
方式二:直接查看分區(qū)數(shù)來驗證:
另外,也可在啟動一個生產(chǎn)者服務時,等待自動創(chuàng)建一個新 topic 后(此時默認分區(qū)數(shù)為1),比如我們創(chuàng)建的 topic 為“topic-spider-dev”,此時通過kafka命令查看分區(qū)數(shù),此時分區(qū)數(shù)為1:
[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev Topic:topic-spider-dev ?PartitionCount:1 ? ? ? ?ReplicationFactor:1 ? ? Configs: ? ? ? ? Topic: topic-spider-dev Partition: 0 ? ?Leader: 1 ? ? ? Replicas: 1 ? ? Isr: 1
然后,配置消費者服務的 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2,啟動一個消費者服務,再次查看分區(qū)數(shù),已經(jīng)變?yōu)?了:
[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev Topic:topic-spider-dev ?PartitionCount:2 ? ? ? ?ReplicationFactor:1 ? ? Configs: ? ? ? ? Topic: topic-spider-dev Partition: 0 ? ?Leader: 1 ? ? ? Replicas: 1 ? ? Isr: 1 ? ? ? ? Topic: topic-spider-dev Partition: 1 ? ?Leader: 2 ? ? ? Replicas: 2 ? ? Isr: 2
同時查看消費者端的應用日志,看到2個消費者線程各自分配了一個分區(qū):
[INFO ] 2020-05-12 17:22:43,940 [thread: [Ljava.lang.String;@299dd381.container-0-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363) partitions assigned: [topic-spider-dev-0] [INFO ] 2020-05-12 17:22:44,004 [thread: [Ljava.lang.String;@299dd381.container-1-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363) partitions assigned: [topic-spider-dev-1]
最終,確確實實地驗證了 concurrency 配置對消費者線程數(shù)和分區(qū)數(shù)的影響。
2.4 其他因素
比如,SpringCloudStream 中 Kafka 綁定器的配置中,也有一個相關的影響因素:
#最小分區(qū)數(shù),默認1 spring.cloud.stream.kafka.binder.minPartitionCount=n
【說明】:該參數(shù)僅在設置了 autoCreateTopics 和 autoAddPartitions 時生效,用來設置該綁定器所使用主題的全局分區(qū)最小數(shù)量。
如果當生產(chǎn)者的 partitionCount 參數(shù)或 instanceCount * concurrency 設置大于該參數(shù)配置時,該參數(shù)值將被覆蓋。
總結
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
java?常規(guī)輪詢長輪詢Long?polling實現(xiàn)示例詳解
這篇文章主要為大家介紹了java?常規(guī)輪詢長輪詢Long?polling實現(xiàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-12-12java實現(xiàn)解析二進制文件的方法(字符串、圖片)
本篇文章主要介紹了java實現(xiàn)解析二進制文件的方法(字符串、圖片),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-02-02