亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

詳解Java使用雙異步后如何保證數(shù)據(jù)一致性

 更新時(shí)間:2024年01月22日 10:20:19   作者:哪吒編程  
這篇文章主要為大家詳細(xì)介紹了Java使用雙異步后如何保證數(shù)據(jù)一致性,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,有需要的小伙伴可以了解下

一、前情提要

在上一篇文章中,我們通過(guò)雙異步的方式導(dǎo)入了10萬(wàn)行的Excel,有個(gè)小伙伴在評(píng)論區(qū)問(wèn)我,如何保證插入后數(shù)據(jù)的一致性呢?

很簡(jiǎn)單,通過(guò)對(duì)比Excel文件行數(shù)和入庫(kù)數(shù)量是否相等即可。

那么,如何獲取異步線程的返回值呢?

二、通過(guò)Future獲取異步返回值

我們可以通過(guò)給異步方法添加Future返回值的方式獲取結(jié)果。

FutureTask 除了實(shí)現(xiàn) Future 接口外,還實(shí)現(xiàn)了 Runnable 接口。因此,F(xiàn)utureTask 可以交給 Executor 執(zhí)行,也可以由調(diào)用線程直接執(zhí)行FutureTask.run()。

1、FutureTask 是基于 AbstractQueuedSynchronizer實(shí)現(xiàn)的

AbstractQueuedSynchronizer簡(jiǎn)稱AQS,它是一個(gè)同步框架,它提供通用機(jī)制來(lái)原子性管理同步狀態(tài)、阻塞和喚醒線程,以及 維護(hù)被阻塞線程的隊(duì)列。 基于 AQS 實(shí)現(xiàn)的同步器包括: ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。

基于 AQS實(shí)現(xiàn)的同步器包含兩種操作:

  • acquire,阻塞調(diào)用線程,直到AQS的狀態(tài)允許這個(gè)線程繼續(xù)執(zhí)行,在FutureTask中,get()就是這個(gè)方法;
  • release,改變AQS的狀態(tài),使state變?yōu)榉亲枞麪顟B(tài),在FutureTask中,可以通過(guò)run()和cancel()實(shí)現(xiàn)。

2、FutureTask執(zhí)行流程

執(zhí)行@Async異步方法;

建立新線程async-executor-X,執(zhí)行Runnable的run()方法,(FutureTask實(shí)現(xiàn)RunnableFuture,RunnableFuture實(shí)現(xiàn)Runnable);

判斷狀態(tài)state;

  • 如果未新建或者不處于AQS,直接返回;
  • 否則進(jìn)入COMPLETING狀態(tài),執(zhí)行異步線程代碼;

如果執(zhí)行cancel()方法改變AQS的狀態(tài)時(shí),會(huì)喚醒AQS等待隊(duì)列中的第一個(gè)線程線程async-executor-1;

線程async-executor-1被喚醒后

  • 將自己從AQS隊(duì)列中移除;
  • 然后喚醒next線程async-executor-2;
  • 改變線程async-executor-1的state;
  • 等待get()線程取值。

next等待線程被喚醒后,循環(huán)線程async-executor-1的步驟

  • 被喚醒
  • 從AQS隊(duì)列中移除
  • 喚醒next線程
  • 改變異步線程狀態(tài)

新建線程async-executor-N,監(jiān)聽異步方法的state

  • 如果處于EXCEPTIONAL以上狀態(tài),拋出異常;
  • 如果處于COMPLETING狀態(tài),加入AQS隊(duì)列等待;
  • 如果處于NORMAL狀態(tài),返回結(jié)果;

3、get()方法執(zhí)行流程

get()方法通過(guò)判斷狀態(tài)state觀測(cè)異步線程是否已結(jié)束,如果結(jié)束直接將結(jié)果返回,否則會(huì)將等待節(jié)點(diǎn)扔進(jìn)等待隊(duì)列自旋,阻塞住線程。

自旋直至異步線程執(zhí)行完畢,獲取另一邊的線程計(jì)算出結(jié)果或取消后,將等待隊(duì)列里的所有節(jié)點(diǎn)依次喚醒并移除隊(duì)列。

  • 如果state小于等于COMPLETING,表示任務(wù)還在執(zhí)行中;
    • 如果線程被中斷,從等待隊(duì)列中移除等待節(jié)點(diǎn)WaitNode,拋出中斷異常;
    • 如果state大于COMPLETING;
      • 如果已有等待節(jié)點(diǎn)WaitNode,將線程置空;
      • 返回當(dāng)前狀態(tài);
    • 如果任務(wù)正在執(zhí)行,讓出時(shí)間片;
    • 如果還未構(gòu)造等待節(jié)點(diǎn),則new一個(gè)新的等待節(jié)點(diǎn);
    • 如果未入隊(duì)列,CAS嘗試入隊(duì);
    • 如果有超時(shí)時(shí)間參數(shù);
      • 計(jì)算超時(shí)時(shí)間;
      • 如果超時(shí),則從等待隊(duì)列中移除等待節(jié)點(diǎn)WaitNode,返回當(dāng)前狀態(tài)state;
      • 阻塞隊(duì)列nanos毫秒。
    • 否則阻塞隊(duì)列;
  • 如果state大于COMPLETING;
    • 如果執(zhí)行完畢,返回結(jié)果;
    • 如果大于等于取消狀態(tài),則拋出異常。

很多小朋友對(duì)讀源碼,嗤之以鼻,工作3年、5年,還是沒(méi)認(rèn)真讀過(guò)任何源碼,覺(jué)得讀了也沒(méi)啥用,或者讀了也看不懂~

其實(shí),只要把源碼的執(zhí)行流程通過(guò)畫圖的形式呈現(xiàn)出來(lái),你就會(huì)幡然醒悟,原來(lái)是這樣的~

簡(jiǎn)而言之:

1. 如果異步線程還沒(méi)執(zhí)行完,則進(jìn)入CAS自旋; 2. 其它線程獲取結(jié)果或取消后,重新喚醒CAS隊(duì)列中等待的線程; 3. 再通過(guò)get()判斷狀態(tài)state; 4. 直至返回結(jié)果或(取消、超時(shí)、異常)為止。

三、FutureTask源碼具體分析

1、FutureTask源碼

通過(guò)定義整形狀態(tài)值,判斷state大小,這個(gè)思想很有意思,值得學(xué)習(xí)。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
public class FutureTask<V> implements RunnableFuture<V> {

	// 最初始的狀態(tài)是new 新建狀態(tài)
	private volatile int state;
    private static final int NEW          = 0; // 新建狀態(tài)
    private static final int COMPLETING   = 1; // 完成中
    private static final int NORMAL       = 2; // 正常執(zhí)行完
    private static final int EXCEPTIONAL  = 3; // 異常
    private static final int CANCELLED    = 4; // 取消
    private static final int INTERRUPTING = 5; // 正在中斷
    private static final int INTERRUPTED  = 6; // 已中斷

	public V get() throws InterruptedException, ExecutionException {
	    int s = state;
	    // 任務(wù)還在執(zhí)行中
	    if (s <= COMPLETING)
	        s = awaitDone(false, 0L);
	    return report(s);
	}
	
	private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
        	// 線程被中斷,從等待隊(duì)列中移除等待節(jié)點(diǎn)WaitNode,拋出中斷異常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 任務(wù)已執(zhí)行完畢或取消
            if (s > COMPLETING) {
            	// 如果已有等待節(jié)點(diǎn)WaitNode,將線程置空
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 任務(wù)正在執(zhí)行,讓出時(shí)間片
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // 還未構(gòu)造等待節(jié)點(diǎn),則new一個(gè)新的等待節(jié)點(diǎn)
            else if (q == null)
                q = new WaitNode();
            // 未入隊(duì)列,CAS嘗試入隊(duì)
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果有超時(shí)時(shí)間參數(shù)
            else if (timed) {
            	// 計(jì)算超時(shí)時(shí)間
                nanos = deadline - System.nanoTime();
                // 如果超時(shí),則從等待隊(duì)列中移除等待節(jié)點(diǎn)WaitNode,返回當(dāng)前狀態(tài)state
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 阻塞隊(duì)列nanos毫秒
                LockSupport.parkNanos(this, nanos);
            }
            else
            	// 阻塞隊(duì)列
                LockSupport.park(this);
        }
    }
    
	private V report(int s) throws ExecutionException {
		// 獲取outcome中記錄的返回結(jié)果
        Object x = outcome;
        // 如果執(zhí)行完畢,返回結(jié)果
        if (s == NORMAL)
            return (V)x;
            // 如果大于等于取消狀態(tài),則拋出異常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
}

2、將異步方法的返回值改為Future<Integer>,將返回值放到new AsyncResult<>();中;

@Async("async-executor")
public void readXls(String filePath, String filename) {
    try {
    	// 此代碼為簡(jiǎn)化關(guān)鍵性代碼
        List<Future<Integer>> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
            futureList.add(sumFuture);
        }
    }catch (Exception e){
        logger.error("readXlsCacheAsync---插入數(shù)據(jù)異常:",e);
    }
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {
    try {
        // 此代碼為簡(jiǎn)化關(guān)鍵性代碼
        return new AsyncResult<>(sum);
    }catch (Exception e){
        return new AsyncResult<>(0);
    }
}

3、通過(guò)Future<Integer>.get()獲取返回值:

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow){
    int[] futureSumArr = new int[futureList.size()];
    for (int i = 0;i<futureList.size();i++) {
        try {
            Future<Integer> future = futureList.get(i);
            while (true) {
                if (future.isDone() && !future.isCancelled()) {
                    Integer futureSum = future.get();
                    logger.info("獲取Future返回值成功"+"----Future:" + future
                            + ",Result:" + futureSum);
                    futureSumArr[i] += futureSum;
                    break;
                } else {
                    logger.info("Future正在執(zhí)行---獲取Future返回值中---等待3秒");
                    Thread.sleep(3000);
                }
            }
        } catch (Exception e) {
            logger.error("獲取Future返回值異常: ", e);
        }
    }
    
    boolean insertFlag = getInsertSum(futureSumArr, excelRow);
    logger.info("獲取所有異步線程Future的返回值成功,Excel插入結(jié)果="+insertFlag);
    return insertFlag;
}

4、這里也可以通過(guò)新線程+Future獲取Future返回值

不過(guò)感覺(jué)多此一舉了,就當(dāng)練習(xí)Future異步取返回值了~

public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) {
    ExecutorService service = Executors.newSingleThreadExecutor();
    final boolean[] insertFlag = {false};
    service.execute(new Runnable() {
        public void run() {
            try {
                insertFlag[0] = getFutureResult(futureList, excelRow);
            } catch (Exception e) {
                logger.error("新線程+Future獲取Future返回值異常: ", e);
                insertFlag[0] = false;
            }
        }
    });
    service.shutdown();
    return new AsyncResult<>(insertFlag[0]);
}

獲取異步線程結(jié)果后,我們可以通過(guò)添加事務(wù)的方式,實(shí)現(xiàn)Excel入庫(kù)操作的數(shù)據(jù)一致性。

但Future會(huì)造成主線程的阻塞,這個(gè)就很不友好了,有沒(méi)有更優(yōu)解呢?

以上就是詳解Java使用雙異步后如何保證數(shù)據(jù)一致性的詳細(xì)內(nèi)容,更多關(guān)于Java雙異步如何保證數(shù)據(jù)一致性的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 在 Spring Boot 3 中接入生成式 AI的操作方法

    在 Spring Boot 3 中接入生成式 AI的操作方法

    本文介紹了如何在SpringBoot3中集成生成式AI,以O(shè)penAI的GPT模型為例,通過(guò)代碼示例展示了如何實(shí)現(xiàn),SpringBoot3的優(yōu)勢(shì)和OpenAI的生成式AI技術(shù)結(jié)合,為開發(fā)者提供了高效集成生成式AI的方法,感興趣的朋友跟隨小編一起看看吧
    2025-01-01
  • Java中的位運(yùn)算符、移位運(yùn)算詳細(xì)介紹

    Java中的位運(yùn)算符、移位運(yùn)算詳細(xì)介紹

    這篇文章主要介紹了Java中的位運(yùn)算符、移位運(yùn)算,有需要的朋友可以參考一下
    2013-12-12
  • java 同步器SynchronousQueue詳解及實(shí)例

    java 同步器SynchronousQueue詳解及實(shí)例

    這篇文章主要介紹了java 同步器SynchronousQueue詳解及實(shí)例的相關(guān)資料,需要的朋友可以參考下
    2017-05-05
  • java中三種拷貝方法舉例總結(jié)

    java中三種拷貝方法舉例總結(jié)

    在Java編程中,理解引用拷貝、淺拷貝和深拷貝對(duì)于對(duì)象復(fù)制和內(nèi)存管理至關(guān)重要,這篇文章主要介紹了java中三種拷貝方法的相關(guān)資料,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2024-09-09
  • 淺析java 循序與二元搜索算法

    淺析java 循序與二元搜索算法

    這篇文章主要簡(jiǎn)單介紹了java 循序與二元搜索算法,需要的朋友可以參考下
    2015-02-02
  • java 類加載機(jī)制和反射詳解及實(shí)例代碼

    java 類加載機(jī)制和反射詳解及實(shí)例代碼

    這篇文章主要介紹了java 類加載機(jī)制和反射詳解及實(shí)例代碼的相關(guān)資料,需要的朋友可以參考下
    2017-03-03
  • Spring @Retryable注解輕松搞定循環(huán)重試功能

    Spring @Retryable注解輕松搞定循環(huán)重試功能

    spring系列的spring-retry是另一個(gè)實(shí)用程序模塊,可以幫助我們以標(biāo)準(zhǔn)方式處理任何特定操作的重試。在spring-retry中,所有配置都是基于簡(jiǎn)單注釋的。本文主要介紹了Spring@Retryable注解如何輕松搞定循環(huán)重試功能,有需要的朋友可以參考一下
    2023-04-04
  • 基于Spring Boot使用JpaRepository刪除數(shù)據(jù)時(shí)的注意事項(xiàng)

    基于Spring Boot使用JpaRepository刪除數(shù)據(jù)時(shí)的注意事項(xiàng)

    這篇文章主要介紹了Spring Boot使用JpaRepository刪除數(shù)據(jù)時(shí)的注意事項(xiàng),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • Java+opencv3.2.0之scharr濾波器

    Java+opencv3.2.0之scharr濾波器

    這篇文章主要為大家詳細(xì)介紹了Java+opencv3.2.0之scharr濾波器的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-02-02
  • 手把手教你在eclipse創(chuàng)建第一個(gè)java?web項(xiàng)目并運(yùn)行

    手把手教你在eclipse創(chuàng)建第一個(gè)java?web項(xiàng)目并運(yùn)行

    Eclipse是用來(lái)做開發(fā)的自由集成開發(fā)環(huán)境,這也是很多java程序員會(huì)使用的開發(fā)環(huán)境,所以可以使用eclipse創(chuàng)建javaweb項(xiàng)目,下面這篇文章主要給大家介紹了關(guān)于如何在eclipse創(chuàng)建第一個(gè)java?web項(xiàng)目并運(yùn)行的相關(guān)資料,需要的朋友可以參考下
    2023-02-02

最新評(píng)論