C#使用channel實現(xiàn)Plc異步任務之間的通信
更新時間:2024年05月15日 10:57:14 作者:潘諾西亞的火山
在C#的并發(fā)編程中,Channel是一種非常強大的數(shù)據(jù)結構,用于在生產(chǎn)者和消費者之間進行通信,本文將給大家介紹C#使用channel實現(xiàn)Plc異步任務之間的通信,文中有相關的代碼示例供大家參考,感興趣的朋友跟著小編一起來看看吧
channel 通信的例子:
using ConsoleApp2; using System.Collections.Concurrent; using System.Threading.Channels; var queue = new BlockingCollection<Message>(new ConcurrentQueue<Message>()); var opt = new BoundedChannelOptions(10) { FullMode = BoundedChannelFullMode.Wait, SingleReader = true, SingleWriter = true, Capacity = 100 //最大容量 }; //有限的 var channelTest = Channel.CreateBounded<Message>(opt); //無限的 var channel = Channel.CreateUnbounded<Message>(); var sender1 = SendMessageThreadAsync(channel.Writer, 1); var sender2 = SendMessageThreadAsync(channel.Writer, 2); var receiver1 = ReceiveMessageThreadAsync(channel.Reader, 3); var receiver2 = ReceiveMessageThreadAsync(channel.Reader, 4); //await sender; // make sure all messages are received await Task.WhenAll(sender1, sender2); channel.Writer.Complete(); await Task.WhenAll(receiver1, receiver2); //await receiver; Console.WriteLine("Press any key to exit..."); Console.ReadKey(); async Task SendMessageThreadAsync(ChannelWriter<Message> writer, int id) { for (int i = 0; i < 20; i++) { await writer.WriteAsync(new Message(id, i.ToString())); Console.WriteLine($"Thread {id} sent {i}"); await Task.Delay(100); } } async Task ReceiveMessageThreadAsync(ChannelReader<Message> reader, int id) { //try //{ // while (!reader.Completion.IsCompleted) // { // var message = await reader.ReadAsync(); // Console.WriteLine($"Thread {id} received {message.Content}"); // } //} //catch (Exception ex) //{ // Console.WriteLine($"Thread {id} channel closed:{ex.Message}"); //} await foreach (var message in reader.ReadAllAsync()) { Console.WriteLine($"Thread {id} received {message.Content}"); } } record Message(int FromId, string Content);
改造為Plc的實例
record PlcDataMessage { public bool IsConnected { get; init; } public DbData DbData { get; init; } // 可以添加其他需要傳遞的信息 }
// 創(chuàng)建一個無邊界的Channel來發(fā)送和接收消息 var plcDataChannel = Channel.CreateUnbounded<PlcDataMessage>(); // 啟動一個新的任務來模擬PLC數(shù)據(jù)讀取 Task.Factory.StartNew(async () => { var cts = new CancellationTokenSource(); // 假設您已經(jīng)有了取消令牌源 while (!cts.IsCancellationRequested) { try { // ... 省略了連接PLC的代碼,這部分邏輯保持不變 ... if (MyIsConnected) { DbData dbDataTemp = await s7Plc.ReadClassAsync<DbData>(42, 0); // 心跳和其他操作... // 構造消息并發(fā)送到Channel var message = new PlcDataMessage { IsConnected = MyIsConnected, DbData = dbDataTemp }; await plcDataChannel.Writer.WriteAsync(message, cts.Token); } // ... 其他邏輯保持不變 ... } catch (Exception ex) { // 處理異常并重新連接PLC(如果需要) // ... // 可以通過Channel發(fā)送一個特殊的消息來表示連接已斷開或發(fā)生了錯誤 // 這里省略了這部分邏輯 // 休眠一段時間后再重試 await Task.Delay(2000, cts.Token); } } // 完成后通知Channel不再發(fā)送更多數(shù)據(jù) plcDataChannel.Writer.Complete(); }, cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); // 在另一個任務或線程中讀取Channel中的數(shù)據(jù) Task.Run(async () => { await foreach (var message in plcDataChannel.Reader.ReadAllAsync(cts.Token)) { if (message.IsConnected) { lock (lockObj) { // 更新dbData,這里假設dbData是一個線程安全的對象或結構 dbData.Str_S = message.DbData.Str_S.Trim(); // ... 更新其他屬性 ... } // 處理讀取到的數(shù)據(jù)... } else { // 處理PLC斷開連接的情況... } } // 讀取完成,Channel已關閉 Console.WriteLine("PLC數(shù)據(jù)讀取完畢。"); }, cts.Token); // ... 其他代碼,如等待所有任務完成、處理取消邏輯等 ...
using System; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; // ... 其他必要的引用和類型定義 ... // 創(chuàng)建一個無邊界的Channel來發(fā)送和接收消息 var plcDataChannel = Channel.CreateUnbounded<PlcDataMessage>(); // 取消令牌源 var cts = new CancellationTokenSource(); // 啟動一個新的任務來模擬PLC數(shù)據(jù)讀取 Task.Run(async () => { Plc s7Plc = null; bool MyIsConnected = false; int errorTimes = 0; try { while (!cts.IsCancellationRequested) { if (s7Plc == null || !MyIsConnected) { // 嘗試連接PLC(略去具體實現(xiàn)) // ... if (MyIsConnected) { // 連接成功,發(fā)送連接成功消息(如果需要) // ... } } else { try { // 讀取PLC數(shù)據(jù)(略去具體實現(xiàn)) DbData dbDataTemp = await s7Plc.ReadClassAsync<DbData>(42, 0, cts.Token); // 心跳和其他操作... // 構造消息并發(fā)送到Channel var message = new PlcDataMessage { IsConnected = MyIsConnected, DbData = dbDataTemp }; await plcDataChannel.Writer.WriteAsync(message, cts.Token); errorTimes = 0; // 重置錯誤計數(shù)器 } catch (Exception ex) { errorTimes++; // 處理異常(例如記錄日志) // ... // 在達到一定錯誤次數(shù)后,關閉PLC連接并重置 if (errorTimes > someThreshold) { s7Plc?.Close(); s7Plc = null; MyIsConnected = false; // 可以選擇發(fā)送一個斷開連接的消息到Channel } // 休眠一段時間后再重試 await Task.Delay(2000, cts.Token); } } // 可以添加一些延時來減少循環(huán)的頻率 await Task.Delay(somePollingInterval, cts.Token); } } catch (OperationCanceledException) { // 取消是預期的,不需要額外處理 } finally { // 確保關閉PLC連接和Channel寫入器 s7Plc?.Close(); plcDataChannel.Writer.Complete(); } }, cts.Token); // 在另一個任務或線程中讀取Channel中的數(shù)據(jù) Task.Run(async () => { await foreach (var message in plcDataChannel.Reader.ReadAllAsync(cts.Token)) { if (message.IsConnected) { // 更新dbData(這里假設dbData是一個線程安全的對象或結構) // 根據(jù)需要添加適當?shù)耐綑C制 // ... // 處理讀取到的數(shù)據(jù)... } else { // 處理PLC斷開連接的情況... } } // 讀取完成,Channel已關閉 Console.WriteLine("PLC數(shù)據(jù)讀取完畢。"); }, cts.Token); // ... 其他代碼,如等待所有任務完成、處理取消邏輯等 ... // 在某個適當?shù)臅r刻取消任務 // cts.Cancel(); // 等待所有任務完成(如果需要
拓展:C# Channel實現(xiàn)線程間通信
C# Channel實現(xiàn)線程間通信
同步方式實現(xiàn):
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; namespace ConsoleApp1 { public class ChannelDemo { static Channel<Message> channel1 = Channel.CreateUnbounded<Message>(); public static void Main2() { sender.Start(1); receive1.Start(2); receive2.Start(3); sender.Join(); Thread.Sleep(3000); receive1.Interrupt(); receive2.Interrupt(); receive1.Join(); receive2.Join(); Console.ReadKey(); } static Thread sender = new Thread(SendMsg); static Thread receive1 = new Thread(ReceiveMsg); static Thread receive2 = new Thread(ReceiveMsg); static void SendMsg(object id) { for (int i = 0; i < 20; i++) { if (channel1.Writer.TryWrite(new Message((int)id, i.ToString()))) { Console.WriteLine($"【線程{id}】發(fā)送了【{i}】"); } } } static void ReceiveMsg(object id) { try { while (true) { if (channel1.Reader.TryRead(out Message message)) { Console.WriteLine($"【線程{id}】從【線程{message.id}】接收了【{message.content}】"); } Thread.Sleep(1); } } catch (ThreadInterruptedException ex) { Console.WriteLine($"接收結束"); } } } }
異步方式:
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.Remoting.Channels; using System.Text; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; namespace ConsoleApp1 { public class ChannelDemo2 { static Channel<Message> channel1 = Channel.CreateUnbounded<Message>(); public static async void Main2() { await Task.WhenAll(sender, sender2); channel1.Writer.Complete(); await Task.WhenAll(receive1, receive2); Console.ReadKey(); } static Task sender = SendMsgAsync(channel1.Writer, 1); static Task sender2 = SendMsgAsync(channel1.Writer, 4); static Task receive1 = ReceiveMsgAsync(channel1.Reader, 2); static Task receive2 = ReceiveMsgAsync(channel1.Reader, 3); static async Task SendMsgAsync(ChannelWriter<Message> writer, int id) { for (int i = 0; i < 20; i++) { await writer.WriteAsync(new Message((int)id, i.ToString())); Console.WriteLine($"【線程{id}】發(fā)送了【{i}】"); } } static async Task ReceiveMsgAsync(ChannelReader<Message> reader,int id) { try { while (!reader.Completion.IsCompleted) { Message message = await reader.ReadAsync(); Console.WriteLine($"【線程{id}】從【線程{message.id}】接收了【{message.content}】"); } } catch (ChannelClosedException ex) { Console.WriteLine($"ChannelClosed 接收結束"); } } } }
在對Channel進行實例化的時候,也可以傳遞一個Options,這里面可以對消息容量,是否多個發(fā)送者和接受者進行定義。
以上就是C#使用channel實現(xiàn)Plc異步任務之間的通信的詳細內容,更多關于C# channel Plc異步通信的資料請關注腳本之家其它相關文章!