spring-cloud-stream的手動(dòng)消息確認(rèn)問題
spring-cloud-stream的手動(dòng)消息確認(rèn)
對(duì)于kafka-binder來說,設(shè)置autoCommitOffset為false.然后在listen中手動(dòng)確認(rèn)
@StreamListener(Sink.INPUT)
void listen(@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment){
? ? //...業(yè)務(wù)代碼
? ? acknowledgment.acknowledge();
}需要注意的是autoCommitOffset的設(shè)置位置.
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false#應(yīng)該在這里設(shè)置 spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false#這里設(shè)置是無效的,獲取Acknowledgment時(shí)會(huì)是null
springcloud的stream消息組件的使用@StreamListener
常見問題(使用rabbitmq)
消息分組防止多實(shí)例重復(fù)消費(fèi)
在一個(gè)服務(wù)多實(shí)例場(chǎng)景下使用默認(rèn)使用@StreamListener監(jiān)聽消息消費(fèi),yml中沒有特殊配置的話是會(huì)導(dǎo)致消息重復(fù)消費(fèi)的,原因是此時(shí)每個(gè)實(shí)例都是匿名在rabbitmq上注冊(cè)的隊(duì)列,需要給消費(fèi)者指定一個(gè)消費(fèi)組,讓消息在組里只被消費(fèi)一次;
spring.cloud.stream.bindings.xxx(消費(fèi)者隊(duì)列名).group=xxx(組名)
在springboot下在同一個(gè)服務(wù)(項(xiàng)目中)使用@input和@outPut時(shí)指定的隊(duì)列名是不可以重復(fù)的.會(huì)在啟動(dòng)編譯的時(shí)候報(bào)bean定義重復(fù)。需要在yml給生產(chǎn)者和消費(fèi)者指定同一個(gè)交換機(jī)。
spring:
rabbitmq:
host: xxx.xxx.xxx.xx
port: 35672
username: xxx
password: xxx
virtual-host: /xxx
cloud:
stream:
bindings:
in:
#若消息系統(tǒng)是RabbitMQ,目的地(destination)就是指exchange,消息系統(tǒng)是Kafka,那么就是指topic
destination: test
#在多實(shí)例的時(shí)候需要制定一個(gè)消息分組,不然每個(gè)實(shí)例都是匿名方式把隊(duì)列注冊(cè)到rabbitmq上去,導(dǎo)致一個(gè)交換機(jī)下有多個(gè)隊(duì)列
#并且默認(rèn)生成的交換機(jī)是topic類型的,會(huì)導(dǎo)致重復(fù)消費(fèi)
group: myIn
out:
destination: test先上依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.fchan</groupId>
<artifactId>springcloudstream</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springcloudstream</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<!-- <version>2.0.1.RELEASE</version>-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>Ditmars.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>再上yml配置
spring:
rabbitmq:
host: xxx.xxx.xxx.xx
port: 35672
username: xxx
password: xxx
virtual-host: /xxx
cloud:
stream:
bindings:
in:
#若消息系統(tǒng)是RabbitMQ,目的地(destination)就是指exchange,消息系統(tǒng)是Kafka,那么就是指topic
destination: test
#在多實(shí)例的時(shí)候需要制定一個(gè)消息分組,不然每個(gè)實(shí)例都是匿名方式把隊(duì)列注冊(cè)到rabbitmq上去,導(dǎo)致一個(gè)交換機(jī)下有多個(gè)隊(duì)列
#并且默認(rèn)生成的交換機(jī)是topic類型的,會(huì)導(dǎo)致重復(fù)消費(fèi)
group: myIn
out:
destination: test消息生產(chǎn)者
package com.fchan.springcloudstream.service;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface MyMessageChannel {
String out = "out";
String in = "in";
@Output(out)
MessageChannel out();
@Input(in)
SubscribableChannel in();
}發(fā)送消息
package com.fchan.springcloudstream.controller;
import com.fchan.springcloudstream.service.MyMessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@RestController
public class MessageController {
@Resource
private MyMessageChannel myMessageChannel;
@RequestMapping("test")
public String testMessage(){
Map<String,Object> map = new HashMap<>();
map.put("shopId", "123");
myMessageChannel.out().send(MessageBuilder.withPayload(map).build());
return "success";
}
}消息消費(fèi)者
package com.fchan.springcloudstream.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@EnableBinding({MyMessageChannel.class})
public class MyConsumer {
Logger log = LoggerFactory.getLogger(MyConsumer.class);
@StreamListener(MyMessageChannel.in)
public void input(Message<Map<String,Object>> message){
log.info("收到消息:{}", message.getPayload());
}
}總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- SpringCloudStream原理和深入使用小結(jié)
- SpringCloud中的Stream服務(wù)間消息傳遞詳解
- 使用Spring?Cloud?Stream處理事件的示例詳解
- SpringCloudStream中的消息分區(qū)數(shù)詳解
- 關(guān)于SpringCloudStream配置問題
- Spring Cloud Stream 高級(jí)特性使用詳解
- SpringCloud微服務(wù)開發(fā)基于RocketMQ實(shí)現(xiàn)分布式事務(wù)管理詳解
- SpringCloud+RocketMQ實(shí)現(xiàn)分布式事務(wù)的實(shí)踐
- Spring Cloud Stream整合RocketMQ的搭建方法
相關(guān)文章
vue+導(dǎo)航錨點(diǎn)聯(lián)動(dòng)-滾動(dòng)監(jiān)聽和點(diǎn)擊平滑滾動(dòng)跳轉(zhuǎn)實(shí)例
今天小編就為大家分享一篇vue+導(dǎo)航錨點(diǎn)聯(lián)動(dòng)-滾動(dòng)監(jiān)聽和點(diǎn)擊平滑滾動(dòng)跳轉(zhuǎn)實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-11-11
html-webpack-plugin修改頁面的title的方法
這篇文章主要介紹了html-webpack-plugin修改頁面的title的方法 ,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-06-06
Vue利用canvas實(shí)現(xiàn)移動(dòng)端手寫板的方法
本篇文章主要介紹了Vue利用canvas實(shí)現(xiàn)移動(dòng)端手寫板的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-05-05
Vue-Router2.X多種路由實(shí)現(xiàn)方式總結(jié)
下面小編就為大家分享一篇Vue-Router2.X多種路由實(shí)現(xiàn)方式總結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-02-02
Vue.js監(jiān)聽select2的值改變進(jìn)行查詢方式
這篇文章主要介紹了Vue.js監(jiān)聽select2的值改變進(jìn)行查詢方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-04-04
vue-resourse將json數(shù)據(jù)輸出實(shí)例
這篇文章主要為大家詳細(xì)介紹了vue-resourse將json數(shù)據(jù)輸出實(shí)例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-03-03
解決vue項(xiàng)目 build之后資源文件找不到的問題
這篇文章主要介紹了解決vue項(xiàng)目 build之后資源文件找不到的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-09-09
vue使用Vue.extend創(chuàng)建全局toast組件實(shí)例
這篇文章主要介紹了vue使用Vue.extend創(chuàng)建全局toast組件實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03

