java實(shí)現(xiàn)異步線程,回調(diào)接口方式
更新時(shí)間:2024年07月05日 09:20:05 作者:墨筆之風(fēng)
這篇文章主要介紹了java實(shí)現(xiàn)異步線程,回調(diào)接口方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
最近在業(yè)余時(shí)間呢,無意間發(fā)現(xiàn)一個(gè)問題,使用異步線程推送回調(diào)數(shù)據(jù)
這里小編使用了兩個(gè)IDEA程序分別模擬接收方和發(fā)送方
發(fā)送方
package com.slg.util;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.slg.entity.dto.SettlementMergerResp;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Administrator
* 異步處理數(shù)據(jù)推送
*/
@Slf4j
public class CallbackUtil {
// 推送地址(使用另一個(gè)IDEA模擬(http://localhost:8848/callback)這個(gè)客戶端)
private static final String INTERFACE_CALLBACK = "http://localhost:8848/callback";
// private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
private static final AtomicInteger SUBMITTED_TASKS = new AtomicInteger(0);
private static final ExecutorService executorService = CustomThreadPoolUtil.getExecutorService();
/**
* @param object
* @Description 構(gòu)建異步線程,回調(diào)接口
* @Date 2024/5/29
* 等待線程池關(guān)閉完成后再提交任務(wù) shutdown
* 如果你想立即關(guān)閉線程池 hutdownNow()
* 確保不會(huì)在關(guān)閉線程池后提交任務(wù) isShutdown
**/
public static void getCallBackThread(Object object) {
log.info("回調(diào)接口=======================>正在進(jìn)行數(shù)據(jù)推送:{}", object);
if (!executorService.isShutdown()) {
log.info("準(zhǔn)備推送數(shù)據(jù): {}", object);
SUBMITTED_TASKS.incrementAndGet();
executorService.submit(() -> {
try {
CallbackUtil.sendCallback(object);
log.info("數(shù)據(jù)成功推送給用戶!");
} catch (Exception e) {
log.error("推送數(shù)據(jù)時(shí)出現(xiàn)異常: {}", e.getMessage());
} finally {
SUBMITTED_TASKS.decrementAndGet();
if (SUBMITTED_TASKS.get() == 0) {
log.info("關(guān)閉線程池");
executorService.shutdown();
log.info("已關(guān)閉線程池");
}
}
});
SUBMITTED_TASKS.incrementAndGet();
}
log.info("回調(diào)接口=======================>正在進(jìn)行數(shù)據(jù)推送完畢:{}", object);
}
/**
* @param object (測(cè)試對(duì)象=>可自行模擬)
* @Description 數(shù)據(jù)推送地址
* @Date 2024/5/29
**/
public static void sendCallback(Object object) throws Exception {
URL url = new URL(INTERFACE_CALLBACK);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
connection.setDoOutput(true);
String jsonPayload = convertToJson(object);
try (OutputStream outputStream = connection.getOutputStream()) {
outputStream.write(jsonPayload.getBytes());
outputStream.flush();
}
int responseCode = connection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
// TODO 請(qǐng)求成功,可以根據(jù)需要進(jìn)行進(jìn)一步處理
} else {
// TODO 請(qǐng)求失敗,可以根據(jù)需要進(jìn)行錯(cuò)誤處理
}
connection.disconnect();
}
private static String convertToJson(Object object) {
return new Gson().toJson(object);
}
}接收方
package com.example.demo.controller;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import lombok.Data;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.*;
/**
* @author Administrator
* 手動(dòng)創(chuàng)建線程池
*/
@Data
public class CustomThreadPoolUtil {
public static void main(String[] args) throws IOException {
int port = 8848;
HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
server.createContext("/callback", (HttpHandler) new CallbackHandler());
server.setExecutor(null);
server.start();
System.out.println("服務(wù)器已啟動(dòng),監(jiān)聽端口:" + port);
}
static class CallbackHandler implements HttpHandler {
@Override
public void handle(HttpExchange exchange) throws IOException {
String requestMethod = exchange.getRequestMethod();
if (requestMethod.equalsIgnoreCase("POST")) {
InputStream requestBody = exchange.getRequestBody();
byte[] buffer = new byte[requestBody.available()];
requestBody.read(buffer);
String requestData = new String(buffer);
System.out.println("接收到的數(shù)據(jù):" + requestData);
String response = "數(shù)據(jù)已接收";
exchange.sendResponseHeaders(200, response.getBytes().length);
OutputStream outputStream = exchange.getResponseBody();
outputStream.write(response.getBytes());
outputStream.close();
} else {
exchange.sendResponseHeaders(405, -1);
}
}
}
}自定義線程池
package com.slg.util;
import lombok.Data;
import java.util.concurrent.*;
/**
* @author Administrator
* 手動(dòng)創(chuàng)建線程池
*/
@Data
public class CustomThreadPoolUtil {
// 線程池大小
private static final int CORE_POOL_SIZE = 10;
private static final int MAXIMUM_POOL_SIZE = 20;
private static final long KEEP_ALIVE_TIME = 60L;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
private static final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingQueue<>();
// 自定義線程工廠
private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
private int count = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "CustomThreadPool-" + count++);
}
};
// 創(chuàng)建線程池
private static final ExecutorService executorService = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
TIME_UNIT,
WORK_QUEUE,
THREAD_FACTORY
);
public static ExecutorService getExecutorService() {
return executorService;
}
}測(cè)試效果:


總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
PowerJob的WorkerHealthReporter工作流程源碼解讀
這篇文章主要為大家介紹了PowerJob的WorkerHealthReporter工作流程源碼解讀,2023-12-12
Spring boot實(shí)現(xiàn)上傳文件到本地服務(wù)器
這篇文章主要為大家詳細(xì)介紹了Spring boot實(shí)現(xiàn)上傳文件到本地服務(wù)器,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-08-08
詳解Java中的do...while循環(huán)語(yǔ)句的使用方法
這篇文章主要介紹了Java中的do...while循環(huán)語(yǔ)句的使用方法,是Java入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-10-10
idea如何設(shè)置Git忽略對(duì)某些文件或文件夾的版本追蹤
這篇文章主要介紹了idea如何設(shè)置Git忽略對(duì)某些文件或文件夾的版本追蹤問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03
java正則表達(dá)式之Pattern與Matcher類詳解
這篇文章主要給大家介紹了關(guān)于java正則表達(dá)式之Pattern與Matcher類的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09

