springboot使用kafka事務(wù)的示例代碼
先看下下面這種情況,程序都出錯了,按理說消息也不應(yīng)該成功
@GetMapping("/send") public void test9(String message) { kafkaTemplate.send(topic, message); throw new RuntimeException("fail"); }
但是執(zhí)行結(jié)果是發(fā)生了異常并且消息發(fā)送成功了:
Kafka 同數(shù)據(jù)庫一樣支持事務(wù),當(dāng)發(fā)生異常的時候可以進(jìn)行回滾,確保消息監(jiān)聽器不會接收到一些錯誤的或者不需要的消息。
kafka事務(wù)屬性是指一系列的生產(chǎn)者生產(chǎn)消息和消費(fèi)者提交偏移量的操作在一個事務(wù),或者說是是一個原子操作),同時成功或者失敗。使用事務(wù)也很簡單,需要先開啟事務(wù)支持,然后再使用。
如何開啟事務(wù)
如果使用默認(rèn)配置只需要在yml添加spring.kafka.producer.transaction-id-prefix配置來開啟事務(wù),之前沒有使用默認(rèn)的配置,自定義的kafkaTemplate,那么需要在ProducerFactory中設(shè)置事務(wù)Id前綴開啟事務(wù)并將KafkaTransactionManager注入到spring中,看下KafkaProducerConfig完整代碼:
@Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.producer.servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; public Map<String,Object> producerConfigs(){ Map<String,Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG,retries); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 配置分區(qū)策略 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.springbootkafka.config.CustomizePartitioner"); // 配置生產(chǎn)者攔截器 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.example.springbootkafka.interceptor.CustomProducerInterceptor"); // 配置攔截器消息處理類 SendMessageInterceptorUtil sendMessageInterceptorUtil = new SendMessageInterceptorUtil(); props.put("interceptorUtil",sendMessageInterceptorUtil); return props; } @Bean public ProducerFactory<String,String> producerFactory(){ DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigs()); //設(shè)置事務(wù)Id前綴 開啟事務(wù) producerFactory.setTransactionIdPrefix("tx-"); return producerFactory; } @Bean public KafkaTemplate<String,String> kafkaTemplate(){ return new KafkaTemplate<>(producerFactory()); } @Bean public KafkaTransactionManager<Integer, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) { return new KafkaTransactionManager(producerFactory); } }
配置開啟事務(wù)后,使用大體有兩種方式,先記錄下第一種使用事務(wù)方式:使用 executeInTransaction 方法
直接看下代碼:
@GetMapping("/send11") public void test11(String message) { kafkaTemplate.executeInTransaction(operations ->{ operations.send(topic,message); throw new RuntimeException("fail"); }); }
當(dāng)然你可以這么寫:
@GetMapping("/send11") public void test11(String message) { kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback(){ @Override public Object doInOperations(KafkaOperations operations) { operations.send(topic,message); throw new RuntimeException("fail"); } }); }
啟動項(xiàng)目,訪問http://localhost:8080/send10?message=test10 結(jié)果如下:
如上:消費(fèi)者沒打印消息,說明消息沒發(fā)送成功,并且前面會報錯org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted 的錯誤,說明事務(wù)生效了。
第一種使用事務(wù)方式:使用 @Transactional 注解方式 直接在方法上加上@Transactional注解即可,看下代碼:
@GetMapping("/send12") @Transactional public void test12(String message) { kafkaTemplate.send(topic, message); throw new RuntimeException("fail"); }
如果開啟的事務(wù),則后續(xù)發(fā)送消息必須使用@Transactional注解或者使用kafkaTemplate.executeInTransaction() ,否則拋出異常,異常信息如下:
貼下完整的異常吧:java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
到此這篇關(guān)于springboot使用kafka事務(wù)的文章就介紹到這了,更多相關(guān)springboot kafka事務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot使用kafka推送數(shù)據(jù)到服務(wù)端的操作方法帶認(rèn)證
- 如何使用SpringBoot集成Kafka實(shí)現(xiàn)用戶數(shù)據(jù)變更后發(fā)送消息
- Kafka的安裝及接入SpringBoot的詳細(xì)過程
- springboot使用@KafkaListener監(jiān)聽多個kafka配置實(shí)現(xiàn)
- SpringBoot如何集成Kafka低版本和高版本
- springboot如何配置多kafka
- kafka springBoot配置的實(shí)現(xiàn)
- Springboot項(xiàng)目消費(fèi)Kafka數(shù)據(jù)的方法
相關(guān)文章
Jenkins一鍵打包部署SpringBoot應(yīng)用的方法步驟
本文主要介紹了使用Jenkins一鍵打包部署SpringBoot應(yīng)用的方法步驟,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12Java超詳細(xì)精講數(shù)據(jù)結(jié)構(gòu)之bfs與雙端隊(duì)列
廣搜BFS的基本思想是: 首先訪問初始點(diǎn)v并將其標(biāo)志為已經(jīng)訪問。接著通過鄰接關(guān)系將鄰接點(diǎn)入隊(duì)。然后每訪問過一個頂點(diǎn)則出隊(duì)。按照順序,訪問每一個頂點(diǎn)的所有未被訪問過的頂點(diǎn)直到所有的頂點(diǎn)均被訪問過。廣度優(yōu)先遍歷類似與層次遍歷2022-07-07