Java Kafka實現(xiàn)延遲隊列的示例代碼
kafka作為一個使用廣泛的消息隊列,很多人都不會陌生,但當你在網(wǎng)上搜索“kafka 延遲隊列”,出現(xiàn)的都是一些講解時間輪或者只是提供了一些思路,并沒有一份真實可用的代碼實現(xiàn),今天我們就來打破這個現(xiàn)象,提供一份可運行的代碼,拋磚引玉,吸引更多的大神來分享。
基于kafka如何實現(xiàn)延遲隊列
想要解決一個問題,我們需要先分解問題。kafka作為一個高性能的消息隊列,只要消費能力足夠,發(fā)出的消息都是會立刻收到的,因此我們需要想一個辦法,讓消息延遲發(fā)送出去。
網(wǎng)上已經(jīng)有大神給出了如下方案:
- 在發(fā)送延遲消息時不直接發(fā)送到目標topic,而是發(fā)送到一個用于處理延遲消息的topic,例如
delay-minutes-1
- 寫一段代碼拉取
delay-minutes-1
中的消息,將滿足條件的消息發(fā)送到真正的目標主題里。
就像畫一匹馬一樣簡單。
方案是好的,但是我們還需要更多細節(jié)。
完善細節(jié)
問題出在哪里?
問題出在延遲消息發(fā)出去之后,代碼程序就會立刻收到延遲消息,要如何處理才能讓延遲消息等待一段時間才發(fā)送到真正的topic里面。
可能有同學會覺得很簡單嘛,在代碼程序收到消息之后判斷條件不滿足,就調(diào)用sleep
方法,過了一段時間我再進行下一個循環(huán)拉取消息。
真的可行嗎?
一切好像都很美好,但這是不可行的。
這是因為在輪詢kafka拉取消息的時候,它會返回由max.poll.records
配置指定的一批消息,但是當程序代碼不能在max.poll.interval.ms
配置的期望時間內(nèi)處理這些消息的話,kafka就會認為這個消費者已經(jīng)掛了,會進行rebalance
,同時你這個消費者就無法再拉取到任何消息了。
舉個例子:當你需要一個24小時的延遲消息隊列,在代碼里面寫下了Thread.sleep(1000*60*60*24);
,為了不發(fā)生rebalance
,你把max.poll.interval.ms
也改成了1000*60*60*24
,這個時候你或許會感覺到一絲絲的怪異,我是誰?我在哪?我為什么要寫出來這樣的代碼?
其實我們可以更優(yōu)雅的處理這個問題。
KafkaConsumer 提供了暫停和恢復的API函數(shù),調(diào)用消費者的暫停方法后就無法再拉取到新的消息,同時長時間不消費kafka也不會認為這個消費者已經(jīng)掛掉了。另外為了能夠更加優(yōu)雅,我們會啟動一個定時器來替換sleep
。,完整流程如下圖,當消費者發(fā)現(xiàn)消息不滿足條件時,我們就暫停消費者,并把偏移量seek到上一次消費的位置以便等待下一個周期再次消費這條消息。
Java代碼實現(xiàn)
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; @SpringBootTest public class DelayQueueTest { private KafkaConsumer<String, String> consumer; private KafkaProducer<String, String> producer; private volatile Boolean exit = false; private final Object lock = new Object(); private final String servers = ""; @BeforeEach void initConsumer() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "d"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000"); consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer()); } @BeforeEach void initProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producer = new KafkaProducer<>(props); } @Test void testDelayQueue() throws JsonProcessingException, InterruptedException { String topic = "delay-minutes-1"; List<String> topics = Collections.singletonList(topic); consumer.subscribe(topics); Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { synchronized (lock) { consumer.resume(consumer.paused()); lock.notify(); } } }, 0, 1000); do { synchronized (lock) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200)); if (consumerRecords.isEmpty()) { lock.wait(); continue; } boolean timed = false; for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { long timestamp = consumerRecord.timestamp(); TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); if (timestamp + 60 * 1000 < System.currentTimeMillis()) { String value = consumerRecord.value(); ObjectMapper objectMapper = new ObjectMapper(); JsonNode jsonNode = objectMapper.readTree(value); JsonNode jsonNodeTopic = jsonNode.get("topic"); String appTopic = null, appKey = null, appValue = null; if (jsonNodeTopic != null) { appTopic = jsonNodeTopic.asText(); } if (appTopic == null) { continue; } JsonNode jsonNodeKey = jsonNode.get("key"); if (jsonNodeKey != null) { appKey = jsonNode.asText(); } JsonNode jsonNodeValue = jsonNode.get("value"); if (jsonNodeValue != null) { appValue = jsonNodeValue.asText(); } // send to application topic ProducerRecord<String, String> producerRecord = new ProducerRecord<>(appTopic, appKey, appValue); try { producer.send(producerRecord).get(); // success. commit message OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1); HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>(); metadataHashMap.put(topicPartition, offsetAndMetadata); consumer.commitSync(metadataHashMap); } catch (ExecutionException e) { consumer.pause(Collections.singletonList(topicPartition)); consumer.seek(topicPartition, consumerRecord.offset()); timed = true; break; } } else { consumer.pause(Collections.singletonList(topicPartition)); consumer.seek(topicPartition, consumerRecord.offset()); timed = true; break; } } if (timed) { lock.wait(); } } } while (!exit); } }
這段程序是基于SpringBoot 2.4.4
版本和 kafka-client 2.7.0
版本編寫的一個單元測試,需要修改私有變量servers
為kafka broker的地址。
在啟動程序后,向Topic delay-minutes-
1 發(fā)送如以下格式的json字符串數(shù)據(jù)
{ "topic": "target", "key": "key1", "value": "value1" }
同時啟動一個消費者監(jiān)聽topic target
,在一分鐘后,將會收到一條 key="key1", value="value1"的數(shù)據(jù)。
還需要做什么
創(chuàng)建多個topic用于處理不同時間的延遲消息,例如delay-minutes-1
delay-minutes-5
delay-minutes-10
delay-minutes-15
以提供指數(shù)級別的延遲時間,這樣比一個topic要好很多,畢竟在順序拉取消息的時候,有一條消息不滿足條件,后面的將全部進行排隊。
到此這篇關于Java Kafka實現(xiàn)延遲隊列的示例代碼的文章就介紹到這了,更多相關Java Kafka延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
解決Java中SimpleDateFormat線程不安全的五種方案
SimpleDateFormat 就是一個典型的線程不安全事例,本文主要介紹了解決Java中SimpleDateFormat線程不安全的五種方案,需要的朋友們下面隨著小編來一起學習學習吧2021-05-05Mybatis中typeAliases標簽和package標簽使用
這篇文章主要介紹了Mybatis中typeAliases標簽和package標簽使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09Spring中@Autowired和@Resource注解相同點和不同點
這篇文章主要介紹了Spring中@Autowired和@Resource注解相同點和不同點,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01