BlockingQueue隊列處理高并發(fā)下的日志
前言
當(dāng)系統(tǒng)流量負(fù)載比較高時,業(yè)務(wù)日志的寫入操作也要納入系統(tǒng)性能考量之內(nèi),如若處理不當(dāng),將影響系統(tǒng)的正常業(yè)務(wù)操作,之前寫過一篇《spring boot通過MQ消費(fèi)log4j2的日志》的博文,采用了RabbitMQ消息中間件來存儲抗高并發(fā)下的日志,因為引入了中間件,操作使用起來可能沒那么簡便,今天分享使用多線程消費(fèi)阻塞隊列的方式來處理我們的海量日志
what阻塞隊列?
阻塞隊列(BlockingQueue)是區(qū)別于普通隊列多了兩個附加操作的線程安全的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?。?dāng)隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是往隊列里添加元素的線程,消費(fèi)者是從隊列里拿元素的線程。阻塞隊列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。
1.聲明存儲固定消息的隊列
/** * Created by kl on 2017/3/20. * Content :銷售操作日志隊列 */ public class SalesLogQueue{ //隊列大小 public static final int QUEUE_MAX_SIZE = 1000; private static SalesLogQueue alarmMessageQueue = new SalesLogQueue(); //阻塞隊列 private BlockingQueueblockingQueue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE); private SalesLogQueue(){} public static SalesLogQueue getInstance() { return alarmMessageQueue; } /** * 消息入隊 * @param salesLog * @return */ public boolean push(SalesLog salesLog) { return this.blockingQueue.add(salesLog);//隊列滿了就拋出異常,不阻塞 } /** * 消息出隊 * @return */ public SalesLog poll() { SalesLog result = null; try { result = this.blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return result; } /** * 獲取隊列大小 * @return */ public int size() { return this.blockingQueue.size(); } }
ps:因為業(yè)務(wù)原因,采用add的方式入隊,隊列滿了就拋異常,不阻塞
2.消息入隊
消息入隊可以在任何需要保存日志的地方操作,如aop統(tǒng)一攔截日志處理,filter過濾請求日志處理,或者耦合的業(yè)務(wù)日志,記住,不阻塞入隊操作,不然將影響正常的業(yè)務(wù)操作,如下為filter統(tǒng)一處理請求日志:
/** * Created by kl on 2017/3/20. * Content :訪問請求攔截,保存操作日志 */ public class SalesLogFilter implements Filter { private RoleResourceService resourceService; @Override public void init(FilterConfig filterConfig) throws ServletException { ServletContext context = filterConfig.getServletContext(); ApplicationContext ctx = WebApplicationContextUtils.getWebApplicationContext(context); resourceService = ctx.getBean(RoleResourceService.class); } @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { try { HttpServletRequest request = (HttpServletRequest) servletRequest; String requestUrl = request.getRequestURI(); String requestType=request.getMethod(); String ipAddress = HttpClientUtil.getIpAddr(request); Map resource=resourceService.getResource(); String context=resource.get(requestUrl); //動態(tài)url正則匹配 if(StringUtil.isNull(context)){ for(Map.Entry entry:resource.entrySet()){ String resourceUrl= entry.getKey(); if(requestUrl.matches(resourceUrl)){ context=entry.getValue(); break; } } } SalesLog log=new SalesLog(); log.setCreateDate(new Timestamp(System.currentTimeMillis())); log.setContext(context); log.setOperateUser(UserTokenUtil.currentUser.get().get("realname")); log.setRequestIp(ipAddress); log.setRequestUrl(requestUrl); log.setRequestType(requestType); SalesLogQueue.getInstance().push(log); }catch (Exception e){ e.printStackTrace(); } filterChain.doFilter(servletRequest, servletResponse); } @Override public void destroy() { } }
3.消息出隊被消費(fèi)
BlockingQueue是線程安全的,所以可以放心的在多個線程中去處理隊列中的消息,如下代碼聲明了一個兩個大小的固定線程池,并添加了兩個線程去處理隊列中的消息
/** * Created by kl on 2017/3/20. * Content :啟動消費(fèi)操作日志隊列的線程 */ @Component public class ConsumeSalesLogQueue { @Autowired SalesLogService salesLogService; @PostConstruct public void startrtThread() { ExecutorService e = Executors.newFixedThreadPool(2);//兩個大小的固定線程池 e.submit(new PollSalesLog(salesLogService)); e.submit(new PollSalesLog(salesLogService)); } class PollSalesLog implements Runnable { SalesLogService salesLogService; public PollSalesLog(SalesLogService salesLogService) { this.salesLogService = salesLogService; } @Override public void run() { while (true) { try { SalesLog salesLog = SalesLogQueue.getInstance().poll(); if(salesLog!=null){ salesLogService.saveSalesLog(salesLog); } } catch (Exception e) { e.printStackTrace(); } } } } }
參考博文如下,對BlockingQueue隊列更多了解,可讀一讀如下的博文:
詳細(xì)分析Java并發(fā)集合ArrayBlockingQueue的用法
詳解Java阻塞隊列(BlockingQueue)的實現(xiàn)原理
以上就是BlockingQueue隊列處理高并發(fā)下的日志的詳細(xì)內(nèi)容,更多關(guān)于BlockingQueue隊列處理高并發(fā)日志的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java實現(xiàn)多線程的兩種方式繼承Thread類和實現(xiàn)Runnable接口的方法
下面小編就為大家?guī)硪黄猨ava實現(xiàn)多線程的兩種方式繼承Thread類和實現(xiàn)Runnable接口的方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-09-09Java線程讓步_動力節(jié)點Java學(xué)院整理
yield()的作用是讓步。它能讓當(dāng)前線程由“運(yùn)行狀態(tài)”進(jìn)入到“就緒狀態(tài)”,從而讓其它具有相同優(yōu)先級的等待線程獲取執(zhí)行權(quán)。下面通過本文給大家介紹Java線程讓步的相關(guān)知識,需要的朋友參考下吧2017-05-05Java之使用POI教你玩轉(zhuǎn)Excel導(dǎo)入與導(dǎo)出
這篇文章主要介紹了Java之使用POI教你玩轉(zhuǎn)Excel導(dǎo)入與導(dǎo)出,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-10-10