詳解基于java的Socket聊天程序——服務(wù)端(附demo)
寫在前面:
昨天在博客記錄自己抽空寫的一個Socket聊天程序的初始設(shè)計,那是這個程序的整體設(shè)計,為了完整性,今天把服務(wù)端的設(shè)計細化記錄一下,首頁貼出Socket聊天程序的服務(wù)端大體設(shè)計圖,如下圖:
功能說明:
服務(wù)端主要有兩個操作,一是阻塞接收客戶端的socket并做響應(yīng)處理,二是檢測客戶端的心跳,如果客戶端一段時間內(nèi)沒有發(fā)送心跳則移除該客戶端,由Server創(chuàng)建ServerSocket,然后啟動兩個線程池去處理這兩件事(newFixedThreadPool,newScheduledThreadPool),對應(yīng)的處理類分別是SocketDispatcher、SocketSchedule,其中SocketDispatcher根據(jù)socket不同的請求分發(fā)給不同SocketHandler去處理,而SocketWrapper則是對socket加了一層外殼包裝,用lastAliveTime記錄socket最新的交互時間,SocketHolder存儲當(dāng)前跟服務(wù)端交互的socket集合。
具體實現(xiàn):
[Server.java]
Server是服務(wù)端的入口,由Server的start()方法啟動ServerSocket,然后阻塞接收客戶端的請求,交由SocketDispatcher去分發(fā),SocketDispatcher由newFixedThread類型的線程池啟動,當(dāng)連接數(shù)超過最大數(shù)據(jù)時將被隊列處理,使用scheduleAtFixedRate啟動SocketSchedule定時循環(huán)去監(jiān)聽客戶端的心跳包,這兩個類型都實現(xiàn)了Runnable接口,下面給出服務(wù)端的代碼:
package yaolin.chat.server; import java.io.IOException; import java.net.ServerSocket; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import yaolin.chat.common.ConstantValue; import yaolin.chat.util.LoggerUtil; /** * 服務(wù)器 * @author yaolin */ public class Server { private final ServerSocket server; private final ExecutorService pool; public Server() throws IOException { server = new ServerSocket(ConstantValue.SERVER_PORT); pool = Executors.newFixedThreadPool(ConstantValue.MAX_POOL_SIZE); } public void start() { try { ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1); // Watch dog. Exception?? schedule.scheduleAtFixedRate(new SocketSchedule(), 10, ConstantValue.TIME_OUT, TimeUnit.SECONDS); while (true) { pool.execute(new SocketDispatcher(server.accept())); LoggerUtil.info("ACCEPT A CLIENT AT " + new Date()); } } catch (IOException e) { pool.shutdown(); } } public static void main(String[] args) { try { new Server().start(); } catch (IOException e) { LoggerUtil.error("Server start failed! -> " + e.getMessage(), e); } } }
[SocketDispatcher.java]
Server只是服務(wù)端的入口,并指揮中心,SocketDispatcher才是服務(wù)端的指揮中心,對客戶端不同的消息類型請求進行分發(fā),讓不同的SocketHandler去處理對應(yīng)的消息請求,這里服務(wù)端和客戶端的消息交互都是用JSON數(shù)據(jù),所有消息類都繼承BaseMessage,所以將接收到數(shù)據(jù)轉(zhuǎn)換成BaseMessage類型,再判斷其類型,(數(shù)據(jù)類型模塊屬于common模塊),這里需要提一下的是當(dāng)消息類型是文件類型的時候會睡眠配置執(zhí)行的間隔時間,這樣FileHandler才能有時間對文件流進行讀取和重新發(fā)送給指定的客戶端,而不會立即進入下一次循環(huán)對消息類型的判斷(可能這里設(shè)計有點問題,不過暫時先這樣做),下面給出SocketDispatcher的代碼:
/** * SocketDispatcher * * @author yaolin */ public class SocketDispatcher implements Runnable { private final Socket socket; public SocketDispatcher(Socket socket) { this.socket = socket; } @Override public void run() { if (socket != null) { while (!socket.isClosed()) { try { InputStream is = socket.getInputStream(); String line = null; StringBuffer sb = null; if (is.available() > 0) { BufferedReader bufr = new BufferedReader(new InputStreamReader(is)); sb = new StringBuffer(); while (is.available() > 0 && (line = bufr.readLine()) != null) { sb.append(line); } LoggerUtil.trach("RECEIVE [" + sb.toString() + "] AT " + new Date()); BaseMessage message = JSON.parseObject(sb.toString(), BaseMessage.class); switch (message.getType()) { case MessageType.ALIVE: HandlerFactory.getHandler(MessageType.ALIVE).handle(socket, sb.toString()); break; case MessageType.CHAT: HandlerFactory.getHandler(MessageType.CHAT).handle(socket, sb.toString()); break; case MessageType.FILE: HandlerFactory.getHandler(MessageType.FILE).handle(socket, sb.toString()); LoggerUtil.trach("SEVER:PAUSE TO RECEIVE FILE"); Thread.sleep(ConstantValue.MESSAGE_PERIOD); break; case MessageType.LOGIN: HandlerFactory.getHandler(MessageType.LOGIN).handle(socket, sb.toString()); break; case MessageType.LOGOUT: break; case MessageType.REGISTER: HandlerFactory.getHandler(MessageType.REGISTER).handle(socket, sb.toString()); break; } } else { Thread.sleep(ConstantValue.MESSAGE_PERIOD); } } catch (Exception e) { // catch all handler exception LoggerUtil.error("SocketDispatcher Error!" + e.getMessage(), e); } } } } }
[SocketSchedule.java]
跟Server有直接關(guān)系的另一個類(組件)是SocketSchedule,SocketSchedule主要負責(zé)檢測客戶端的最新一次跟服務(wù)端的交互時間是否超過系統(tǒng)配置允許最大的時間,如果超過了,則將該客戶端socket從服務(wù)端移除,否則更新客戶端的最新一次跟服務(wù)端的交互時間。下面是具體的實現(xiàn):
/** * Remove socket from SocketHolder if lastAliveTime > TIME_OUT * @author yaolin * */ public class SocketSchedule implements Runnable { @Override public void run() { for (String key : SocketHolder.keySet()) { SocketWrapper wrapper = SocketHolder.get(key); if (wrapper != null && wrapper.getLastAliveTime() != null) { if (((new Date().getTime() - wrapper.getLastAliveTime().getTime()) / 1000) > ConstantValue.TIME_OUT) { // remove socket if timeout SocketHolder.remove(key); } } } } }
[SocketHolder.java、SocketWrapper.java]
從上面的代碼可以看出,SocketSchedule#run()只是簡單的對時間進行一次判斷,真正有意義的其實是SocketHolder和SocketWrapper,SocketWrapper則是對socket加了一層外殼包裝,SocketHolder的存儲了當(dāng)前有效時間內(nèi)所有跟服務(wù)端有交互的客戶端,SocketHolder以客戶端的唯一標(biāo)識(這里使用用戶名),作為KEY,客戶端所在的socket作為VALUE的鍵值對形式存儲,其中SocketHolder#flushClientStatus()的處理邏輯是用于通知其他客戶端當(dāng)前客戶端的上線/離線狀態(tài),下面給出這兩個類的具體實現(xiàn):
/** * Wrap Socket, SocketSchedule remove socket if lastAliveTime > TIME_OUT * @author yaolin * */ public class SocketWrapper { private Socket socket; private Date lastAliveTime; // full constructor public SocketWrapper(Socket socket, Date lastAliveTime) { this.socket = socket; this.lastAliveTime = lastAliveTime; } public Socket getSocket() { return socket; } public void setSocket(Socket socket) { this.socket = socket; } public Date getLastAliveTime() { return lastAliveTime; } public void setLastAliveTime(Date lastAliveTime) { this.lastAliveTime = lastAliveTime; } }
/** * SocketHolder * @author yaolin */ public class SocketHolder { private static ConcurrentMap<String, SocketWrapper> listSocketWrap = new ConcurrentHashMap<String, SocketWrapper>(); public static Set<String> keySet() { return listSocketWrap.keySet(); } public static SocketWrapper get(String key) { return listSocketWrap.get(key); } public static void put(String key, SocketWrapper value) { listSocketWrap.put(key, value); flushClientStatus(key, true); } public static SocketWrapper remove(String key) { flushClientStatus(key, false); return listSocketWrap.remove(key); } public static void clear() { listSocketWrap.clear(); } /** * <pre>content:{username:"",flag:false}</pre> * @param flag true:put,false:remove; */ private static void flushClientStatus(String key, boolean flag) { ClientNotifyDTO dto = new ClientNotifyDTO(flag, key); ReturnMessage rm = new ReturnMessage().setKey(Key.NOTIFY).setSuccess(true).setContent(dto); rm.setFrom(ConstantValue.SERVER_NAME); for (String toKey : listSocketWrap.keySet()) { if (!toKey.equals(key)) { // not send to self rm.setTo(toKey); SocketWrapper wrap = listSocketWrap.get(toKey); if (wrap != null) { SendHelper.send(wrap.getSocket(), rm); } } } } }
[SocketHandler.java、HandlerFactory.java、OtherHandlerImpl.java]
SocketDispatcher讓不同的SocketHandler去處理對應(yīng)的消息請求,SocketHandler的設(shè)計其實就是一套簡單的工廠組件吧(其中ReturnHandler暫時由SendHelper實現(xiàn)信息傳送,暫時沒有用到,已經(jīng)@Deprecated ,這里還是給出),完整類圖如下:
下面給出這一塊的代碼,為了縮小篇幅,將所有Handler實現(xiàn)的代碼收起來。
/** * SocketHandler * @author yaolin */ public interface SocketHandler { /** * Handle Client Socket */ public Object handle(Socket client,Object data); }
/** * SocketHandlerFactory * @author yaolin */ public class HandlerFactory { // can not create instance private HandlerFactory(){} public static SocketHandler getHandler(int type) { switch (type) { case MessageType.ALIVE: // usually use return new AliveHandler(); case MessageType.CHAT: return new ChatHandler(); case MessageType.LOGIN: return new LoginHandler(); // case MessageType.RETURN: // return new ReturnHandler(); case MessageType.LOGOUT: return new LogoutHandler(); case MessageType.REGISTER: return new RegisterHandler(); case MessageType.FILE: return new FileHandler(); } return null; // NullPointException } }
/** * AliveSocketHandler * @author yaolin */ public class AliveHandler implements SocketHandler { /** * @return null */ @Override public Object handle(Socket client, Object data) { if (data != null) { BaseMessage message = JSON.parseObject(data.toString(), BaseMessage.class); if (StringUtil.isNotEmpty(message.getFrom())) { SocketWrapper wrapper = SocketHolder.get(message.getFrom()); if (wrapper != null) { wrapper.setLastAliveTime(new Date()); // KEEP SOCKET ... SocketHolder.put(message.getFrom(), wrapper); } } } return null; } }
/** * ChatHandler * * @author yaolin */ public class ChatHandler implements SocketHandler { @Override public Object handle(Socket client, Object data) { if (data != null) { ChatMessage message = JSON.parseObject(data.toString(), ChatMessage.class); if (StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) { // exist & send if (SocketHolder.keySet().contains(message.getFrom())) { String owner = message.getFrom(); message.setOwner(owner); // owner will be display if (ConstantValue.TO_ALL.equals(message.getTo())) { // one-to-all // TO_ALL TAB will be select; message.setFrom(ConstantValue.TO_ALL); for (String key : SocketHolder.keySet()) { // also send to self SocketWrapper wrapper = SocketHolder.get(key); if (wrapper != null) { SendHelper.send(wrapper.getSocket(), message); } } } else {// one-to-one SocketWrapper wrapper = SocketHolder.get(message.getTo()); if (wrapper != null) { // owner = from SendHelper.send(wrapper.getSocket(), message); // also send to self // TO TAB will be select; message.setFrom(message.getTo()).setTo(owner); SendHelper.send(client, message); } } } } } return null; } }
public class FileHandler implements SocketHandler { @Override public Object handle(Socket client, Object data) { if (client != null) { FileMessage message = JSON.parseObject(data.toString(), FileMessage.class); if (StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) { // exist & send if (SocketHolder.keySet().contains(message.getFrom())) { if (!ConstantValue.TO_ALL.equals(message.getTo())) { // one-to-all SocketWrapper wrapper = SocketHolder.get(message.getTo()); if (wrapper != null) { SendHelper.send(wrapper.getSocket(), message); try { if (client != null && wrapper.getSocket() != null && message.getSize() > 0) { InputStream is = client.getInputStream(); OutputStream os = wrapper.getSocket().getOutputStream(); int total = 0; while (!client.isClosed() && !wrapper.getSocket().isClosed()) { if (is.available() > 0) { byte[] buff = new byte[ConstantValue.BUFF_SIZE]; int len = -1; while (is.available() > 0 && (len = is.read(buff)) != -1) { os.write(buff, 0, len); total += len; LoggerUtil.debug("SEND BUFF [" + len + "]"); } os.flush(); if (total >= message.getSize()) { LoggerUtil.info("SEND BUFF [OK]"); break; } } } // AFTER SEND FILE // SEND SUCCESSFULLY ReturnMessage result = new ReturnMessage().setKey(Key.TIP) .setSuccess(true) .setContent(I18N.INFO_FILE_SEND_SUCCESSFULLY); result.setFrom(message.getTo()).setTo(message.getFrom()) .setOwner(ConstantValue.SERVER_NAME); SendHelper.send(client, result); // RECEIVE SUCCESSFULLY result.setContent(I18N.INFO_FILE_RECEIVE_SUCCESSFULLY) .setFrom(message.getFrom()) .setTo(message.getTo()); SendHelper.send(wrapper.getSocket(), result); } } catch (Exception e) { LoggerUtil.error("Handle file failed !" + e.getMessage(), e); } } } } } } return null; } }
/** * LoginHandler * * @author yaolin * */ public class LoginHandler implements SocketHandler { private UsrService usrService = new UsrService(); @Override public Object handle(Socket client, Object data) { ReturnMessage result = new ReturnMessage(); result.setSuccess(false); if (data != null) { LoginMessage message = JSON.parseObject(data.toString(), LoginMessage.class); if (StringUtil.isNotEmpty(message.getUsername()) && StringUtil.isNotEmpty(message.getPassword())) { if (usrService.login(message.getUsername(), message.getPassword()) != null) { result.setSuccess(true); } else { result.setMessage(I18N.INFO_LOGIN_ERROR_DATA); } result.setFrom(ConstantValue.SERVER_NAME).setTo(message.getUsername()); } else { result.setMessage(I18N.INFO_LOGIN_EMPTY_DATA); } // AFTER LOGIN result.setKey(Key.LOGIN); if (result.isSuccess()) { // HOLD SOCKET SocketHolder.put(result.getTo(), new SocketWrapper(client, new Date())); } SendHelper.send(client, result); if (result.isSuccess()) { // SEND LIST USER ClientListUserDTO dto = new ClientListUserDTO(); dto.setListUser(SocketHolder.keySet()); result.setContent(dto).setKey(Key.LISTUSER); SendHelper.send(client, result); } } return null; } }
public class LogoutHandler implements SocketHandler { @Override public Object handle(Socket client, Object data) { if (data != null) { LogoutMessage message = JSON.parseObject(data.toString(), LogoutMessage.class); if (message != null && StringUtil.isNotEmpty(message.getFrom())) { SocketWrapper wrapper = SocketHolder.get(message.getFrom()); Socket socket = wrapper.getSocket(); if (socket != null) { try { socket.close(); socket = null; } catch (Exception ignore) { } } SocketHolder.remove(message.getFrom()); } } return null; } }
public class RegisterHandler implements SocketHandler { private UsrService usrService = new UsrService(); @Override public Object handle(Socket client, Object data) { ReturnMessage result = new ReturnMessage(); result.setSuccess(false).setFrom(ConstantValue.SERVER_NAME); if (data != null) { RegisterMessage message = JSON.parseObject(data.toString(), RegisterMessage.class); if (StringUtil.isNotEmpty(message.getUsername()) && StringUtil.isNotEmpty(message.getPassword())) { if (usrService.register(message.getUsername(), message.getPassword()) != null) { result.setSuccess(true).setContent(I18N.INFO_REGISTER_OK); } else { result.setMessage(I18N.INFO_REGISTER_CLIENT_EXIST); } } else { result.setMessage(I18N.INFO_REGISTER_EMPTY_DATA); } if (StringUtil.isNotEmpty(message.getUsername())) { result.setTo(message.getUsername()); } // AFTER REGISTER result.setKey(Key.REGISTER); SendHelper.send(client, result); } return null; } }
/** * Use SendHelper to send ReturnMessage, * @see yaolin.chat.server.SocketDispatcher#run() * @author yaolin */ @Deprecated public class ReturnHandler implements SocketHandler { /** * @param data ReturnMessage */ @Override public Object handle(Socket client, Object data) { if (data != null) { ReturnMessage message = (ReturnMessage) data; if(StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) { SocketWrapper wrap = SocketHolder.get(message.getTo()); if (wrap != null) { SendHelper.send(wrap.getSocket(), message); } } } return null; } }
用戶業(yè)務(wù):
服務(wù)端除了socket之外,還有一點點具體的業(yè)務(wù),那就是用戶的注冊、登陸等,這里簡單的列出Usr和UsrService這兩個類,這些業(yè)務(wù)暫時沒有怎么實現(xiàn),我并不打算在這個程序中引入ORM框架,所以自己寫一套DBUtil(待改善),在這里也一并貼出來。
這里只進行了簡單的校驗,沒有持久化存儲到DB中,下面是Usr和UsrService:
public class Usr { private long id; private String username; private String password; public long getId() { return id; } public void setId(long id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } }
/** * // TODO * @see yaolin.chat.server.usr.repository.UsrRepository * @author yaolin * */ public class UsrService { // TODO db private static Map<String,Usr> db = new HashMap<String,Usr>(); public Usr register(String username, String password) { if (StringUtil.isEmpty(username) || StringUtil.isEmpty(password)) { return null; } if (db.containsKey(username)) { return null; // exist; } Usr usr = new Usr(); usr.setUsername(username); usr.setPassword(MD5Util.getMD5Code(password)); db.put(username, usr); return usr; } public Usr login(String username, String password) { if (StringUtil.isEmpty(username) || StringUtil.isEmpty(password)) { return null; } if (db.containsKey(username)) { Usr usr = db.get(username); if (MD5Util.getMD5Code(password).equals(usr.getPassword())) { return usr; } } return null; } }
下面是DBUtil工具:
/** * DBUtils // TODO 有待調(diào)整&優(yōu)化!! * @author yaolin */ public class DBUtil { // make connection used repeatedly private static final List<Connection> cache = new LinkedList<Connection>(); private static String url; private static String driver; private static String user; private static String password; private static Boolean debug; static { InputStream is = DBUtil.class.getResourceAsStream("/db.properties"); try { Properties p = new Properties(); p.load(is); url = p.getProperty("url"); driver = p.getProperty("driver"); user = p.getProperty("user"); password = p.getProperty("password"); // just for debug try { debug = Boolean.valueOf(p.getProperty("debug")); } catch (Exception ignore) { debug = false; } } catch (Exception e) { throw new RuntimeException(e); } finally { if (is != null) { try { is.close(); is = null; } catch (Exception ignore) { } } } } public synchronized static Connection getConnection() { if (cache.isEmpty()) { cache.add(makeConnection()); } Connection conn = null; int i = 0; try { do { conn = cache.remove(i); } while (conn != null && conn.isClosed() && i < cache.size()); } catch (Exception ignore) { } try { if (conn == null || conn.isClosed()) { cache.add(makeConnection()); conn = cache.remove(0); } return conn; } catch (Exception e) { throw new RuntimeException(e); } } public synchronized static void close(Connection connection) { try { if (connection != null && !connection.isClosed()) { if (debug) debug("release connection!"); cache.add(connection); } } catch (SQLException ignore) { } } public static Object query(String sql, ResultSetMapper mapper, Object... args) { if (debug) debug(sql); Connection conn = getConnection(); PreparedStatement ps = null; ResultSet rs = null; Object result = null; try { ps = conn.prepareStatement(sql); int i = 1; for (Object object : args) { ps.setObject(i++, object); } rs = ps.executeQuery(); result = mapper.mapper(rs); } catch (Exception e) { throw new RuntimeException(e); } finally { try { if (rs != null) { rs.close(); rs = null; } if (ps != null) { ps.close(); ps = null; } } catch (Exception ignore) { } } close(conn); return result; } public static int modify(String sql, Object... args) { if (debug) debug(sql); Connection conn = getConnection(); PreparedStatement ps = null; int row = 0; try { ps = conn.prepareStatement(sql); int i = 1; for (Object object : args) { ps.setObject(i++, object); } row = ps.executeUpdate(); } catch (Exception e) { throw new RuntimeException(e); } finally { try { if (ps != null) { ps.close(); ps = null; } } catch (Exception ignore) { } } close(conn); return row; } public static int[] batch(List<String> sqls) { if (debug) debug(sqls.toString()); Connection conn = getConnection(); Statement stmt = null; int[] row; try { stmt = conn.createStatement(); for (String sql : sqls) { stmt.addBatch(sql); } row = stmt.executeBatch(); } catch (Exception e) { throw new RuntimeException(e); } finally { try { if (stmt != null) { stmt.close(); stmt = null; } } catch (Exception ignore) { } } close(conn); return row; } public static int[] batch(String sql, PreparedStatementSetter setter) { if (debug) debug(sql); Connection conn = getConnection(); PreparedStatement ps = null; int[] row; try { ps = conn.prepareStatement(sql); setter.setter(ps); row = ps.executeBatch(); } catch (Exception e) { throw new RuntimeException(e); } finally { try { if (ps != null) { ps.close(); ps = null; } } catch (Exception ignore) { } } close(conn); return row; } private static Connection makeConnection() { try { Class.forName(driver).newInstance(); Connection conn = DriverManager.getConnection(url, user, password); if (debug) debug("create connection!"); return conn; } catch (Exception e) { throw new RuntimeException(e); } } private static void debug(String sqls) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println(sdf.format(new Date()) + " DEBUG " + Thread.currentThread().getId() + " --- [" + Thread.currentThread().getName() + "] " + "excute sqls : " + sqls); } }
/** * PreparedStatementSetter * @author yaolin */ public interface PreparedStatementSetter { public void setter(PreparedStatement ps); }
/** * ResultSetMapper * @author yaolin */ public interface ResultSetMapper { public Object mapper(ResultSet rs); }
源碼下載:demo
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
用IDEA創(chuàng)建SpringBoot項目的詳細步驟記錄
Idea有著非常簡便的Spring Boot新建過程,同時依靠pom自動下載依賴,下面這篇文章主要給大家介紹了關(guān)于用IDEA創(chuàng)建SpringBoot項目的詳細步驟,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2022-08-08WebUploader+SpringMVC實現(xiàn)文件上傳功能
WebUploader是由Baidu團隊開發(fā)的一個簡單的以HTML5為主,F(xiàn)LASH為輔的現(xiàn)代文件上傳組件。這篇文章主要介紹了WebUploader+SpringMVC實現(xiàn)文件上傳功能,需要的朋友可以參考下2017-06-06SpringBoot集成P6Spy實現(xiàn)SQL日志的記錄詳解
P6Spy是一個框架,它可以無縫地攔截和記錄數(shù)據(jù)庫活動,而無需更改現(xiàn)有應(yīng)用程序的代碼。一般我們使用的比較多的是使用p6spy打印我們最后執(zhí)行的sql語句2022-11-11Delegate IDE build/run actions to maven 配置會影響程序運行嗎?
這篇文章主要介紹了Delegate IDE build/run actions to maven 配置會影響程序運行嗎,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08