亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

C#多線程系列之工作流實現(xiàn)

 更新時間:2022年02月14日 11:08:41   作者:癡者工良  
本文詳細講解了C#實現(xiàn)多線程工作流的方法,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

前言

前面學習了很多多線程和任務的基礎知識,這里要來實踐一下啦。通過本篇教程,你可以寫出一個簡單的工作流引擎。

本篇教程內(nèi)容完成是基于任務的,只需要看過筆者的三篇關于異步的文章,掌握 C# 基礎,即可輕松完成。

由于本篇文章編寫的工作流程序,主要使用任務,有些邏輯過程會比較難理解,多測試一下就好。代碼主要還是 C# 基礎,為什么說簡單?

  • 不包含 async 、await
  • 幾乎不含包含多線程(有個讀寫鎖)
  • 不包含表達式樹
  • 幾乎不含反射(有個小地方需要反射一下,但是非常簡單)
  • 沒有復雜的算法

因為是基于任務(Task)的,所以可以輕松設計組合流程,組成復雜的工作流。

由于只是講述基礎,所以不會包含很多種流程控制,這里只實現(xiàn)一些簡單的。

先說明,別用到業(yè)務上。。。這個工作流非常簡單,就幾個功能,這個工作流是基于筆者的多線程系列文章的知識點。寫這個東西是為了講解任務操作,讓讀者更加深入理解任務。

代碼地址:https://github.com/whuanle/CZGL.FLow

節(jié)點

在開始前,我們來設計幾種流程控制的東西。

將一個 步驟/流程/節(jié)點 稱為 step。

Then

一個普通的節(jié)點,包含一個任務。

多個 Then 節(jié)點,可以組成一條連續(xù)的工作流。

Parallel

并行節(jié)點,可以設置多個并行節(jié)點放到 Parallel 中,以及在里面為任一個節(jié)點創(chuàng)建新的分支。

Schedule

定時節(jié)點,創(chuàng)建后會在一定時間后執(zhí)行節(jié)點中的任務。

Delay

讓當前任務阻塞一段時間。

試用一下

順序節(jié)點

打開你的 VS ,創(chuàng)建項目,Nuget 引用 CZGL.DoFlow ,版本 1.0.2 。

創(chuàng)建一個類 MyFlow1,繼承 IDoFlow

    public class MyFlow1 : IDoFlow
    {
        public int Id => 1;

        public string Name => "隨便起個名字";

        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            throw new NotImplementedException();
        }
    }

你可以創(chuàng)建多個工作流任務,每個工作流的 Id 必須唯一。Name 和 Version 隨便填,因為這里筆者沒有對這幾個字段做邏輯。

IDoFlowBuilder 是構(gòu)建工作流的一個接口。

我們來寫一個工作流測試一下。

/// <summary>
/// 普通節(jié)點 Then 使用方法
/// </summary>
public class MyFlow1 : IDoFlow
{
    public int Id => 1;
    public string Name => "test";
    public int Version => 1;

    public IDoFlowBuilder Build(IDoFlowBuilder builder)
    {
        builder.StartWith(() =>
        {
            Console.WriteLine("工作流開始");
        }).Then(() =>
        {
            Console.WriteLine("下一個節(jié)點");
        }).Then(() =>
         {
             Console.WriteLine("最后一個節(jié)點");
         });
        return builder;
    }
} 

Main 方法中:

        static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow1>();
            // FlowCore.RegisterWorkflow(new MyFlow1());
            FlowCore.Start(1);
            Console.ReadKey();
        }

.StartWith() 方法開始一個工作流;

FlowCore.RegisterWorkflow<T>() 注冊一個工作流;

FlowCore.Start();執(zhí)行一個工作流;

并行任務

其代碼如下:

    /// <summary>
    /// 并行節(jié)點 Parallel 使用方法
    /// </summary>
    public class MyFlow2 : IDoFlow
    {
        public int Id => 2;
        public string Name => "test";
        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            builder.StartWith()
                .Parallel(steps =>
                {
                    // 每個并行任務也可以設計后面繼續(xù)執(zhí)行其它任務
                    steps.Do(() =>
                    {
                        Console.WriteLine("并行1");
                    }).Do(() =>
                    {
                        Console.WriteLine("并行2");
                    });
                    steps.Do(() =>
                    {
                        Console.WriteLine("并行3");
                    });

                    // 并行任務設計完成后,必須調(diào)用此方法
                    // 此方法必須放在所有并行任務 .Do() 的最后
                    steps.EndParallel();

                    // 如果 .Do() 在 EndParallel() 后,那么不會等待此任務
                    steps.Do(() => { Console.WriteLine("并行異步"); });

                    // 開啟新的分支
                    steps.StartWith()
                    .Then(() =>
                    {
                        Console.WriteLine("新的分支" + Task.CurrentId);
                    }).Then(() => { Console.WriteLine("分支2.0" + Task.CurrentId); });

                }, false)
                .Then(() =>
                {
                    Console.WriteLine("11111111111111111 ");
                });

            return builder;
        }
    }

Main 方法中:

        static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow2>();
            FlowCore.Start(2);
            Console.ReadKey();
        }

通過以上示例,可以大概了解本篇文章中我們要寫的程序。

編寫工作流

建立一個類庫項目,名為 DoFlow。

建立 Extensions、InterfacesServices 三個目錄。

接口構(gòu)建器

新建 IStepBuilder 接口文件到 Interfaces 目錄,其內(nèi)容如下:

using System;

namespace DoFlow.Interfaces
{
    public interface IStepBuilder
    {
        /// <summary>
        /// 普通節(jié)點
        /// </summary>
        /// <param name="stepBuilder"></param>
        /// <returns></returns>
        IStepBuilder Then(Action action);

        /// <summary>
        /// 多個節(jié)點
        /// <para>默認下,需要等待所有的任務完成,這個step才算完成</para>
        /// </summary>
        /// <param name="action"></param>
        /// <param name="anyWait">任意一個任務完成即可跳轉(zhuǎn)到下一個step</param>
        /// <returns></returns>
        IStepBuilder Parallel(Action<IStepParallel> action, bool anyWait = false);

        /// <summary>
        /// 節(jié)點將在某個時間間隔后執(zhí)行
        /// <para>異步,不會阻塞當前工作流的運行,計劃任務將在一段時間后觸發(fā)</para>
        /// </summary>
        /// <returns></returns>
        IStepBuilder Schedule(Action action, TimeSpan time);

        /// <summary>
        /// 阻塞一段時間
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        IStepBuilder Delay(TimeSpan time);
    }
}

新建 IStepParallel 文件到 Interfaces 目錄。

using System;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 并行任務
    ///  <para>默認情況下,只有這個節(jié)點的所有并行任務都完成后,這個節(jié)點才算完成</para>
    /// </summary>
    public interface IStepParallel
    {
        /// <summary>
        /// 一個并行任務
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepParallel Do(Action action);

        /// <summary>
        /// 開始一個分支
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepBuilder StartWith(Action action = null);

        /// <summary>
        /// 必須使用此方法結(jié)束一個并行任務
        /// </summary>
        void EndParallel();
    }

    /// <summary>
    /// 并行任務
    /// <para>任意一個任務完成后,就可以跳轉(zhuǎn)到下一個 step</para>
    /// </summary>
    public interface IStepParallelAny : IStepParallel
    {

    }
}

工作流構(gòu)建器

新建 IDoFlowBuilder 接口文件到 Interfaces 目錄。

using System;
using System.Threading.Tasks;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 構(gòu)建工作流任務
    /// </summary>
    public interface IDoFlowBuilder
    {
        /// <summary>
        /// 開始一個 step
        /// </summary>
        IStepBuilder StartWith(Action action = null);
        void EndWith(Action action);

        Task ThatTask { get; }
    }
}

新建 IDoFlow 接口文件到 Interfaces 目錄。

namespace DoFlow.Interfaces
{

    /// <summary>
    /// 工作流
    /// <para>無參數(shù)傳遞</para>
    /// </summary>
    public interface IDoFlow
    {
        /// <summary>
        /// 全局唯一標識
        /// </summary>
        int Id { get; }

        /// <summary>
        /// 標識此工作流的名稱
        /// </summary>
        string Name { get; }

        /// <summary>
        /// 標識此工作流的版本
        /// </summary>
        int Version { get; }

        IDoFlowBuilder Build(IDoFlowBuilder builder);
    }
}

依賴注入

新建 DependencyInjectionService 文件到 Services 目錄。

用于實現(xiàn)依賴注入和解耦。

using DoFlow.Extensions;
using Microsoft.Extensions.DependencyInjection;
using System;

namespace DoFlow.Services
{
    /// <summary>
    /// 依賴注入服務
    /// </summary>
    public static class DependencyInjectionService
    {
        private static IServiceCollection _servicesList;
        private static IServiceProvider _services;
        static DependencyInjectionService()
        {
            IServiceCollection services = new ServiceCollection();
            _servicesList = services;
            // 注入引擎需要的服務
            InitExtension.StartInitExtension();
            var serviceProvider = services.BuildServiceProvider();
            _services = serviceProvider;
        }

        /// <summary>
        /// 添加一個注入到容器服務
        /// </summary>
        /// <typeparam name="TService"></typeparam>
        /// <typeparam name="TImplementation"></typeparam>
        public static void AddService<TService, TImplementation>()
            where TService : class
            where TImplementation : class, TService
        {
            _servicesList.AddTransient<TService, TImplementation>();
        }

        /// <summary>
        /// 獲取需要的服務
        /// </summary>
        /// <typeparam name="TIResult"></typeparam>
        /// <returns></returns>
        public static TIResult GetService<TIResult>()
        {
            TIResult Tservice = _services.GetService<TIResult>();
            return Tservice;
        }
    }
}

添加一個 InitExtension 文件到 Extensions 目錄。

using DoFlow.Interfaces;
using DoFlow.Services;

namespace DoFlow.Extensions
{
    public static class InitExtension
    {
        private static bool IsInit = false;
        public static void StartInitExtension()
        {
            if (IsInit) return;
            IsInit = true;
            DependencyInjectionService.AddService<IStepBuilder, StepBuilder>();
            DependencyInjectionService.AddService<IDoFlowBuilder, DoFlowBuilder>();
            DependencyInjectionService.AddService<IStepParallel, StepParallelWhenAll>();
            DependencyInjectionService.AddService<IStepParallelAny, StepParallelWhenAny>();
        }
    }
}

實現(xiàn)工作流解析

以下文件均在 Services 目錄建立。

新建 StepBuilder 文件,用于解析節(jié)點,構(gòu)建任務。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{

    /// <summary>
    /// 節(jié)點工作引擎
    /// </summary>
    public class StepBuilder : IStepBuilder
    {
        private Task _task;

        /// <summary>
        /// 延遲執(zhí)行
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Delay(TimeSpan time)
        {
            Task.Delay(time).Wait();
            return this;
        }

        /// <summary>
        /// 并行 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Parallel(Action<IStepParallel> action, bool anyAwait = false)
        {
            IStepParallel parallel = anyAwait ? DependencyInjectionService.GetService<IStepParallelAny>() : DependencyInjectionService.GetService<IStepParallel>();
            Task task = new Task(() =>
            {
                action.Invoke(parallel);
            });

            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
            });
            _task = task;
            return this;
        }

        /// <summary>
        /// 計劃任務
        /// </summary>
        /// <param name="action"></param>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Schedule(Action action, TimeSpan time)
        {
            Task.Factory.StartNew(() =>
            {
                Task.Delay(time).Wait();
                action.Invoke();
            });
            return this;
        }

        /// <summary>
        /// 普通 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Then(Action action)
        {
            Task task = new Task(action);
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
                task.Wait();
            });
            _task = task;
            return this;
        }

        public void SetTask(Task task)
        {
            _task = task;
        }
    }
}

新建 StepParallel 文件,里面有兩個類,用于實現(xiàn)同步任務。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    /// <summary>
    /// 第一層所有任務結(jié)束后才能跳轉(zhuǎn)下一個 step
    /// </summary>
    public class StepParallelWhenAll : IStepParallel
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAll()
        {
            _task = new Task(() => { },TaskCreationOptions.AttachedToParent);
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAll(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }

    /// <summary>
    /// 完成任意一個任務即可跳轉(zhuǎn)到下一個 step
    /// </summary>
    public class StepParallelWhenAny : IStepParallelAny
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAny()
        {
            _task = Task.Run(() => { });
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAny(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }
}

新建 DoFlowBuilder 文件,用于構(gòu)建工作流。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    public class DoFlowBuilder : IDoFlowBuilder
    {
        private Task _task;
        public Task ThatTask => _task;

        public void EndWith(Action action)
        {
            _task.Start();
        }

        public IStepBuilder StartWith(Action action = null)
        {
            if (action is null)
                _task = new Task(() => { });
            else _task = new Task(action);

            IStepBuilder _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            ((StepBuilder)_stepBuilder).SetTask(_task);
            return _stepBuilder;
        }
    }
}

新建 FlowEngine 文件,用于執(zhí)行工作流。

using DoFlow.Interfaces;

namespace DoFlow.Services
{
    /// <summary>
    /// 工作流引擎
    /// </summary>
    public class FlowEngine
    {
        private readonly IDoFlow _flow;
        public FlowEngine(IDoFlow flow)
        {
            _flow = flow;
        }

        /// <summary>
        /// 開始一個工作流
        /// </summary>
        public void Start()
        {
            IDoFlowBuilder builder = DependencyInjectionService.GetService<IDoFlowBuilder>();
            _flow.Build(builder).ThatTask.Start();
        }
    }
}

新建 FlowCore 文件,用于存儲和索引工作流。使用讀寫鎖解決并發(fā)字典問題。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading;

namespace DoFlow.Services
{
    public static class FlowCore
    {
        private static Dictionary<int, FlowEngine> flowEngines = new Dictionary<int, FlowEngine>();

        // 讀寫鎖
        private static ReaderWriterLockSlim readerWriterLockSlim = new ReaderWriterLockSlim();

        /// <summary>
        /// 注冊工作流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow(IDoFlow flow)
        {
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 注冊工作流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow<TDoFlow>()
        {

            Type type = typeof(TDoFlow);
            IDoFlow flow = (IDoFlow)Activator.CreateInstance(type);
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 要啟動的工作流
        /// </summary>
        /// <param name="id"></param>
        public static bool Start(int id)
        {
            FlowEngine engine;
            // 讀寫鎖
            try
            {
                readerWriterLockSlim.EnterUpgradeableReadLock();

                if (!flowEngines.ContainsKey(id))
                    return default;
                try
                {
                    readerWriterLockSlim.EnterWriteLock();
                    engine = flowEngines[id];
                }
                catch { return default; }
                finally
                {
                    readerWriterLockSlim.ExitWriteLock();
                }
            }
            catch { return default; }
            finally
            {
                readerWriterLockSlim.ExitUpgradeableReadLock();
            }

            engine.Start();
            return true;
        }
    }
}

就這樣程序?qū)懲炅恕?/p>

到此這篇關于C#多線程系列之工作流實現(xiàn)的文章就介紹到這了。希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關文章

  • 如何使用C# 捕獲進程輸出

    如何使用C# 捕獲進程輸出

    這篇文章主要介紹了如何使用C# 捕獲進程輸出,幫助大家更好的理解和使用c#,感興趣的朋友可以了解下
    2020-08-08
  • 詳解C#如何利用TcpListener和TcpClient實現(xiàn)Tcp通訊

    詳解C#如何利用TcpListener和TcpClient實現(xiàn)Tcp通訊

    TcpListener 和 TcpClient 是在 System.Net.Sockets.Socket 類的基礎上做的進一步封裝,使用 GetStream 方法返回網(wǎng)絡流,下面我們就來詳細一下如何使用TcpListener和TcpClient實現(xiàn)Tcp通訊吧
    2023-12-12
  • C#筆記之EF Code First 數(shù)據(jù)模型 數(shù)據(jù)遷移

    C#筆記之EF Code First 數(shù)據(jù)模型 數(shù)據(jù)遷移

    EF 中 Code First 的數(shù)據(jù)遷移網(wǎng)上有很多資料,我這份并沒什么特別。Code First 創(chuàng)建視圖網(wǎng)上也有很多資料,但好像很麻煩,而且親測好像是無效的方法(可能是我太笨,沒搞成功),我摸索出了一種簡單有效的方法,這里分享給大家
    2021-09-09
  • vs2019安裝和使用詳細圖文教程

    vs2019安裝和使用詳細圖文教程

    這篇文章主要介紹了vs2019安裝和使用詳細圖文教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-03-03
  • C#異步的世界(下)

    C#異步的世界(下)

    這篇文章主要介紹了C#異步的世界(下),對異步感興趣的同學,可以參考下
    2021-04-04
  • 基于C#編寫一個合并多個Word文檔的工具

    基于C#編寫一個合并多個Word文檔的工具

    這篇文章主要為大家詳細介紹了如何使用C#編寫一個小工具,可以實現(xiàn)把多個word文檔進行合并成一個word文檔,感興趣的小伙伴可以了解下
    2024-02-02
  • C# IsDefined的問題

    C# IsDefined的問題

    這篇文章主要介紹了C# IsDefined的問題,通俗易懂,需要的朋友可以參考下。
    2016-06-06
  • C#實現(xiàn)帶搜索功能的ComboBox

    C#實現(xiàn)帶搜索功能的ComboBox

    這篇文章主要為大家詳細介紹了C#如何實現(xiàn)帶搜索功能的ComboBox,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-10-10
  • C#自定義Attribute值的獲取與優(yōu)化技巧

    C#自定義Attribute值的獲取與優(yōu)化技巧

    C#自定義Attribute值的獲取是開發(fā)中會經(jīng)常用到的,大家通常使用反射進行獲取的,代碼也很簡單,今天通過本文給大家講解C#?Attribute值獲取方法,感興趣的朋友跟隨小編一起看看吧
    2023-07-07
  • C# 單元測試全解析

    C# 單元測試全解析

    這篇文章主要介紹了C# 單元測試的相關資料,幫助大家更好的理解和學習使用c#,感興趣的朋友可以了解下
    2021-04-04

最新評論