亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java Kafka實現(xiàn)延遲隊列的示例代碼

 更新時間:2022年08月22日 10:30:51   作者:整點bug  
kafka作為一個使用廣泛的消息隊列,很多人都不會陌生。本文將利用Kafka實現(xiàn)延遲隊列,文中的示例代碼講解詳細,感興趣的小伙伴可以嘗試一下

kafka作為一個使用廣泛的消息隊列,很多人都不會陌生,但當你在網(wǎng)上搜索“kafka 延遲隊列”,出現(xiàn)的都是一些講解時間輪或者只是提供了一些思路,并沒有一份真實可用的代碼實現(xiàn),今天我們就來打破這個現(xiàn)象,提供一份可運行的代碼,拋磚引玉,吸引更多的大神來分享。

基于kafka如何實現(xiàn)延遲隊列

想要解決一個問題,我們需要先分解問題。kafka作為一個高性能的消息隊列,只要消費能力足夠,發(fā)出的消息都是會立刻收到的,因此我們需要想一個辦法,讓消息延遲發(fā)送出去。

網(wǎng)上已經(jīng)有大神給出了如下方案:

  • 在發(fā)送延遲消息時不直接發(fā)送到目標topic,而是發(fā)送到一個用于處理延遲消息的topic,例如delay-minutes-1
  • 寫一段代碼拉取delay-minutes-1中的消息,將滿足條件的消息發(fā)送到真正的目標主題里。

就像畫一匹馬一樣簡單。

方案是好的,但是我們還需要更多細節(jié)。

完善細節(jié)

問題出在哪里?

問題出在延遲消息發(fā)出去之后,代碼程序就會立刻收到延遲消息,要如何處理才能讓延遲消息等待一段時間才發(fā)送到真正的topic里面。

可能有同學會覺得很簡單嘛,在代碼程序收到消息之后判斷條件不滿足,就調(diào)用sleep方法,過了一段時間我再進行下一個循環(huán)拉取消息。

真的可行嗎?

一切好像都很美好,但這是不可行的。

這是因為在輪詢kafka拉取消息的時候,它會返回由max.poll.records配置指定的一批消息,但是當程序代碼不能在max.poll.interval.ms配置的期望時間內(nèi)處理這些消息的話,kafka就會認為這個消費者已經(jīng)掛了,會進行rebalance,同時你這個消費者就無法再拉取到任何消息了。

舉個例子:當你需要一個24小時的延遲消息隊列,在代碼里面寫下了Thread.sleep(1000*60*60*24);,為了不發(fā)生rebalance,你把max.poll.interval.ms 也改成了1000*60*60*24,這個時候你或許會感覺到一絲絲的怪異,我是誰?我在哪?我為什么要寫出來這樣的代碼?

其實我們可以更優(yōu)雅的處理這個問題。

KafkaConsumer 提供了暫停和恢復的API函數(shù),調(diào)用消費者的暫停方法后就無法再拉取到新的消息,同時長時間不消費kafka也不會認為這個消費者已經(jīng)掛掉了。另外為了能夠更加優(yōu)雅,我們會啟動一個定時器來替換sleep。,完整流程如下圖,當消費者發(fā)現(xiàn)消息不滿足條件時,我們就暫停消費者,并把偏移量seek到上一次消費的位置以便等待下一個周期再次消費這條消息。

Java代碼實現(xiàn)

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;

@SpringBootTest
public class DelayQueueTest {

    private KafkaConsumer<String, String> consumer;
    private KafkaProducer<String, String> producer;
    private volatile Boolean exit = false;
    private final Object lock = new Object();
    private final String servers = "";

    @BeforeEach
    void initConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "d");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000");
        consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    }

    @BeforeEach
    void initProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
    }

    @Test
    void testDelayQueue() throws JsonProcessingException, InterruptedException {
        String topic = "delay-minutes-1";
        List<String> topics = Collections.singletonList(topic);
        consumer.subscribe(topics);

        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                synchronized (lock) {
                    consumer.resume(consumer.paused());
                    lock.notify();
                }
            }
        }, 0, 1000);

        do {

            synchronized (lock) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200));

                if (consumerRecords.isEmpty()) {
                    lock.wait();
                    continue;
                }

                boolean timed = false;
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    long timestamp = consumerRecord.timestamp();
                    TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                    if (timestamp + 60 * 1000 < System.currentTimeMillis()) {

                        String value = consumerRecord.value();
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode jsonNode = objectMapper.readTree(value);
                        JsonNode jsonNodeTopic = jsonNode.get("topic");

                        String appTopic = null, appKey = null, appValue = null;

                        if (jsonNodeTopic != null) {
                            appTopic = jsonNodeTopic.asText();
                        }
                        if (appTopic == null) {
                            continue;
                        }
                        JsonNode jsonNodeKey = jsonNode.get("key");
                        if (jsonNodeKey != null) {
                            appKey = jsonNode.asText();
                        }

                        JsonNode jsonNodeValue = jsonNode.get("value");
                        if (jsonNodeValue != null) {
                            appValue = jsonNodeValue.asText();
                        }
                        // send to application topic
                        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(appTopic, appKey, appValue);
                        try {
                            producer.send(producerRecord).get();
                            // success. commit message
                            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1);
                            HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>();
                            metadataHashMap.put(topicPartition, offsetAndMetadata);
                            consumer.commitSync(metadataHashMap);
                        } catch (ExecutionException e) {
                            consumer.pause(Collections.singletonList(topicPartition));
                            consumer.seek(topicPartition, consumerRecord.offset());
                            timed = true;
                            break;
                        }
                    } else {
                        consumer.pause(Collections.singletonList(topicPartition));
                        consumer.seek(topicPartition, consumerRecord.offset());
                        timed = true;
                        break;
                    }
                }

                if (timed) {
                    lock.wait();
                }
            }
        } while (!exit);

    }
}

這段程序是基于SpringBoot 2.4.4版本和 kafka-client 2.7.0版本編寫的一個單元測試,需要修改私有變量servers為kafka broker的地址。

在啟動程序后,向Topic delay-minutes-1 發(fā)送如以下格式的json字符串數(shù)據(jù)

{
    "topic": "target",
    "key": "key1",
    "value": "value1"
}

同時啟動一個消費者監(jiān)聽topic target,在一分鐘后,將會收到一條 key="key1", value="value1"的數(shù)據(jù)。

源代碼地址

還需要做什么

創(chuàng)建多個topic用于處理不同時間的延遲消息,例如delay-minutes-1 delay-minutes-5 delay-minutes-10 delay-minutes-15以提供指數(shù)級別的延遲時間,這樣比一個topic要好很多,畢竟在順序拉取消息的時候,有一條消息不滿足條件,后面的將全部進行排隊。

 到此這篇關于Java Kafka實現(xiàn)延遲隊列的示例代碼的文章就介紹到這了,更多相關Java Kafka延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 詳細談談Java中l(wèi)ong和double的原子性

    詳細談談Java中l(wèi)ong和double的原子性

    原子性是指一個操作或多個操作要么全部執(zhí)行,且執(zhí)行的過程不會被任何因素打斷,要么就都不執(zhí)行,下面這篇文章主要給大家介紹了關于Java中l(wèi)ong和double原子性的相關資料,需要的朋友可以參考下
    2021-08-08
  • Java棧的應用之括號匹配算法實例分析

    Java棧的應用之括號匹配算法實例分析

    這篇文章主要介紹了Java棧的應用之括號匹配算法,結(jié)合實例形式分析了Java使用棧實現(xiàn)括號匹配算法的相關原理、操作技巧與注意事項,需要的朋友可以參考下
    2020-03-03
  • Javaweb基礎入門HTML之table與form

    Javaweb基礎入門HTML之table與form

    HTML的全稱為超文本標記語言,是一種標記語言。它包括一系列標簽.通過這些標簽可以將網(wǎng)絡上的文檔格式統(tǒng)一,使分散的Internet資源連接為一個邏輯整體。HTML文本是由HTML命令組成的描述性文本,HTML命令可以說明文字,圖形、動畫、聲音、表格、鏈接等
    2022-03-03
  • Java 正確地從類路徑中獲取資源

    Java 正確地從類路徑中獲取資源

    Java 有能力從類路徑中查找獲取資源,可將資源放在 CLASSPATH 里,也可打包到 Jar 中。本文將具體講述獲取資源的步驟,感興趣的朋友可以了解下
    2021-05-05
  • 解決Java中SimpleDateFormat線程不安全的五種方案

    解決Java中SimpleDateFormat線程不安全的五種方案

    SimpleDateFormat 就是一個典型的線程不安全事例,本文主要介紹了解決Java中SimpleDateFormat線程不安全的五種方案,需要的朋友們下面隨著小編來一起學習學習吧
    2021-05-05
  • Spring手寫簡化版MVC流程詳解

    Spring手寫簡化版MVC流程詳解

    Spring MVC是Spring Framework的一部分,是基于Java實現(xiàn)MVC的輕量級Web框架。本文將通過簡單示例帶大家掌握SpringMVC簡化版手寫方法,感興趣的可以了解一下
    2022-11-11
  • Java Random.nextInt()方法原理解析

    Java Random.nextInt()方法原理解析

    這篇文章主要介紹了Java Random.nextInt()方法原理解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-09-09
  • Mybatis中typeAliases標簽和package標簽使用

    Mybatis中typeAliases標簽和package標簽使用

    這篇文章主要介紹了Mybatis中typeAliases標簽和package標簽使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • struts2的流程和一系列相關知識代碼解析

    struts2的流程和一系列相關知識代碼解析

    這篇文章主要介紹了struts2的流程和一系列相關知識代碼解析,具有一定借鑒價值,需要的朋友可以參考下。
    2017-12-12
  • Spring中@Autowired和@Resource注解相同點和不同點

    Spring中@Autowired和@Resource注解相同點和不同點

    這篇文章主要介紹了Spring中@Autowired和@Resource注解相同點和不同點,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2024-01-01

最新評論