Files
XplorePlane/XP.Hardware.PLC/Services/PlcWriteQueue.cs
T

344 lines
12 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using XP.Common.Logging.Interfaces;
using XP.Hardware.Plc.Abstractions;
namespace XP.Hardware.PLC.Services
{
/// <summary>
/// 写入任务完成事件参数 | Write task completed event args
/// </summary>
public class PlcWriteCompletedEventArgs : EventArgs
{
public string Address { get; }
public object Value { get; }
public bool Success { get; }
public Exception Error { get; }
public DateTime EnqueueTime { get; }
public DateTime CompleteTime { get; }
public PlcWriteCompletedEventArgs(string address, object value, bool success,
Exception error, DateTime enqueueTime, DateTime completeTime)
{
Address = address;
Value = value;
Success = success;
Error = error;
EnqueueTime = enqueueTime;
CompleteTime = completeTime;
}
}
/// <summary>
/// 写入任务项 | Write task item
/// </summary>
internal class WriteTaskItem
{
/// <summary>
/// 写入执行委托 | Write execution delegate
/// </summary>
public Func<IPlcClient, Task<bool>> ExecuteFunc { get; set; }
/// <summary>
/// PLC 地址(用于日志和事件)| PLC address (for logging and events)
/// </summary>
public string Address { get; set; }
/// <summary>
/// 写入值(用于日志和事件)| Write value (for logging and events)
/// </summary>
public object Value { get; set; }
/// <summary>
/// 入队时间 | Enqueue time
/// </summary>
public DateTime EnqueueTime { get; set; }
/// <summary>
/// 结果回调(用于 EnqueueAsync 等待模式)| Result callback (for EnqueueAsync await mode)
/// </summary>
public TaskCompletionSource<bool> Completion { get; set; }
}
/// <summary>
/// PLC 队列写入服务实现 | PLC queue write service implementation
/// 后台单线程消费队列,顺序写入 PLC,避免并发拥堵
/// </summary>
public class PlcWriteQueue : IDisposable
{
private readonly IPlcClient _plcClient;
private readonly ILoggerService _logger;
private readonly BlockingCollection<WriteTaskItem> _queue;
private readonly CancellationTokenSource _cts;
private Thread _workerThread;
private bool _disposed;
private volatile bool _isRunning;
/// <summary>
/// 写入完成事件 | Write completed event
/// </summary>
public event EventHandler<PlcWriteCompletedEventArgs> WriteCompleted;
/// <summary>
/// 当前队列中待处理的任务数 | Pending task count in queue
/// </summary>
public int PendingCount => _queue.Count;
/// <summary>
/// 队列是否正在运行 | Whether queue is running
/// </summary>
public bool IsRunning => _isRunning;
/// <summary>
/// 获取内部 PLC 客户端引用,供 PlcService 管理连接生命周期
/// Get internal PLC client reference for PlcService to manage connection lifecycle
/// </summary>
internal IPlcClient PlcClient => _plcClient;
/// <summary>
/// 构造函数 | Constructor
/// </summary>
/// <param name="plcClient">PLC 客户端接口 | PLC client interface</param>
/// <param name="logger">日志服务 | Logger service</param>
/// <param name="maxQueueSize">最大队列容量,默认 1000 | Max queue size, default 1000</param>
public PlcWriteQueue(IPlcClient plcClient, ILoggerService logger, int maxQueueSize = 1000)
{
_plcClient = plcClient ?? throw new ArgumentNullException(nameof(plcClient));
_logger = logger?.ForModule<PlcWriteQueue>() ?? throw new ArgumentNullException(nameof(logger));
_queue = new BlockingCollection<WriteTaskItem>(maxQueueSize);
_cts = new CancellationTokenSource();
}
/// <summary>
/// 启动队列处理 | Start queue processing
/// </summary>
public void Start()
{
if (_isRunning)
{
_logger.Warn("写入队列已在运行中 | Write queue is already running");
return;
}
_isRunning = true;
_workerThread = new Thread(ProcessQueue)
{
Name = "PlcWriteQueue-Worker",
IsBackground = true
};
_workerThread.Start();
_logger.Info("PLC 写入队列已启动 | PLC write queue started");
}
/// <summary>
/// 停止队列处理(等待当前任务完成)| Stop queue processing (wait for current task)
/// </summary>
public void Stop()
{
if (!_isRunning) return;
_logger.Info("正在停止 PLC 写入队列... | Stopping PLC write queue...");
_isRunning = false;
_queue.CompleteAdding();
// 等待工作线程结束,最多 5 秒 | Wait for worker thread to finish, max 5 seconds
_workerThread?.Join(TimeSpan.FromSeconds(5));
_logger.Info("PLC 写入队列已停止 | PLC write queue stopped");
}
/// <summary>
/// 清空队列中所有待处理任务 | Clear all pending tasks in queue
/// </summary>
public void Clear()
{
int cleared = 0;
while (_queue.TryTake(out var item))
{
// 取消等待中的异步任务 | Cancel waiting async tasks
item.Completion?.TrySetCanceled();
cleared++;
}
if (cleared > 0)
_logger.Info("已清空 {Count} 个待处理写入任务 | Cleared {Count} pending write tasks", cleared);
}
/// <summary>
/// 入队写入任务(即发即忘)| Enqueue write task (fire-and-forget)
/// </summary>
public void Enqueue<T>(string address, T value)
{
var item = new WriteTaskItem
{
Address = address,
Value = value,
EnqueueTime = DateTime.Now,
ExecuteFunc = async (client) => await client.WriteAsync(address, value)
};
EnqueueItem(item);
}
/// <summary>
/// 入队写入任务并等待结果 | Enqueue write task and await result
/// </summary>
public Task<bool> EnqueueAsync<T>(string address, T value)
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var item = new WriteTaskItem
{
Address = address,
Value = value,
EnqueueTime = DateTime.Now,
Completion = tcs,
ExecuteFunc = async (client) => await client.WriteAsync(address, value)
};
EnqueueItem(item);
return tcs.Task;
}
/// <summary>
/// 入队字符串写入任务(即发即忘)| Enqueue string write task (fire-and-forget)
/// </summary>
public void EnqueueString(string address, string value, ushort length)
{
var item = new WriteTaskItem
{
Address = address,
Value = value,
EnqueueTime = DateTime.Now,
ExecuteFunc = async (client) => await client.WriteStringAsync(address, value, length)
};
EnqueueItem(item);
}
/// <summary>
/// 入队字符串写入任务并等待结果 | Enqueue string write task and await result
/// </summary>
public Task<bool> EnqueueStringAsync(string address, string value, ushort length)
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var item = new WriteTaskItem
{
Address = address,
Value = value,
EnqueueTime = DateTime.Now,
Completion = tcs,
ExecuteFunc = async (client) => await client.WriteStringAsync(address, value, length)
};
EnqueueItem(item);
return tcs.Task;
}
/// <summary>
/// 将任务项加入队列 | Add task item to queue
/// </summary>
private void EnqueueItem(WriteTaskItem item)
{
if (_disposed || _queue.IsAddingCompleted)
{
_logger.Warn("写入队列已停止,无法入队: 地址={Address} | Write queue stopped, cannot enqueue: address={Address}", item.Address);
item.Completion?.TrySetResult(false);
return;
}
if (!_queue.TryAdd(item))
{
_logger.Warn("写入队列已满,丢弃任务: 地址={Address} | Write queue full, task dropped: address={Address}", item.Address);
item.Completion?.TrySetResult(false);
}
}
/// <summary>
/// 后台队列处理循环 | Background queue processing loop
/// </summary>
private void ProcessQueue()
{
_logger.Debug("写入队列工作线程已启动 | Write queue worker thread started");
try
{
foreach (var item in _queue.GetConsumingEnumerable(_cts.Token))
{
ProcessItem(item);
}
}
catch (OperationCanceledException)
{
_logger.Debug("写入队列工作线程被取消 | Write queue worker thread cancelled");
}
catch (Exception ex)
{
_logger.Error(ex, "写入队列工作线程异常退出: {Message} | Write queue worker thread exception: {Message}", ex.Message);
}
finally
{
_isRunning = false;
_logger.Debug("写入队列工作线程已退出 | Write queue worker thread exited");
}
}
/// <summary>
/// 处理单个写入任务 | Process single write task
/// </summary>
private void ProcessItem(WriteTaskItem item)
{
bool success = false;
Exception error = null;
try
{
// 同步等待异步写入完成 | Synchronously wait for async write to complete
success = item.ExecuteFunc(_plcClient).GetAwaiter().GetResult();
}
catch (Exception ex)
{
error = ex;
_logger.Error(ex, "队列写入失败: 地址={Address}, 错误={Message} | Queue write failed: address={Address}, error={Message}",
item.Address, ex.Message);
}
var completeTime = DateTime.Now;
// 通知等待的异步调用者 | Notify waiting async callers
if (item.Completion != null)
{
if (error != null)
item.Completion.TrySetException(error);
else
item.Completion.TrySetResult(success);
}
// 触发完成事件 | Fire completed event
try
{
WriteCompleted?.Invoke(this, new PlcWriteCompletedEventArgs(
item.Address, item.Value, success, error, item.EnqueueTime, completeTime));
}
catch (Exception ex)
{
_logger.Error(ex, "写入完成事件处理异常: {Message} | Write completed event handler exception: {Message}", ex.Message);
}
}
/// <summary>
/// 释放资源 | Dispose resources
/// </summary>
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_logger.Info("正在释放 PlcWriteQueue 资源... | Disposing PlcWriteQueue resources...");
try { _cts.Cancel(); } catch { /* 静默处理 | Silent handling */ }
try { Stop(); } catch { /* 静默处理 | Silent handling */ }
try { Clear(); } catch { /* 静默处理 | Silent handling */ }
try { _queue.Dispose(); } catch { /* 静默处理 | Silent handling */ }
try { _cts.Dispose(); } catch { /* 静默处理 | Silent handling */ }
_logger.Info("PlcWriteQueue 资源已释放 | PlcWriteQueue resources disposed");
}
}
}