Java使用WebFlux調(diào)用大模型實(shí)現(xiàn)智能對(duì)話
1.引入依賴
如果使用了tomcat作為容器需要排除tomcat,webflux使用Netty作為容器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
2.定義請(qǐng)求類和接收類
AiPromptDto 用于接收用戶輸入信息
@Data
public class AiPromptDto {
/**
* 大模型id
*/
private String serviceId;
/**
* 用戶輸入
*/
private String userInput;
/**
* sessionId
*/
private String sessionId;
/**
* 請(qǐng)求id
*/
private String requestId;
/**
* 獲取token
*/
private String token;
}
答案接收對(duì)象
@Data
@AllArgsConstructor
public class AnswerChunk {
/**
* 返回的內(nèi)容
*/
private String content;
private String sessionId;
}
3.修改application.yml
此處配置response沒有緩存,否則可能會(huì)阻塞,不會(huì)實(shí)時(shí)返回
reactor:
netty:
response:
buffer-size: 0
4.測試大模型獲取數(shù)據(jù)格式
1.歡迎詞
userinput:你好?
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"messageId":"d47cce80-bcf0-49fe-8e23-06bb5ab79af3","messageContent":"消息1:我是一個(gè)聊天機(jī)器人,這里是我的消息"}
id:[DONE]
data:[DONE]
2.問答
userinput:物料00NY681的庫存有多少個(gè)?
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"庫"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"存"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"中"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"物"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"料"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"0"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"0"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"N"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"Y"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"6"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"8"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"1"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"的"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"數(shù)"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"量"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"為"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"1"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"個(gè)"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"。"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"inquiryList":"[\"物料00NY681的庫存是否充足?\",\"物料00NY681的庫存位置在哪里?\",\"如何補(bǔ)充物料00NY681的庫存?\"]"}
id:[DONE]
data:[DONE]
5.定義Service接口和實(shí)現(xiàn)類
webflux返回Mono或者Flux
public interface AiService {
/**
* 根據(jù)請(qǐng)求獲取流式返回的答案
* @param request
* @return
*/
Flux<AnswerChunk> processStream(AiPromptDto request);
}
實(shí)現(xiàn)類AIServiceImpl
import org.springframework.web.reactive.function.client.WebClient;
@Service
public class AIServiceImpl implements AiService {
private final WebClient webClient;
//初始化webClient,并ssl校驗(yàn),生產(chǎn)環(huán)境不要跳過
public AIServiceImpl(WebClient.Builder webClientBuilder) {
// 使用InsecureTrustManagerFactory來信任所有證書
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE);
HttpClient httpClient = HttpClient.create()
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContextBuilder))
.responseTimeout(Duration.ofMinutes(timeout));
this.webClient = webClientBuilder.clientConnector(
new ReactorClientHttpConnector(httpClient)
).baseUrl(aiForceUrl).build();
}
@Override
public Flux<AnswerChunk> processStream(AiPromptDto request) {
String body = JSONUtil.toJsonStr(request);//參數(shù)都轉(zhuǎn)化為json字符串
return webClient.post()
.uri(aiForceUrl + "/aiforceplatformapi/openapi/llm/debugSse")//大模型地址
.bodyValue(body)//body參數(shù)
.header("token", request.getToken())//設(shè)置請(qǐng)求頭
.header("Content-Type", "application/json")
.retrieve()//retrieve 方法會(huì)從服務(wù)器響應(yīng)中提取數(shù)據(jù)
.bodyToFlux(String.class)//響應(yīng)體解析為一個(gè)流式的 String 類型序列
.map(chunk -> {//解析數(shù)據(jù)以供存儲(chǔ)
//System.out.println("chunk = " + chunk);
String content = "";
// 解析大模型返回?cái)?shù)據(jù)
if (!chunk.contains("[DONE]")) {//結(jié)束標(biāo)志
if (chunk.contains("inquiryList")) {//處理返回的關(guān)聯(lián)查詢列表
content = parseChunk(chunk);
finalAnswer[0].setQueryList(content);
}else if (chunk.contains("messageId")&&chunk.contains("messageContent")) {//處理返回提示message
parseMessage(chunk, messageMap);
} else if (chunk.contains("data")) {//處理返回的問題答案
content = parseChunk(chunk);
redisTemplate.opsForValue().append(request.getRequestId() + "_result", content);
} else if (chunk.contains("question")) {//處理返回question
//先刪除
questionService.deleteQuestionsByPreviousIdAndRequest(questionId, requestId);
//保存ai返回的question
} else if (chunk.contains("image")) {//處理圖片
parseImages(chunk, imagesUrl);
} else if (chunk.contains("referenceInfo")) {//處理參照信息
parseReference(chunk, aiAnswerReferenceList);
}
} else {
// 處理結(jié)束
end.set("[DONE]");
finalAnswer[0].setState("DONE");
}
if (StringUtils.isEmpty(chunk)) {
chunk = "";
}
return new AnswerChunk(chunk, request.getRequestId());
})
.doOnComplete(() -> {//答案都完成后存儲(chǔ)對(duì)應(yīng)數(shù)據(jù)到數(shù)據(jù)庫中
String finalContent = redisTemplate.opsForValue().get(request.getRequestId() + "_result");
redisTemplate.delete(request.getRequestId());
//保存答案
String returnAnswer = "";
JSONObject answer1 = new JSONObject().putOnce("data", finalContent);
//具體實(shí)現(xiàn)
})
.onErrorResume(e -> {//錯(cuò)誤情況處理
finalAnswer[0].setState("FAILED");
answerService.saveOrUpdate(finalAnswer[0]);
return Flux.error(e);
});
}
}
6.定義Controller
@RestController
@RequestMapping("/aiAgent")
public class AiForceController {
/**
* 獲取內(nèi)容
*
* @param request MediaType.TEXT_EVENT_STREAM_VALUE 流式輸出,否則會(huì)一次返回
* charset=UTF-8 字符集,不設(shè)置會(huì)亂碼
* 注意:使用get會(huì)中文亂碼
* @return
*/
@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE + ";charset=UTF-8")
public Flux<ServerSentEvent<String>> streamResponse(@RequestBody AiForcePromptDto request) {
return aiService.processStream(request)
.limitRate(100) // 限制每秒最大請(qǐng)求數(shù)
.onBackpressureBuffer(100,//背壓策略:緩沖區(qū)大小為 100
buffer -> {
logger.warn("Backpressure buffer overflow, dropping {} items", buffer);
}).publishOn(Schedulers.boundedElastic(),1) // 單線程調(diào)度確保順序
.flatMap(chunk -> { // 使用 flatMap 將一個(gè)異步流中的每個(gè)元素映射為另一個(gè)流,并將這些流合并為一個(gè)單一的流
String content = chunk.getContent();
if (StringUtils.isNotBlank(content)) {
String processedContent = content.replaceAll("`{3}", "\n```"); // 規(guī)范代碼塊格式
return Flux.just(ServerSentEvent.<String>builder()
.id(request.getRequestId())
.data(processedContent)
.build());
}
return Flux.empty();//如果內(nèi)容為空,就返回空的flux
}, 1) // 設(shè)置并發(fā)度為 1,確保逐條發(fā)送
.doOnNext(event -> logger.info("Streaming chunk: {}", event.data())); // 日志記錄每次發(fā)送的數(shù)據(jù)
}
}
// Flux<ServerSentEvent<String>> 實(shí)現(xiàn) SSE(Server-Sent Events),以便客戶端可以實(shí)時(shí)接收服務(wù)器推送的消息
7.調(diào)用結(jié)果

注意:在部署時(shí),如果使用到了nginx需要配置

- chunked_transfer_encoding off 關(guān)閉分塊傳輸,會(huì)發(fā)送完整的數(shù)據(jù)
- proxy_buffering off #禁用代理緩沖,適用于流式傳輸
- gzip off ##關(guān)閉壓縮,數(shù)據(jù)以未壓縮的方式傳輸
- add_header Cache-Control “no-cache” header定義無緩存
- add_header X-Accel-Buffering no;##禁用 Nginx 的緩沖功能,確保數(shù)據(jù)實(shí)時(shí)傳輸
到此這篇關(guān)于Java使用WebFlux調(diào)用大模型實(shí)現(xiàn)智能對(duì)話的文章就介紹到這了,更多相關(guān)Java智能對(duì)話內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring mvc整合mybatis(crud+分頁插件)操作mysql
這篇文章主要介紹了Spring mvc整合mybatis(crud+分頁插件)操作mysql的步驟詳解,需要的朋友可以參考下2017-04-04
Java對(duì)象深復(fù)制與淺復(fù)制實(shí)例詳解
這篇文章主要介紹了 Java對(duì)象深復(fù)制與淺復(fù)制實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下2017-05-05
java ThreadPool線程池的使用,線程池工具類用法說明
這篇文章主要介紹了java ThreadPool線程池的使用,線程池工具類用法說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-10-10
淺談spring中isolation和propagation的用法
這篇文章主要介紹了淺談spring中isolation 和propagation的用法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07
Java內(nèi)存結(jié)構(gòu)和數(shù)據(jù)類型
本文重點(diǎn)給大家介紹java內(nèi)存結(jié)構(gòu)和數(shù)據(jù)類型知識(shí),非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友參考下2016-12-12
Java實(shí)現(xiàn)較大二進(jìn)制文件的讀、寫方法
本篇文章主要介紹了Java實(shí)現(xiàn)較大二進(jìn)制文件的讀、寫方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-02-02

