亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

C#并行編程之Task同步機(jī)制

 更新時(shí)間:2022年05月09日 14:19:33   作者:springsnow  
這篇文章介紹了C#并行編程之Task同步機(jī)制,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

在并行計(jì)算中,不可避免的會(huì)碰到多個(gè)任務(wù)共享變量,實(shí)例,集合。雖然task自帶了兩個(gè)方法:task.ContinueWith()和Task.Factory.ContinueWhenAll()來(lái)實(shí)現(xiàn)任務(wù)串行化,但是這些簡(jiǎn)單的方法遠(yuǎn)遠(yuǎn)不能滿(mǎn)足我們實(shí)際的開(kāi)發(fā)需要,從.net 4.0開(kāi)始,類(lèi)庫(kù)給我們提供了很多的類(lèi)來(lái)幫助我們簡(jiǎn)化并行計(jì)算中復(fù)雜的數(shù)據(jù)同步問(wèn)題。

一、隔離執(zhí)行:不共享數(shù)據(jù),讓每個(gè)task都有一份自己的數(shù)據(jù)拷貝。

對(duì)數(shù)據(jù)共享問(wèn)題處理的方式是“分離執(zhí)行”,我們通過(guò)把每個(gè)Task執(zhí)行完成后的各自計(jì)算的值進(jìn)行最后的匯總,也就是說(shuō)多個(gè)Task之間不存在數(shù)據(jù)共享了,各自做各自的事,完全分離開(kāi)來(lái)。

1、傳統(tǒng)方式

每個(gè)Task執(zhí)行時(shí)不存在數(shù)據(jù)共享了,每個(gè)Task中計(jì)算自己值,最后我們匯總每個(gè)Task的Result。我們可以通過(guò)Task中傳遞的state參數(shù)來(lái)進(jìn)行隔離執(zhí)行:

int Sum = 0;
Task<int>[] tasks = new Task<int>[10];
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task<int>((obj) =>
    {
        var start = (int)obj;
        for (int j = 0; j < 1000; j++)
        {
            start = start + 1;
        }
        return start;
    }, Sum);
    tasks[i].Start();
}
Task.WaitAll(tasks);
for (var i = 0; i < 10; i++)
{
    Sum += tasks[i].Result;
}
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

2、ThreadLocal類(lèi)

在.Net中提供了System.Threading.ThreadLocal來(lái)創(chuàng)建分離。

ThreadLocal是一種提供線(xiàn)程本地存儲(chǔ)的類(lèi)型,它可以給每個(gè)線(xiàn)程一個(gè)分離的實(shí)例,來(lái)提供每個(gè)線(xiàn)程單獨(dú)的數(shù)據(jù)結(jié)果。上面的程序我們可以使用TreadLocal:

int Sum = 0;
Task<int>[] tasks = new Task<int>[10];

var tl = new ThreadLocal<int>();
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task<int>((obj) =>
    {
        tl.Value = (int)obj;
        for (int j = 0; j < 1000; j++)
        {
            tl.Value++;
        }
        returntl.Value;
    }, Sum);
    tasks[i].Start();
}
Task.WaitAll(tasks);
for (var i = 0; i < 10; i++)
{
    Sum += tasks[i].Result;
}
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

但是我們要注意的一點(diǎn)TreadLocal是針對(duì)每個(gè)線(xiàn)程的,不是針對(duì)每個(gè)Task的。一個(gè)Tread中可能有多個(gè)Task。

ThreadLocal類(lèi)舉例:

static ThreadLocal<string> local;
static void Main()
{
    //創(chuàng)建ThreadLocal并提供默認(rèn)值 
    local = new ThreadLocal<string>(() => "hehe");

    //修改TLS的線(xiàn)程 
    Thread th = new Thread(() =>
     {
         local.Value = "Mgen";
         Display();
     });

    th.Start();
    th.Join();
    Display();
}

//顯示TLS中數(shù)據(jù)值 
static void Display()
{
    Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value);
}

二、同步類(lèi)型:通過(guò)調(diào)整task的執(zhí)行,有序的執(zhí)行task。

同步類(lèi)型是一種用來(lái)調(diào)度Task訪(fǎng)問(wèn)臨界區(qū)域的一種特殊類(lèi)型。在.Net 4.0中提供了多種同步類(lèi)型給我們使用,主要分為:輕量級(jí)的、重量級(jí)的和等待處理型的,在下面我們會(huì)介紹常用的同步處理類(lèi)型。

常用的同步類(lèi)型

首先來(lái)看看.Net 4.0中常見(jiàn)的幾種同步類(lèi)型以及處理的相關(guān)問(wèn)題:

同步類(lèi)型以及解決問(wèn)題

  • lock關(guān)鍵字、Montor類(lèi)、SpinLock類(lèi):有序訪(fǎng)問(wèn)臨界區(qū)域
  • Interlocked類(lèi):數(shù)值類(lèi)型的增加或則減少
  • Mutex類(lèi):交叉同步
  • WaitAll方法:同步多個(gè)鎖定(主要是Task之間的調(diào)度)
  • 申明性的同步(如Synchronization):使類(lèi)中的所有的方法同步

1、Lock鎖

其實(shí)最簡(jiǎn)單同步類(lèi)型的使用辦法就是使用lock關(guān)鍵字。在使用lock關(guān)鍵字時(shí),首先我們需要?jiǎng)?chuàng)建一個(gè)鎖定的object,而且這個(gè)object需要所有的task都能訪(fǎng)問(wèn),其次能我們需要將我們的臨界區(qū)域包含在lock塊中。我們之前例子中代碼可以這樣加上lock:

int Sum = 0;
Task[] tasks = new Task[10];

var obj = new Object(); for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task(() =>
    {
        for (int j = 0; j < 1000; j++)
        {
            lock (obj)
            { Sum = Sum + 1; }
        }
    });
    tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

其實(shí)lock關(guān)鍵字是使用Monitor的一種簡(jiǎn)短的方式,lock關(guān)鍵字自動(dòng)通過(guò)調(diào)用Monitor.Enter\Monitor.Exit方法來(lái)處理獲得鎖以及釋放鎖。

2、Interlocked 聯(lián)鎖

Interlocked通過(guò)使用操作系統(tǒng)或則硬件的一些特性提供了一些列高效的靜態(tài)的同步方法。其中主要提供了這些方法:Exchange、Add、Increment、CompareExchange四種類(lèi)型的多個(gè)方法的重載。我們將上面的例子中使用Interlocked:

int Sum = 0;
Task[] tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task(() =>
    {
        for (int j = 0; j < 1000; j++)
        {
            Interlocked.Increment(ref Sum);
        }
    });
    tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

3、Mutex互斥體

Mutex也是一個(gè)同步類(lèi)型,在多個(gè)線(xiàn)程進(jìn)行訪(fǎng)問(wèn)的時(shí)候,它只向一個(gè)線(xiàn)程授權(quán)共享數(shù)據(jù)的獨(dú)立訪(fǎng)問(wèn)。我們可以通過(guò)Mutex中的WaitOne方法來(lái)獲取Mutex的所有權(quán),但是同時(shí)我們要注意的是,我們?cè)谝粋€(gè)線(xiàn)程中多少次調(diào)用過(guò)WaitOne方法,就需要調(diào)用多少次ReleaseMutex方法來(lái)釋放Mutex的占有。上面的例子我們通過(guò)Mutex這樣實(shí)現(xiàn):

int Sum = 0;
Task[] tasks = new Task[10];

var mutex = new Mutex();
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task(() =>
    {
        for (int j = 0; j < 1000; j++)
        {
            bool lockAcquired = mutex.WaitOne(); try
            {
                Sum++;
            }
            finally
            {
                if (lockAcquired) mutex.ReleaseMutex();
            }
        }
    });
    tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

三、申明性同步

我們可以通過(guò)使用Synchronization 特性來(lái)標(biāo)識(shí)一個(gè)類(lèi),從而使一個(gè)類(lèi)型的字段以及方法都實(shí)現(xiàn)同步化。在使用Synchronization 時(shí),我們需要將我們的目標(biāo)同步的類(lèi)繼承于System.ContextBoundObject類(lèi)型。我們來(lái)看看之前的例子我們同步標(biāo)識(shí)Synchronization 的實(shí)現(xiàn):

static void Main(string[] args)
{
    var sum = new SumClass();
    Task[] tasks = new Task[10];
    for (int i = 0; i < 10; i++)
    {
        tasks[i] = new Task(() =>
        {
            for (int j = 0; j < 1000; j++)
            {
                sum.Increment();
            }
        });
        tasks[i].Start();
    }
    Task.WaitAll(tasks);
    Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, sum.GetSum());
}

[Synchronization]
class SumClass : ContextBoundObject
{
    private int _Sum;

    public void Increment()
    {
        _Sum++;
    }

    public int GetSum()
    {
        return _Sum;
    }
}

四、并發(fā)集合

當(dāng)多個(gè)線(xiàn)程對(duì)某個(gè)非線(xiàn)程安全容器并發(fā)地進(jìn)行讀寫(xiě)操作時(shí),這些操作將導(dǎo)致不可預(yù)估的后果或者會(huì)導(dǎo)致報(bào)錯(cuò)。為了解決這個(gè)問(wèn)題我們可以使用lock關(guān)鍵字或者M(jìn)onitor類(lèi)來(lái)給容器上鎖。但鎖的引入使得我們的代碼更加復(fù)雜,同時(shí)也帶來(lái)了更多的同步消耗。而.NET Framework 4提供的線(xiàn)程安全且可拓展的并發(fā)集合能夠使得我們的并行代碼更加容易編寫(xiě),此外,鎖的使用次數(shù)的減少也減少了麻煩的死鎖與競(jìng)爭(zhēng)條件的問(wèn)題。.NET Framework 4主要提供了如下幾種并發(fā)集合:BlockingCollection,ConcurrentBag,ConcurrentDictionary,ConcurrentQueue,ConcurrentStack。這些集合通過(guò)使用一種叫做比較并交換(compare and swap, CAS)指令和內(nèi)存屏障的技術(shù)來(lái)避免使用重量級(jí)的鎖。

在.Net 4.0中提供了很多并發(fā)的集合類(lèi)型來(lái)讓我們處理數(shù)據(jù)同步的集合的問(wèn)題,這里面包括:

1.ConcurrentQueue:提供并發(fā)安全的隊(duì)列集合,以先進(jìn)先出的方式進(jìn)行操作;
2.ConcurrentStack:提供并發(fā)安全的堆棧集合,以先進(jìn)后出的方式進(jìn)行操作;
3.ConcurrentBag:提供并發(fā)安全的一種無(wú)序集合;
4.ConcurrentDictionary:提供并發(fā)安全的一種key-value類(lèi)型的集合。

我們?cè)谶@里只做ConcurrentQueue的一個(gè)嘗試,并發(fā)隊(duì)列是一種線(xiàn)程安全的隊(duì)列集合,我們可以通過(guò)Enqueue()進(jìn)行排隊(duì)、TryDequeue()進(jìn)行出隊(duì)列操作:

for (var j = 0; j < 10; j++)
{
    var queue = new ConcurrentQueue<int>();
    var count = 0;
    for (var i = 0; i < 1000; i++)
    {
        queue.Enqueue(i);
    }
    var tasks = new Task[10];
    for (var i = 0; i < tasks.Length; i++)
    {
        tasks[i] = new Task(() =>
        {
            while (queue.Count > 0)
            {
                int item;
                var isDequeue = queue.TryDequeue(out item);
                if (isDequeue) Interlocked.Increment(ref count);
            }

        });
        tasks[i].Start();
    }
    try
    {
        Task.WaitAll(tasks);
    }
    catch (AggregateException e)
    {
        e.Handle((ex) =>
        {
            Console.WriteLine("Exception Message:{0}", ex.Message);
            return true;
        });
    }
    Console.WriteLine("Dequeue items count :{0}", count);
}

五、Barrier(屏障同步)

barrier叫做屏障,就像下圖中的“紅色線(xiàn)”,如果我們的屏障設(shè)為4個(gè)task就認(rèn)為已經(jīng)滿(mǎn)了的話(huà),那么執(zhí)行中先到的task必須等待后到的task,通知方式也就是barrier.SignalAndWait(),屏障中線(xiàn)程設(shè)置操作為new Barrier(4,(i)=>{})。SignalAndWait給我們提供了超時(shí)的重載,為了能夠取消后續(xù)執(zhí)行

//四個(gè)task執(zhí)行
static Task[] tasks = new Task[4];

static Barrier barrier = null;

static void Main(string[] args)
{
    barrier = new Barrier(tasks.Length, (i) =>
     {
         Console.WriteLine("**********************************************************");
         Console.WriteLine("\n屏障中當(dāng)前階段編號(hào):{0}\n", i.CurrentPhaseNumber);
         Console.WriteLine("**********************************************************");
     });

    for (int j = 0; j < tasks.Length; j++)
    {
        tasks[j] = Task.Factory.StartNew((obj) =>
        {
            var single = Convert.ToInt32(obj);

            LoadUser(single);
            barrier.SignalAndWait();

            LoadProduct(single);
            barrier.SignalAndWait();

            LoadOrder(single);
            barrier.SignalAndWait();
        }, j);
    }

    Task.WaitAll(tasks);

    Console.WriteLine("指定數(shù)據(jù)庫(kù)中所有數(shù)據(jù)已經(jīng)加載完畢!");

    Console.Read();
}

static void LoadUser(int num)
{
    Console.WriteLine("當(dāng)前任務(wù):{0}正在加載User部分?jǐn)?shù)據(jù)!", num);
}

static void LoadProduct(int num)
{
    Console.WriteLine("當(dāng)前任務(wù):{0}正在加載Product部分?jǐn)?shù)據(jù)!", num);
}

static void LoadOrder(int num)
{
    Console.WriteLine("當(dāng)前任務(wù):{0}正在加載Order部分?jǐn)?shù)據(jù)!", num);
}

到此這篇關(guān)于C#并行編程之Task同步機(jī)制的文章就介紹到這了。希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

最新評(píng)論