spring-cloud-stream的手動消息確認問題
spring-cloud-stream的手動消息確認
對于kafka-binder來說,設置autoCommitOffset為false.然后在listen中手動確認
@StreamListener(Sink.INPUT) void listen(@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment){ ? ? //...業(yè)務代碼 ? ? acknowledgment.acknowledge(); }
需要注意的是autoCommitOffset的設置位置.
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false#應該在這里設置 spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false#這里設置是無效的,獲取Acknowledgment時會是null
springcloud的stream消息組件的使用@StreamListener
常見問題(使用rabbitmq)
消息分組防止多實例重復消費
在一個服務多實例場景下使用默認使用@StreamListener監(jiān)聽消息消費,yml中沒有特殊配置的話是會導致消息重復消費的,原因是此時每個實例都是匿名在rabbitmq上注冊的隊列,需要給消費者指定一個消費組,讓消息在組里只被消費一次;
spring.cloud.stream.bindings.xxx(消費者隊列名).group=xxx(組名)
在springboot下在同一個服務(項目中)使用@input和@outPut時指定的隊列名是不可以重復的.會在啟動編譯的時候報bean定義重復。需要在yml給生產(chǎn)者和消費者指定同一個交換機。
spring: rabbitmq: host: xxx.xxx.xxx.xx port: 35672 username: xxx password: xxx virtual-host: /xxx cloud: stream: bindings: in: #若消息系統(tǒng)是RabbitMQ,目的地(destination)就是指exchange,消息系統(tǒng)是Kafka,那么就是指topic destination: test #在多實例的時候需要制定一個消息分組,不然每個實例都是匿名方式把隊列注冊到rabbitmq上去,導致一個交換機下有多個隊列 #并且默認生成的交換機是topic類型的,會導致重復消費 group: myIn out: destination: test
先上依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.8.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.fchan</groupId> <artifactId>springcloudstream</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springcloudstream</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <!-- <version>2.0.1.RELEASE</version>--> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-dependencies</artifactId> <version>Ditmars.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
再上yml配置
spring: rabbitmq: host: xxx.xxx.xxx.xx port: 35672 username: xxx password: xxx virtual-host: /xxx cloud: stream: bindings: in: #若消息系統(tǒng)是RabbitMQ,目的地(destination)就是指exchange,消息系統(tǒng)是Kafka,那么就是指topic destination: test #在多實例的時候需要制定一個消息分組,不然每個實例都是匿名方式把隊列注冊到rabbitmq上去,導致一個交換機下有多個隊列 #并且默認生成的交換機是topic類型的,會導致重復消費 group: myIn out: destination: test
消息生產(chǎn)者
package com.fchan.springcloudstream.service; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface MyMessageChannel { String out = "out"; String in = "in"; @Output(out) MessageChannel out(); @Input(in) SubscribableChannel in(); }
發(fā)送消息
package com.fchan.springcloudstream.controller; import com.fchan.springcloudstream.service.MyMessageChannel; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; @RestController public class MessageController { @Resource private MyMessageChannel myMessageChannel; @RequestMapping("test") public String testMessage(){ Map<String,Object> map = new HashMap<>(); map.put("shopId", "123"); myMessageChannel.out().send(MessageBuilder.withPayload(map).build()); return "success"; } }
消息消費者
package com.fchan.springcloudstream.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import java.util.Map; @Component @EnableBinding({MyMessageChannel.class}) public class MyConsumer { Logger log = LoggerFactory.getLogger(MyConsumer.class); @StreamListener(MyMessageChannel.in) public void input(Message<Map<String,Object>> message){ log.info("收到消息:{}", message.getPayload()); } }
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
vue+導航錨點聯(lián)動-滾動監(jiān)聽和點擊平滑滾動跳轉(zhuǎn)實例
今天小編就為大家分享一篇vue+導航錨點聯(lián)動-滾動監(jiān)聽和點擊平滑滾動跳轉(zhuǎn)實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-11-11html-webpack-plugin修改頁面的title的方法
這篇文章主要介紹了html-webpack-plugin修改頁面的title的方法 ,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-06-06Vue-Router2.X多種路由實現(xiàn)方式總結(jié)
下面小編就為大家分享一篇Vue-Router2.X多種路由實現(xiàn)方式總結(jié),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-02-02Vue.js監(jiān)聽select2的值改變進行查詢方式
這篇文章主要介紹了Vue.js監(jiān)聽select2的值改變進行查詢方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-04-04vue-resourse將json數(shù)據(jù)輸出實例
這篇文章主要為大家詳細介紹了vue-resourse將json數(shù)據(jù)輸出實例,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-03-03vue使用Vue.extend創(chuàng)建全局toast組件實例
這篇文章主要介紹了vue使用Vue.extend創(chuàng)建全局toast組件實例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03