kafka生產(chǎn)者發(fā)送消息流程深入分析講解
消息發(fā)送過程
消息的發(fā)送可能會經(jīng)過攔截器、序列化、分區(qū)器等過程。消息發(fā)送的主要涉及兩個線程,分別為main線程和sender線程。
如圖所示,主線程由 afkaProducer 創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器RecordAccumulator (也稱為消息收集器)中。 Sender 線程負責從RecordAccumulator 獲取消息并將其發(fā)送到 Kafka中。
攔截器
在消息序列化之前會經(jīng)過消息攔截器,自定義攔截器需要實現(xiàn)ProducerInterceptor接口,接口主要有兩個方案#onSend和#onAcknowledgement,在消息發(fā)送之前會調(diào)用前者方法,可以在發(fā)送之前假如處理邏輯,比如計費。在收到服務端ack響應后會觸發(fā)后者方法。需要注意的是攔截器中不要加入過多的復雜業(yè)務邏輯,以免影響發(fā)送效率。
消息分區(qū)
消息ProducerRecord會將消息路由到那個分區(qū)中,分兩種情況:
1.指定了partition字段
如果消息ProducerRecord中指定了 partition字段,那么就不需要走分區(qū)器,直接發(fā)往指定得partition分區(qū)中。
2.沒有指定partition,但自定義了分區(qū)器
3.沒指定parittion,也沒有自定義分區(qū)器,但key不為空
4.沒指定parittion,也沒有自定義分區(qū)器,key也為空
看源碼
// KafkaProducer#partition private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { //指定分區(qū)partition則直接返回,否則走分區(qū)器 Integer partition = record.partition(); return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
//DefaultPartitioner#partition public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { if (keyBytes == null) { return stickyPartitionCache.partition(topic, cluster); } List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }
partition 方法中定義了分區(qū)分配邏輯 如果 ke 不為 null , 那 么默認的分區(qū)器會對 key 進行哈 希(采 MurmurHash2 算法 ,具備高運算性能及 低碰 撞率),最終根據(jù)得到 哈希值來 算分區(qū)號, 有相同 key 的消息會被寫入同一個分區(qū) 如果 key null ,那么消息將會以輪詢的方式發(fā)往主題內(nèi)的各個可用分區(qū)。
消息累加器
分區(qū)確定好了之后,消息并不是直接發(fā)送給broker,因為一個個發(fā)送網(wǎng)絡消耗太大,而是先緩存到消息累加器RecordAccumulator,RecordAccumulator主要用來緩存消息 Sender 線程可以批量發(fā)送,進 減少網(wǎng)絡傳輸 的資源消耗以提升性能 RecordAccumulator 緩存的大 小可以通過生產(chǎn)者客戶端參數(shù) buffer memory 配置,默認值為 33554432B ,即 32MB如果生產(chǎn)者發(fā)送消息的速度超過發(fā) 送到服務器的速度 ,則會導致生產(chǎn)者空間不足,這個時候 KafkaProducer的send()方法調(diào)用要么 被阻塞,要么拋出異常,這個取決于參數(shù) max block ms 的配置,此參數(shù)的默認值為 60秒。
消息累加器本質(zhì)上是個ConcurrentMap,
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
發(fā)送流程源碼分析
//KafkaProducer @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions //首先執(zhí)行攔截器鏈 ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); } private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { throwIfProducerClosed(); // first make sure the metadata for the topic is available long nowMs = time.milliseconds(); ClusterAndWaitTime clusterAndWaitTime; try { clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs); } catch (KafkaException e) { if (metadata.isClosed()) throw new KafkaException("Producer closed while send in progress", e); throw e; } nowMs += clusterAndWaitTime.waitedOnMetadataMs; long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; byte[] serializedKey; try { //key序列化 serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); } byte[] serializedValue; try { //value序列化 serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); } //獲取分區(qū)partition int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); setReadOnly(record.headers()); Header[] headers = record.headers().toArray(); //消息壓縮 int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); //判斷消息是否超過最大允許大小,消息緩存空間是否已滿 ensureValidRecordSize(serializedSize); long timestamp = record.timestamp() == null ? nowMs : record.timestamp(); if (log.isTraceEnabled()) { log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); } // producer callback will make sure to call both 'callback' and interceptor callback Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); if (transactionManager != null && transactionManager.isTransactional()) { transactionManager.failIfNotReadyForSend(); } //將消息緩存在消息累加器RecordAccumulator中 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs); //開辟新的ProducerBatch if (result.abortForNewBatch) { int prevPartition = partition; partitioner.onNewBatch(record.topic(), cluster, prevPartition); partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); if (log.isTraceEnabled()) { log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition); } // producer callback will make sure to call both 'callback' and interceptor callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs); } if (transactionManager != null && transactionManager.isTransactional()) transactionManager.maybeAddPartitionToTransaction(tp); //判斷消息是否已滿,喚醒sender線程進行發(fā)送消息 if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future; // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch (Exception e) { // we notify interceptor about all exceptions, since onSend is called before anything else in this method this.interceptors.onSendError(record, tp, e); throw e; } }
生產(chǎn)消息的可靠性
消息發(fā)送到broker,什么情況下生產(chǎn)者才確定消息寫入成功了呢?ack是生產(chǎn)者一個重要的參數(shù),它有三個值,ack=1表示leader副本寫入成功服務端即可返回給生產(chǎn)者,是吞吐量和消息可靠性的平衡方案;ack=0表示生產(chǎn)者發(fā)送消息之后不需要等服務端響應,這種消息丟失風險最大;ack=-1表示生產(chǎn)者需要等等ISR中所有副本寫入成功后才能收到響應,這種消息可靠性最高但吞吐量也是最小的。
到此這篇關于kafka生產(chǎn)者發(fā)送消息流程深入分析講解的文章就介紹到這了,更多相關kafka發(fā)送消息流程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java?IO篇之Reactor?網(wǎng)絡模型的概念
Reactor?模式也叫做反應器設計模式,是一種為處理服務請求并發(fā)提交到一個或者多個服務處理器的事件設計模式,Reactor?模式主要由?Reactor?和處理器?Handler?這兩個核心部分組成,本文給大家介紹Java?IO篇之Reactor?網(wǎng)絡模型的概念,感興趣的朋友一起看看吧2022-01-01MyBatis-Plus中使用EntityWrappe進行列表數(shù)據(jù)倒序設置方式
這篇文章主要介紹了MyBatis-Plus中使用EntityWrappe進行列表數(shù)據(jù)倒序設置方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03