using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using XP.Common.Logging.Interfaces;
using XP.Hardware.Plc.Abstractions;

namespace XP.Hardware.PLC.Services
{
    /// <summary>
    /// Event arguments raised after a queued write task has been processed.
    /// </summary>
    public class PlcWriteCompletedEventArgs : EventArgs
    {
        /// <summary>PLC address the write targeted.</summary>
        public string Address { get; }

        /// <summary>Value that was written (boxed).</summary>
        public object Value { get; }

        /// <summary>Result returned by the write delegate; <c>false</c> when it threw.</summary>
        public bool Success { get; }

        /// <summary>Exception thrown by the write delegate, or <c>null</c> if none was thrown.</summary>
        public Exception Error { get; }

        /// <summary>Local time at which the task was created for enqueueing.</summary>
        public DateTime EnqueueTime { get; }

        /// <summary>Local time at which the worker finished executing the task.</summary>
        public DateTime CompleteTime { get; }

        /// <summary>
        /// Creates the event arguments for one completed write task.
        /// </summary>
        /// <param name="address">PLC address the write targeted.</param>
        /// <param name="value">Value that was written.</param>
        /// <param name="success">Whether the write reported success.</param>
        /// <param name="error">Exception raised by the write, or <c>null</c>.</param>
        /// <param name="enqueueTime">Time the task was enqueued.</param>
        /// <param name="completeTime">Time the task completed.</param>
        public PlcWriteCompletedEventArgs(string address, object value, bool success, Exception error, DateTime enqueueTime, DateTime completeTime)
        {
            Address = address;
            Value = value;
            Success = success;
            Error = error;
            EnqueueTime = enqueueTime;
            CompleteTime = completeTime;
        }
    }

    /// <summary>
    /// A single write task held in the queue.
    /// </summary>
    internal class WriteTaskItem
    {
        /// <summary>
        /// Delegate that performs the write against the supplied PLC client.
        /// </summary>
        public Func<IPlcClient, Task<bool>> ExecuteFunc { get; set; }

        /// <summary>
        /// PLC address (used for logging and events).
        /// </summary>
        public string Address { get; set; }

        /// <summary>
        /// Value to write (used for logging and events).
        /// </summary>
        public object Value { get; set; }

        /// <summary>
        /// Time the task was enqueued.
        /// </summary>
        public DateTime EnqueueTime { get; set; }

        /// <summary>
        /// Completion source used by the awaitable enqueue methods; <c>null</c> for fire-and-forget tasks.
        /// </summary>
        public TaskCompletionSource<bool> Completion { get; set; }
    }

    /// <summary>
    /// PLC queued write service.
    /// A single background thread consumes the queue and writes to the PLC sequentially,
    /// so that callers do not issue concurrent writes.
    /// </summary>
    public class PlcWriteQueue : IDisposable
    {
        // Maximum time Stop() waits for the worker thread to exit.
        private static readonly TimeSpan StopTimeout = TimeSpan.FromSeconds(5);

        private readonly IPlcClient _plcClient;
        private readonly ILoggerService _logger;
        private readonly BlockingCollection<WriteTaskItem> _queue;
        private readonly CancellationTokenSource _cts;
        private Thread _workerThread;

        // Read from producer threads in EnqueueItem while Dispose may run on another thread.
        private volatile bool _disposed;
        private volatile bool _isRunning;

        /// <summary>
        /// Raised on the worker thread after each write task has been processed.
        /// </summary>
        public event EventHandler<PlcWriteCompletedEventArgs> WriteCompleted;

        /// <summary>
        /// Number of tasks currently waiting in the queue.
        /// </summary>
        public int PendingCount => _queue.Count;

        /// <summary>
        /// Whether the queue worker is running.
        /// </summary>
        public bool IsRunning => _isRunning;

        /// <summary>
        /// Internal PLC client reference, exposed so PlcService can manage the connection lifecycle.
        /// </summary>
        internal IPlcClient PlcClient => _plcClient;

        /// <summary>
        /// Creates the write queue. The worker is not started until <see cref="Start"/> is called.
        /// </summary>
        /// <param name="plcClient">PLC client used to perform the writes.</param>
        /// <param name="logger">Logger service.</param>
        /// <param name="maxQueueSize">Maximum queue capacity; defaults to 1000.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="plcClient"/> or <paramref name="logger"/> is <c>null</c>.
        /// </exception>
        public PlcWriteQueue(IPlcClient plcClient, ILoggerService logger, int maxQueueSize = 1000)
        {
            _plcClient = plcClient ?? throw new ArgumentNullException(nameof(plcClient));
            // NOTE(review): the module type argument is assumed to be this class - confirm against ILoggerService.
            _logger = logger?.ForModule<PlcWriteQueue>() ?? throw new ArgumentNullException(nameof(logger));
            _queue = new BlockingCollection<WriteTaskItem>(maxQueueSize);
            _cts = new CancellationTokenSource();
        }

        /// <summary>
        /// Starts the background worker that processes the queue.
        /// Does nothing (other than logging a warning) if the queue is already running,
        /// has been stopped, or has been disposed.
        /// </summary>
        public void Start()
        {
            // CompleteAdding() is irreversible: a worker started after Stop()/Dispose() would exit
            // immediately, so refuse instead of reporting a start that cannot accept work.
            if (_disposed || _queue.IsAddingCompleted)
            {
                _logger.Warn("写入队列已停止或已释放,无法重新启动 | Write queue has been stopped or disposed and cannot be restarted");
                return;
            }

            if (_isRunning)
            {
                _logger.Warn("写入队列已在运行中 | Write queue is already running");
                return;
            }

            _isRunning = true;
            _workerThread = new Thread(ProcessQueue)
            {
                Name = "PlcWriteQueue-Worker",
                IsBackground = true
            };
            _workerThread.Start();
            _logger.Info("PLC 写入队列已启动 | PLC write queue started");
        }

        /// <summary>
        /// Stops queue processing. No further tasks are accepted; the worker keeps consuming
        /// already-queued tasks, and this method waits up to 5 seconds for it to exit.
        /// </summary>
        public void Stop()
        {
            if (!_isRunning)
            {
                return;
            }

            _logger.Info("正在停止 PLC 写入队列... | Stopping PLC write queue...");
            _isRunning = false;
            _queue.CompleteAdding();

            // When called from a WriteCompleted handler we are on the worker thread itself;
            // joining it would only block for the whole timeout.
            var worker = _workerThread;
            if (worker != null && worker != Thread.CurrentThread)
            {
                if (!worker.Join(StopTimeout))
                {
                    _logger.Warn("PLC 写入队列工作线程未在 5 秒内结束 | PLC write queue worker thread did not finish within 5 seconds");
                    return;
                }
            }

            _logger.Info("PLC 写入队列已停止 | PLC write queue stopped");
        }

        /// <summary>
        /// Removes every pending task from the queue, cancelling the tasks of awaiting callers.
        /// </summary>
        public void Clear()
        {
            int cleared = 0;
            while (_queue.TryTake(out var item))
            {
                // Cancel callers that are awaiting this task.
                item.Completion?.TrySetCanceled();
                cleared++;
            }

            if (cleared > 0)
            {
                _logger.Info("已清空 {Count} 个待处理写入任务 | Cleared {Count} pending write tasks", cleared);
            }
        }

        /// <summary>
        /// Enqueues a write task (fire-and-forget).
        /// </summary>
        /// <typeparam name="T">Type of the value to write.</typeparam>
        /// <param name="address">PLC address.</param>
        /// <param name="value">Value to write.</param>
        public void Enqueue<T>(string address, T value)
        {
            var item = new WriteTaskItem
            {
                Address = address,
                Value = value,
                EnqueueTime = DateTime.Now,
                ExecuteFunc = async (client) => await client.WriteAsync(address, value)
            };
            EnqueueItem(item);
        }

        /// <summary>
        /// Enqueues a write task and returns a task that completes with the write result.
        /// </summary>
        /// <typeparam name="T">Type of the value to write.</typeparam>
        /// <param name="address">PLC address.</param>
        /// <param name="value">Value to write.</param>
        /// <returns>
        /// A task yielding the write result; <c>false</c> if the task was rejected because the queue
        /// is stopped or full. The task faults if the write throws and is cancelled if the queue is cleared.
        /// </returns>
        public Task<bool> EnqueueAsync<T>(string address, T value)
        {
            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            var item = new WriteTaskItem
            {
                Address = address,
                Value = value,
                EnqueueTime = DateTime.Now,
                Completion = tcs,
                ExecuteFunc = async (client) => await client.WriteAsync(address, value)
            };
            EnqueueItem(item);
            return tcs.Task;
        }

        /// <summary>
        /// Enqueues a string write task (fire-and-forget).
        /// </summary>
        /// <param name="address">PLC address.</param>
        /// <param name="value">String to write.</param>
        /// <param name="length">Length argument forwarded to the client's string write.</param>
        public void EnqueueString(string address, string value, ushort length)
        {
            var item = new WriteTaskItem
            {
                Address = address,
                Value = value,
                EnqueueTime = DateTime.Now,
                ExecuteFunc = async (client) => await client.WriteStringAsync(address, value, length)
            };
            EnqueueItem(item);
        }

        /// <summary>
        /// Enqueues a string write task and returns a task that completes with the write result.
        /// </summary>
        /// <param name="address">PLC address.</param>
        /// <param name="value">String to write.</param>
        /// <param name="length">Length argument forwarded to the client's string write.</param>
        /// <returns>
        /// A task yielding the write result; <c>false</c> if the task was rejected because the queue
        /// is stopped or full. The task faults if the write throws and is cancelled if the queue is cleared.
        /// </returns>
        public Task<bool> EnqueueStringAsync(string address, string value, ushort length)
        {
            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            var item = new WriteTaskItem
            {
                Address = address,
                Value = value,
                EnqueueTime = DateTime.Now,
                Completion = tcs,
                ExecuteFunc = async (client) => await client.WriteStringAsync(address, value, length)
            };
            EnqueueItem(item);
            return tcs.Task;
        }

        /// <summary>
        /// Adds a task item to the queue without blocking. A rejected task (queue stopped,
        /// disposed, or full) is logged and its awaiting caller, if any, receives <c>false</c>.
        /// </summary>
        private void EnqueueItem(WriteTaskItem item)
        {
            bool added;
            try
            {
                if (_disposed || _queue.IsAddingCompleted)
                {
                    RejectStopped(item);
                    return;
                }

                added = _queue.TryAdd(item);
            }
            catch (InvalidOperationException)
            {
                // Stop()/Dispose() raced with us after the check above: TryAdd throws
                // InvalidOperationException once adding is completed, and
                // ObjectDisposedException (a subclass) once the collection is disposed.
                RejectStopped(item);
                return;
            }

            if (!added)
            {
                _logger.Warn("写入队列已满,丢弃任务: 地址={Address} | Write queue full, task dropped: address={Address}", item.Address);
                item.Completion?.TrySetResult(false);
            }
        }

        /// <summary>
        /// Logs and completes a task that could not be enqueued because the queue is stopped.
        /// </summary>
        private void RejectStopped(WriteTaskItem item)
        {
            _logger.Warn("写入队列已停止,无法入队: 地址={Address} | Write queue stopped, cannot enqueue: address={Address}", item.Address);
            item.Completion?.TrySetResult(false);
        }

        /// <summary>
        /// Worker thread body: consumes tasks until adding is completed and the queue is drained,
        /// or until the cancellation token is signalled.
        /// </summary>
        private void ProcessQueue()
        {
            _logger.Debug("写入队列工作线程已启动 | Write queue worker thread started");

            try
            {
                foreach (var item in _queue.GetConsumingEnumerable(_cts.Token))
                {
                    ProcessItem(item);
                }
            }
            catch (OperationCanceledException)
            {
                _logger.Debug("写入队列工作线程被取消 | Write queue worker thread cancelled");
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "写入队列工作线程异常退出: {Message} | Write queue worker thread exception: {Message}", ex.Message);
            }
            finally
            {
                _isRunning = false;
                _logger.Debug("写入队列工作线程已退出 | Write queue worker thread exited");
            }
        }

        /// <summary>
        /// Executes one write task, completes its awaiting caller, and raises <see cref="WriteCompleted"/>.
        /// </summary>
        private void ProcessItem(WriteTaskItem item)
        {
            bool success = false;
            Exception error = null;

            try
            {
                // Block this dedicated worker thread until the asynchronous write finishes,
                // so that writes are issued strictly one at a time.
                success = item.ExecuteFunc(_plcClient).GetAwaiter().GetResult();
            }
            catch (Exception ex)
            {
                error = ex;
                _logger.Error(ex, "队列写入失败: 地址={Address}, 错误={Message} | Queue write failed: address={Address}, error={Message}", item.Address, ex.Message);
            }

            var completeTime = DateTime.Now;

            // Notify the awaiting caller, if any.
            if (item.Completion != null)
            {
                if (error != null)
                {
                    item.Completion.TrySetException(error);
                }
                else
                {
                    item.Completion.TrySetResult(success);
                }
            }

            // Raise the completed event; a faulty handler must not kill the worker thread.
            try
            {
                WriteCompleted?.Invoke(this, new PlcWriteCompletedEventArgs(
                    item.Address, item.Value, success, error, item.EnqueueTime, completeTime));
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "写入完成事件处理异常: {Message} | Write completed event handler exception: {Message}", ex.Message);
            }
        }

        /// <summary>
        /// Cancels the worker, stops the queue, cancels all pending tasks, and releases resources.
        /// Each step is best-effort: failures are swallowed so that Dispose never throws.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            _logger.Info("正在释放 PlcWriteQueue 资源... | Disposing PlcWriteQueue resources...");

            try { _cts.Cancel(); } catch { /* best-effort cleanup */ }
            try { Stop(); } catch { /* best-effort cleanup */ }

            // Stop() returns early when the worker never ran or has already exited on cancellation;
            // complete adding here so nothing can be enqueued after the drain below.
            try { _queue.CompleteAdding(); } catch { /* best-effort cleanup */ }
            try { Clear(); } catch { /* best-effort cleanup */ }
            try { _queue.Dispose(); } catch { /* best-effort cleanup */ }
            try { _cts.Dispose(); } catch { /* best-effort cleanup */ }

            _logger.Info("PlcWriteQueue 资源已释放 | PlcWriteQueue resources disposed");
        }
    }
}