亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

springboot使用kafka事務(wù)的示例代碼

 更新時間:2024年06月14日 09:38:31   作者:????????五敷有你  
Kafka?同數(shù)據(jù)庫一樣支持事務(wù),當(dāng)發(fā)生異常的時候可以進(jìn)行回滾,確保消息監(jiān)聽器不會接收到一些錯誤的或者不需要的消息,本文就來介紹一下springboot使用kafka事務(wù)的示例代碼,具有一定的參考價值,感興趣的可以了解一下

先看下下面這種情況,程序都出錯了,按理說消息也不應(yīng)該成功

@GetMapping("/send")
public void test9(String message) {
    kafkaTemplate.send(topic, message);
    throw new RuntimeException("fail");
}

但是執(zhí)行結(jié)果是發(fā)生了異常并且消息發(fā)送成功了:

Kafka 同數(shù)據(jù)庫一樣支持事務(wù),當(dāng)發(fā)生異常的時候可以進(jìn)行回滾,確保消息監(jiān)聽器不會接收到一些錯誤的或者不需要的消息。

kafka事務(wù)屬性是指一系列的生產(chǎn)者生產(chǎn)消息和消費(fèi)者提交偏移量的操作在一個事務(wù),或者說是是一個原子操作),同時成功或者失敗。使用事務(wù)也很簡單,需要先開啟事務(wù)支持,然后再使用。

如何開啟事務(wù)

如果使用默認(rèn)配置只需要在yml添加spring.kafka.producer.transaction-id-prefix配置來開啟事務(wù),之前沒有使用默認(rèn)的配置,自定義的kafkaTemplate,那么需要在ProducerFactory中設(shè)置事務(wù)Id前綴開啟事務(wù)并將KafkaTransactionManager注入到spring中,看下KafkaProducerConfig完整代碼:

@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
 
public Map<String,Object> producerConfigs(){
    Map<String,Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ProducerConfig.RETRIES_CONFIG,retries);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 配置分區(qū)策略
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.springbootkafka.config.CustomizePartitioner");
    // 配置生產(chǎn)者攔截器
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.example.springbootkafka.interceptor.CustomProducerInterceptor");
    // 配置攔截器消息處理類
    SendMessageInterceptorUtil sendMessageInterceptorUtil = new SendMessageInterceptorUtil();
    props.put("interceptorUtil",sendMessageInterceptorUtil);
    return props;
}
 
@Bean
public ProducerFactory<String,String> producerFactory(){
    DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigs());
    //設(shè)置事務(wù)Id前綴 開啟事務(wù)
    producerFactory.setTransactionIdPrefix("tx-");
    return producerFactory;
}
 
@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
    return new KafkaTemplate<>(producerFactory());
}
 
@Bean
public KafkaTransactionManager<Integer, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {
    return new KafkaTransactionManager(producerFactory);
}
} 

配置開啟事務(wù)后,使用大體有兩種方式,先記錄下第一種使用事務(wù)方式:使用 executeInTransaction 方法

直接看下代碼:

@GetMapping("/send11")
public void test11(String message) {
    kafkaTemplate.executeInTransaction(operations ->{
        operations.send(topic,message);
        throw new RuntimeException("fail");
    });
}

當(dāng)然你可以這么寫:

@GetMapping("/send11")
public void test11(String message) {
    kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback(){
        @Override
        public Object doInOperations(KafkaOperations operations) {
            operations.send(topic,message);
            throw new RuntimeException("fail");
        }
    });
}

啟動項(xiàng)目,訪問http://localhost:8080/send10?message=test10 結(jié)果如下:

如上:消費(fèi)者沒打印消息,說明消息沒發(fā)送成功,并且前面會報錯org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted 的錯誤,說明事務(wù)生效了。

第一種使用事務(wù)方式:使用 @Transactional 注解方式 直接在方法上加上@Transactional注解即可,看下代碼:

@GetMapping("/send12")
@Transactional
public void test12(String message) {
    kafkaTemplate.send(topic, message);
    throw new RuntimeException("fail");
}

如果開啟的事務(wù),則后續(xù)發(fā)送消息必須使用@Transactional注解或者使用kafkaTemplate.executeInTransaction() ,否則拋出異常,異常信息如下:

貼下完整的異常吧:java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

到此這篇關(guān)于springboot使用kafka事務(wù)的文章就介紹到這了,更多相關(guān)springboot kafka事務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Jenkins一鍵打包部署SpringBoot應(yīng)用的方法步驟

    Jenkins一鍵打包部署SpringBoot應(yīng)用的方法步驟

    本文主要介紹了使用Jenkins一鍵打包部署SpringBoot應(yīng)用的方法步驟,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • 詳解Java如何關(guān)閉線程以及線程池

    詳解Java如何關(guān)閉線程以及線程池

    java如何正確關(guān)閉線程以及線程池是一個高頻的面試題,本文將為大家詳細(xì)介紹實(shí)現(xiàn)的方法與代碼,感興趣的小伙伴快跟隨小編一起學(xué)習(xí)一下
    2022-04-04
  • @SpringBootApplication注解的使用

    @SpringBootApplication注解的使用

    這篇文章主要介紹了@SpringBootApplication注解的使用,幫助大家更好的理解和學(xué)習(xí)使用springboot框架,感興趣的朋友可以了解下
    2021-04-04
  • 快速排序算法原理及java遞歸實(shí)現(xiàn)

    快速排序算法原理及java遞歸實(shí)現(xiàn)

    快速排序 對冒泡排序的一種改進(jìn),若初始記錄序列按關(guān)鍵字有序或基本有序,蛻化為冒泡排序。使用的是遞歸原理,在所有同數(shù)量級O(n longn) 的排序方法中,其平均性能最好。就平均時間而言,是目前被認(rèn)為最好的一種內(nèi)部排序方法
    2014-01-01
  • Java 如何在switch case語句中聲明變量

    Java 如何在switch case語句中聲明變量

    這篇文章主要介紹了Java 如何在switch case語句中聲明變量,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • 淺談java安全編碼指南之堆污染

    淺談java安全編碼指南之堆污染

    什么是堆污染呢?是指當(dāng)參數(shù)化類型變量引用的對象不是該參數(shù)化類型的對象時而發(fā)生的。我們知道在JDK5中,引入了泛型的概念,在創(chuàng)建集合類的時候,指定該集合類中應(yīng)該存儲的對象類型。如果在指定類型的集合中,引用了不同的類型,那么這種情況就叫做堆污染。
    2021-06-06
  • 將Java對象序列化成JSON和XML格式的實(shí)例

    將Java對象序列化成JSON和XML格式的實(shí)例

    下面小編就為大家分享一篇將Java對象序列化成JSON和XML格式的實(shí)例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2017-12-12
  • Java超詳細(xì)精講數(shù)據(jù)結(jié)構(gòu)之bfs與雙端隊(duì)列

    Java超詳細(xì)精講數(shù)據(jù)結(jié)構(gòu)之bfs與雙端隊(duì)列

    廣搜BFS的基本思想是: 首先訪問初始點(diǎn)v并將其標(biāo)志為已經(jīng)訪問。接著通過鄰接關(guān)系將鄰接點(diǎn)入隊(duì)。然后每訪問過一個頂點(diǎn)則出隊(duì)。按照順序,訪問每一個頂點(diǎn)的所有未被訪問過的頂點(diǎn)直到所有的頂點(diǎn)均被訪問過。廣度優(yōu)先遍歷類似與層次遍歷
    2022-07-07
  • Java在Linux下 不能處理圖形的解決辦法 分享

    Java在Linux下 不能處理圖形的解決辦法 分享

    Java在Linux下 不能處理圖形的解決辦法 分享,需要的朋友可以參考一下
    2013-06-06
  • Spring事務(wù)原理解析

    Spring事務(wù)原理解析

    Spring事務(wù)有可能會提交,回滾、掛起、恢復(fù),所以Spring事務(wù)提供了一種機(jī)制,可以讓程序員來監(jiān)聽當(dāng)前Spring事務(wù)所處于的狀態(tài),這篇文章主要介紹了Spring底層事務(wù)原理,需要的朋友可以參考下
    2022-12-12

最新評論