spring中websocket定時任務實現實時推送
有時候業(yè)務要求websocket連接后,服務端實時每隔一段時間就將數據推送給客戶端進行響應,這時就需要websocket+定時任務一起來實現實時推送數據給客戶端了。
使用的定時任務方式為spring的TaskScheduler對象實現任務調度。
TaskScheduler定時任務實現
TaskScheduler接口提供了多種調度方法來實現運行任務的執(zhí)行。
public interface TaskScheduler { //通過觸發(fā)器來決定task是否執(zhí)行 ScheduledFuture schedule(Runnable task, Trigger trigger); //在starttime的時候執(zhí)行一次 ScheduledFuture schedule(Runnable task, Date startTime); ScheduledFuture schedule(Runnable task, Instant startTime); //從starttime開始每個period時間段執(zhí)行一次task ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period); ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period); //每隔period執(zhí)行一次 ScheduledFuture scheduleAtFixedRate(Runnable task, long period); ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period); //從startTime開始每隔delay長時間執(zhí)行一次 ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay); //每隔delay時間執(zhí)行一次 ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay); }
簡單測試一下
import cn.hutool.core.date.DateUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Component; /** * The type Task scheduler test. * * @author yjj * @version 1.0 * @since 2022 -12-28 15:45:17 */ @Slf4j @Component @RequiredArgsConstructor public class TaskSchedulerTest { private final TaskScheduler taskScheduler; @Bean public void test() { //每隔3秒執(zhí)行一次 Trigger trigger = new CronTrigger("0/3 * * * * *"); //每隔1秒執(zhí)行一次 //Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS); taskScheduler.schedule(new MyThread(), trigger); } private class MyThread implements Runnable { @Override public void run() { log.info("定時執(zhí)行線程名稱=【{}】,執(zhí)行時間=【{}】", Thread.currentThread().getName(), DateUtil.date()); } } }
效果就是每個3秒執(zhí)行一次
websocket+定時任務實時推送
實現的業(yè)務需求如下:客戶端連上來以后就每隔3秒向客戶端實時推送消息。有關websocket的實現見文章websocket簡單實現
TestWebsocket.java
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DateUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.json.JSONUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Component; import org.springframework.web.socket.*; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; /** * 測試websocket * * @author yjj * @version 1.0 * @since 2022 -12-28 14:55:29 */ @Slf4j @Component @RequiredArgsConstructor public class TestWebsocket implements WebSocketHandler { protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>(); /** * 定時任務集合 */ Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>(); /** * taskScheduler */ private final TaskScheduler taskScheduler; /** * 建立連接后操作 * * @param session 連接session信息 * @throws Exception exception */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { sendMessage("連接成功~~~~~~,sessionId=" + session.getId()); WEB_SOCKET_SESSIONS.add(session); //設置定時任務,每隔3s執(zhí)行一次 Trigger trigger = new CronTrigger("0/3 * * * * *"); //開啟一個定時任務 ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger); //根據session連接id定時任務線程存到map中 stringScheduledFutureMap.put(session.getId(), schedule); } private class CustomizeTask implements Runnable { private final String sessionId; CustomizeTask(String sessionId) { this.sessionId = sessionId; } @Override public void run() { try { String message = CharSequenceUtil.format("定時執(zhí)行線程名稱=【{}】,執(zhí)行時間=【{}】", Thread.currentThread().getName(), DateUtil.date()); sendMessage(JSONUtil.toJsonStr(message), sessionId); } catch (IOException e) { e.printStackTrace(); } } } /** * 接收到消息后的處理 * * @param session 連接session信息 * @param message 信息 * @throws Exception exception */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { sendMessage("接收到的消息為=【" + message + "】,sessionId=【" + session.getId() + "】,回復消息=【你好呀!】"); } /** * ws連接出錯時調用 * * @param session session連接信息 * @param exception exception * @throws Exception exception */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { if (session.isOpen()) { sendMessage("ws連接出錯,即將關閉此session,sessionId=【" + session.getId() + "】"); session.close(); } WEB_SOCKET_SESSIONS.remove(session); } /** * 連接關閉后調用 * * @param session session連接信息 * @param closeStatus 關閉狀態(tài) * @throws Exception exception */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { if (session.isOpen()) { sendMessage("ws連接即將關閉此session,sessionId=【" + session.getId() + "】"); session.close(); } WEB_SOCKET_SESSIONS.remove(session); String sessionId = session.getId(); ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class); if (scheduledFuture != null) { //暫停對應session的開啟的定時任務 scheduledFuture.cancel(true); //集合移除 stringScheduledFutureMap.remove(sessionId); } } /** * 是否支持分片消息 */ @Override public boolean supportsPartialMessages() { return false; } /** * 群發(fā)發(fā)送消息 * * @param message 消息 * @throws IOException ioException */ public void sendMessage(String message) throws IOException { if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) { for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) { webSocketSession.sendMessage(new TextMessage(message)); } } } /** * 發(fā)給指定連接消息 * * @param message 消息 * @throws IOException ioException */ public void sendMessage(String message, String sessionId) throws IOException { if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) { for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) { if (sessionId.equals(webSocketSession.getId())) { webSocketSession.sendMessage(new TextMessage(message)); } } } } }
websocket綁定URL
import com.yjj.test.websocket.TestWebsocket; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import javax.annotation.Resource; /** * websocket配置 * * @author yjj * @version 1.0 * @since 2022 -12-28 15:10:11 */ @EnableWebSocket @Configuration public class WebSocketConfig implements WebSocketConfigurer { @Resource private TestWebsocket testWebsocket; /** * Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired. * * @param registry */ @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*"); } }
websocket與定時任務同時存在時,需要加入配置定義線程池進行線程的管理
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; /** * 當定時任務和websocket同時存在時報錯解決 * * @author yjj * @version 1.0 * @since 2022 -04-28 17:35:54 */ @Configuration public class ScheduledConfig { /** * Schedule本身是單線程執(zhí)行的 * * @return the task scheduler */ @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler(); scheduling.setPoolSize(20); return scheduling; } }
效果如下
連接上以后服務每隔3秒會向客戶端實時推送消息
到此這篇關于spring中websocket定時任務實現實時推送的文章就介紹到這了,更多相關spring websocket實時推送內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
- Spring?Boot+Vue實現Socket通知推送的完整步驟
- Springboot集成SSE實現單工通信消息推送流程詳解
- SpringBoot整合WebSocket實現后端向前端主動推送消息方式
- Spring?Boot?使用?SSE?方式向前端推送數據詳解
- SpringBoot+WebSocket實現消息推送功能
- Springboot整合企業(yè)微信機器人助手推送消息的實現
- SpringBoot整合WxJava開啟消息推送的實現
- SpringBoot2.0集成WebSocket實現后臺向前端推送信息
- SpringBoot+WebSocket+Netty實現消息推送的示例代碼
- Spring SseEmitter推送消息及常用方法
相關文章
詳解SpringBoot如何優(yōu)雅的進行全局異常處理
在SpringBoot的開發(fā)中,為了提高程序運行的魯棒性,我們經常需要對各種程序異常進行處理,但是如果在每個出異常的地方進行單獨處理的話,這會引入大量業(yè)務不相關的異常處理代碼,這篇文章帶大家了解一下如何優(yōu)雅的進行全局異常處理2023-07-07Spring?cloud負載均衡@LoadBalanced?&?LoadBalancerClient
由于Spring?cloud2020之后移除了Ribbon,直接使用Spring?Cloud?LoadBalancer作為客戶端負載均衡組件,我們討論Spring負載均衡以Spring?Cloud2020之后版本為主,學習Spring?Cloud?LoadBalance2023-11-11BeanUtils.copyProperties復制屬性失敗的原因及解決方案
這篇文章主要介紹了BeanUtils.copyProperties復制屬性失敗的原因及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08