java實(shí)現(xiàn)異步線程,回調(diào)接口方式
最近在業(yè)余時(shí)間呢,無(wú)意間發(fā)現(xiàn)一個(gè)問(wèn)題,使用異步線程推送回調(diào)數(shù)據(jù)
這里小編使用了兩個(gè)IDEA程序分別模擬接收方和發(fā)送方
發(fā)送方
package com.slg.util; import com.alibaba.fastjson.JSONObject; import com.google.gson.Gson; import com.slg.entity.dto.SettlementMergerResp; import lombok.extern.slf4j.Slf4j; import org.apache.poi.ss.formula.functions.T; import org.springframework.beans.factory.annotation.Autowired; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** * @author Administrator * 異步處理數(shù)據(jù)推送 */ @Slf4j public class CallbackUtil { // 推送地址(使用另一個(gè)IDEA模擬(http://localhost:8848/callback)這個(gè)客戶端) private static final String INTERFACE_CALLBACK = "http://localhost:8848/callback"; // private static final ExecutorService executorService = Executors.newFixedThreadPool(10); private static final AtomicInteger SUBMITTED_TASKS = new AtomicInteger(0); private static final ExecutorService executorService = CustomThreadPoolUtil.getExecutorService(); /** * @param object * @Description 構(gòu)建異步線程,回調(diào)接口 * @Date 2024/5/29 * 等待線程池關(guān)閉完成后再提交任務(wù) shutdown * 如果你想立即關(guān)閉線程池 hutdownNow() * 確保不會(huì)在關(guān)閉線程池后提交任務(wù) isShutdown **/ public static void getCallBackThread(Object object) { log.info("回調(diào)接口=======================>正在進(jìn)行數(shù)據(jù)推送:{}", object); if (!executorService.isShutdown()) { log.info("準(zhǔn)備推送數(shù)據(jù): {}", object); SUBMITTED_TASKS.incrementAndGet(); executorService.submit(() -> { try { CallbackUtil.sendCallback(object); log.info("數(shù)據(jù)成功推送給用戶!"); } catch (Exception e) { log.error("推送數(shù)據(jù)時(shí)出現(xiàn)異常: {}", e.getMessage()); } finally { SUBMITTED_TASKS.decrementAndGet(); if (SUBMITTED_TASKS.get() == 0) { log.info("關(guān)閉線程池"); executorService.shutdown(); log.info("已關(guān)閉線程池"); } } }); SUBMITTED_TASKS.incrementAndGet(); } log.info("回調(diào)接口=======================>正在進(jìn)行數(shù)據(jù)推送完畢:{}", object); } /** * @param object (測(cè)試對(duì)象=>可自行模擬) * @Description 數(shù)據(jù)推送地址 * @Date 2024/5/29 **/ public static void sendCallback(Object object) throws Exception { URL url = new URL(INTERFACE_CALLBACK); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/json"); connection.setDoOutput(true); String jsonPayload = convertToJson(object); try (OutputStream outputStream = connection.getOutputStream()) { outputStream.write(jsonPayload.getBytes()); outputStream.flush(); } int responseCode = connection.getResponseCode(); if (responseCode == HttpURLConnection.HTTP_OK) { // TODO 請(qǐng)求成功,可以根據(jù)需要進(jìn)行進(jìn)一步處理 } else { // TODO 請(qǐng)求失敗,可以根據(jù)需要進(jìn)行錯(cuò)誤處理 } connection.disconnect(); } private static String convertToJson(Object object) { return new Gson().toJson(object); } }
接收方
package com.example.demo.controller; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; import lombok.Data; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.util.concurrent.*; /** * @author Administrator * 手動(dòng)創(chuàng)建線程池 */ @Data public class CustomThreadPoolUtil { public static void main(String[] args) throws IOException { int port = 8848; HttpServer server = HttpServer.create(new InetSocketAddress(port), 0); server.createContext("/callback", (HttpHandler) new CallbackHandler()); server.setExecutor(null); server.start(); System.out.println("服務(wù)器已啟動(dòng),監(jiān)聽(tīng)端口:" + port); } static class CallbackHandler implements HttpHandler { @Override public void handle(HttpExchange exchange) throws IOException { String requestMethod = exchange.getRequestMethod(); if (requestMethod.equalsIgnoreCase("POST")) { InputStream requestBody = exchange.getRequestBody(); byte[] buffer = new byte[requestBody.available()]; requestBody.read(buffer); String requestData = new String(buffer); System.out.println("接收到的數(shù)據(jù):" + requestData); String response = "數(shù)據(jù)已接收"; exchange.sendResponseHeaders(200, response.getBytes().length); OutputStream outputStream = exchange.getResponseBody(); outputStream.write(response.getBytes()); outputStream.close(); } else { exchange.sendResponseHeaders(405, -1); } } } }
自定義線程池
package com.slg.util; import lombok.Data; import java.util.concurrent.*; /** * @author Administrator * 手動(dòng)創(chuàng)建線程池 */ @Data public class CustomThreadPoolUtil { // 線程池大小 private static final int CORE_POOL_SIZE = 10; private static final int MAXIMUM_POOL_SIZE = 20; private static final long KEEP_ALIVE_TIME = 60L; private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; private static final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingQueue<>(); // 自定義線程工廠 private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() { private int count = 0; @Override public Thread newThread(Runnable r) { return new Thread(r, "CustomThreadPool-" + count++); } }; // 創(chuàng)建線程池 private static final ExecutorService executorService = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TIME_UNIT, WORK_QUEUE, THREAD_FACTORY ); public static ExecutorService getExecutorService() { return executorService; } }
測(cè)試效果:
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
PowerJob的WorkerHealthReporter工作流程源碼解讀
這篇文章主要為大家介紹了PowerJob的WorkerHealthReporter工作流程源碼解讀,2023-12-12Spring Boot如何開(kāi)啟并使用郵件服務(wù)
這篇文章主要介紹了Spring Boot如何開(kāi)啟并使用郵件服務(wù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06Spring boot實(shí)現(xiàn)上傳文件到本地服務(wù)器
這篇文章主要為大家詳細(xì)介紹了Spring boot實(shí)現(xiàn)上傳文件到本地服務(wù)器,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-08-08詳解Java中的do...while循環(huán)語(yǔ)句的使用方法
這篇文章主要介紹了Java中的do...while循環(huán)語(yǔ)句的使用方法,是Java入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-10-10idea如何設(shè)置Git忽略對(duì)某些文件或文件夾的版本追蹤
這篇文章主要介紹了idea如何設(shè)置Git忽略對(duì)某些文件或文件夾的版本追蹤問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03java正則表達(dá)式之Pattern與Matcher類詳解
這篇文章主要給大家介紹了關(guān)于java正則表達(dá)式之Pattern與Matcher類的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09