344 lines
12 KiB
C#
344 lines
12 KiB
C#
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using XP.Common.Logging.Interfaces;
|
|
using XP.Hardware.Plc.Abstractions;
|
|
|
|
namespace XP.Hardware.PLC.Services
|
|
{
|
|
/// <summary>
|
|
/// 写入任务完成事件参数 | Write task completed event args
|
|
/// </summary>
|
|
public class PlcWriteCompletedEventArgs : EventArgs
|
|
{
|
|
public string Address { get; }
|
|
public object Value { get; }
|
|
public bool Success { get; }
|
|
public Exception Error { get; }
|
|
public DateTime EnqueueTime { get; }
|
|
public DateTime CompleteTime { get; }
|
|
|
|
public PlcWriteCompletedEventArgs(string address, object value, bool success,
|
|
Exception error, DateTime enqueueTime, DateTime completeTime)
|
|
{
|
|
Address = address;
|
|
Value = value;
|
|
Success = success;
|
|
Error = error;
|
|
EnqueueTime = enqueueTime;
|
|
CompleteTime = completeTime;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 写入任务项 | Write task item
|
|
/// </summary>
|
|
internal class WriteTaskItem
|
|
{
|
|
/// <summary>
|
|
/// 写入执行委托 | Write execution delegate
|
|
/// </summary>
|
|
public Func<IPlcClient, Task<bool>> ExecuteFunc { get; set; }
|
|
|
|
/// <summary>
|
|
/// PLC 地址(用于日志和事件)| PLC address (for logging and events)
|
|
/// </summary>
|
|
public string Address { get; set; }
|
|
|
|
/// <summary>
|
|
/// 写入值(用于日志和事件)| Write value (for logging and events)
|
|
/// </summary>
|
|
public object Value { get; set; }
|
|
|
|
/// <summary>
|
|
/// 入队时间 | Enqueue time
|
|
/// </summary>
|
|
public DateTime EnqueueTime { get; set; }
|
|
|
|
/// <summary>
|
|
/// 结果回调(用于 EnqueueAsync 等待模式)| Result callback (for EnqueueAsync await mode)
|
|
/// </summary>
|
|
public TaskCompletionSource<bool> Completion { get; set; }
|
|
}
|
|
|
|
/// <summary>
|
|
/// PLC 队列写入服务实现 | PLC queue write service implementation
|
|
/// 后台单线程消费队列,顺序写入 PLC,避免并发拥堵
|
|
/// </summary>
|
|
public class PlcWriteQueue : IDisposable
|
|
{
|
|
private readonly IPlcClient _plcClient;
|
|
private readonly ILoggerService _logger;
|
|
private readonly BlockingCollection<WriteTaskItem> _queue;
|
|
private readonly CancellationTokenSource _cts;
|
|
private Thread _workerThread;
|
|
private bool _disposed;
|
|
private volatile bool _isRunning;
|
|
|
|
/// <summary>
|
|
/// 写入完成事件 | Write completed event
|
|
/// </summary>
|
|
public event EventHandler<PlcWriteCompletedEventArgs> WriteCompleted;
|
|
|
|
/// <summary>
|
|
/// 当前队列中待处理的任务数 | Pending task count in queue
|
|
/// </summary>
|
|
public int PendingCount => _queue.Count;
|
|
|
|
/// <summary>
|
|
/// 队列是否正在运行 | Whether queue is running
|
|
/// </summary>
|
|
public bool IsRunning => _isRunning;
|
|
|
|
/// <summary>
|
|
/// 获取内部 PLC 客户端引用,供 PlcService 管理连接生命周期
|
|
/// Get internal PLC client reference for PlcService to manage connection lifecycle
|
|
/// </summary>
|
|
internal IPlcClient PlcClient => _plcClient;
|
|
|
|
/// <summary>
|
|
/// 构造函数 | Constructor
|
|
/// </summary>
|
|
/// <param name="plcClient">PLC 客户端接口 | PLC client interface</param>
|
|
/// <param name="logger">日志服务 | Logger service</param>
|
|
/// <param name="maxQueueSize">最大队列容量,默认 1000 | Max queue size, default 1000</param>
|
|
public PlcWriteQueue(IPlcClient plcClient, ILoggerService logger, int maxQueueSize = 1000)
|
|
{
|
|
_plcClient = plcClient ?? throw new ArgumentNullException(nameof(plcClient));
|
|
_logger = logger?.ForModule<PlcWriteQueue>() ?? throw new ArgumentNullException(nameof(logger));
|
|
_queue = new BlockingCollection<WriteTaskItem>(maxQueueSize);
|
|
_cts = new CancellationTokenSource();
|
|
}
|
|
|
|
/// <summary>
|
|
/// 启动队列处理 | Start queue processing
|
|
/// </summary>
|
|
public void Start()
|
|
{
|
|
if (_isRunning)
|
|
{
|
|
_logger.Warn("写入队列已在运行中 | Write queue is already running");
|
|
return;
|
|
}
|
|
|
|
_isRunning = true;
|
|
_workerThread = new Thread(ProcessQueue)
|
|
{
|
|
Name = "PlcWriteQueue-Worker",
|
|
IsBackground = true
|
|
};
|
|
_workerThread.Start();
|
|
_logger.Info("PLC 写入队列已启动 | PLC write queue started");
|
|
}
|
|
|
|
/// <summary>
|
|
/// 停止队列处理(等待当前任务完成)| Stop queue processing (wait for current task)
|
|
/// </summary>
|
|
public void Stop()
|
|
{
|
|
if (!_isRunning) return;
|
|
|
|
_logger.Info("正在停止 PLC 写入队列... | Stopping PLC write queue...");
|
|
_isRunning = false;
|
|
_queue.CompleteAdding();
|
|
|
|
// 等待工作线程结束,最多 5 秒 | Wait for worker thread to finish, max 5 seconds
|
|
_workerThread?.Join(TimeSpan.FromSeconds(5));
|
|
_logger.Info("PLC 写入队列已停止 | PLC write queue stopped");
|
|
}
|
|
|
|
/// <summary>
|
|
/// 清空队列中所有待处理任务 | Clear all pending tasks in queue
|
|
/// </summary>
|
|
public void Clear()
|
|
{
|
|
int cleared = 0;
|
|
while (_queue.TryTake(out var item))
|
|
{
|
|
// 取消等待中的异步任务 | Cancel waiting async tasks
|
|
item.Completion?.TrySetCanceled();
|
|
cleared++;
|
|
}
|
|
if (cleared > 0)
|
|
_logger.Info("已清空 {Count} 个待处理写入任务 | Cleared {Count} pending write tasks", cleared);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 入队写入任务(即发即忘)| Enqueue write task (fire-and-forget)
|
|
/// </summary>
|
|
public void Enqueue<T>(string address, T value)
|
|
{
|
|
var item = new WriteTaskItem
|
|
{
|
|
Address = address,
|
|
Value = value,
|
|
EnqueueTime = DateTime.Now,
|
|
ExecuteFunc = async (client) => await client.WriteAsync(address, value)
|
|
};
|
|
EnqueueItem(item);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 入队写入任务并等待结果 | Enqueue write task and await result
|
|
/// </summary>
|
|
public Task<bool> EnqueueAsync<T>(string address, T value)
|
|
{
|
|
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
var item = new WriteTaskItem
|
|
{
|
|
Address = address,
|
|
Value = value,
|
|
EnqueueTime = DateTime.Now,
|
|
Completion = tcs,
|
|
ExecuteFunc = async (client) => await client.WriteAsync(address, value)
|
|
};
|
|
EnqueueItem(item);
|
|
return tcs.Task;
|
|
}
|
|
|
|
/// <summary>
|
|
/// 入队字符串写入任务(即发即忘)| Enqueue string write task (fire-and-forget)
|
|
/// </summary>
|
|
public void EnqueueString(string address, string value, ushort length)
|
|
{
|
|
var item = new WriteTaskItem
|
|
{
|
|
Address = address,
|
|
Value = value,
|
|
EnqueueTime = DateTime.Now,
|
|
ExecuteFunc = async (client) => await client.WriteStringAsync(address, value, length)
|
|
};
|
|
EnqueueItem(item);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 入队字符串写入任务并等待结果 | Enqueue string write task and await result
|
|
/// </summary>
|
|
public Task<bool> EnqueueStringAsync(string address, string value, ushort length)
|
|
{
|
|
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
var item = new WriteTaskItem
|
|
{
|
|
Address = address,
|
|
Value = value,
|
|
EnqueueTime = DateTime.Now,
|
|
Completion = tcs,
|
|
ExecuteFunc = async (client) => await client.WriteStringAsync(address, value, length)
|
|
};
|
|
EnqueueItem(item);
|
|
return tcs.Task;
|
|
}
|
|
|
|
/// <summary>
|
|
/// 将任务项加入队列 | Add task item to queue
|
|
/// </summary>
|
|
private void EnqueueItem(WriteTaskItem item)
|
|
{
|
|
if (_disposed || _queue.IsAddingCompleted)
|
|
{
|
|
_logger.Warn("写入队列已停止,无法入队: 地址={Address} | Write queue stopped, cannot enqueue: address={Address}", item.Address);
|
|
item.Completion?.TrySetResult(false);
|
|
return;
|
|
}
|
|
|
|
if (!_queue.TryAdd(item))
|
|
{
|
|
_logger.Warn("写入队列已满,丢弃任务: 地址={Address} | Write queue full, task dropped: address={Address}", item.Address);
|
|
item.Completion?.TrySetResult(false);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 后台队列处理循环 | Background queue processing loop
|
|
/// </summary>
|
|
private void ProcessQueue()
|
|
{
|
|
_logger.Debug("写入队列工作线程已启动 | Write queue worker thread started");
|
|
|
|
try
|
|
{
|
|
foreach (var item in _queue.GetConsumingEnumerable(_cts.Token))
|
|
{
|
|
ProcessItem(item);
|
|
}
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
_logger.Debug("写入队列工作线程被取消 | Write queue worker thread cancelled");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.Error(ex, "写入队列工作线程异常退出: {Message} | Write queue worker thread exception: {Message}", ex.Message);
|
|
}
|
|
finally
|
|
{
|
|
_isRunning = false;
|
|
_logger.Debug("写入队列工作线程已退出 | Write queue worker thread exited");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 处理单个写入任务 | Process single write task
|
|
/// </summary>
|
|
private void ProcessItem(WriteTaskItem item)
|
|
{
|
|
bool success = false;
|
|
Exception error = null;
|
|
|
|
try
|
|
{
|
|
// 同步等待异步写入完成 | Synchronously wait for async write to complete
|
|
success = item.ExecuteFunc(_plcClient).GetAwaiter().GetResult();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
error = ex;
|
|
_logger.Error(ex, "队列写入失败: 地址={Address}, 错误={Message} | Queue write failed: address={Address}, error={Message}",
|
|
item.Address, ex.Message);
|
|
}
|
|
|
|
var completeTime = DateTime.Now;
|
|
|
|
// 通知等待的异步调用者 | Notify waiting async callers
|
|
if (item.Completion != null)
|
|
{
|
|
if (error != null)
|
|
item.Completion.TrySetException(error);
|
|
else
|
|
item.Completion.TrySetResult(success);
|
|
}
|
|
|
|
// 触发完成事件 | Fire completed event
|
|
try
|
|
{
|
|
WriteCompleted?.Invoke(this, new PlcWriteCompletedEventArgs(
|
|
item.Address, item.Value, success, error, item.EnqueueTime, completeTime));
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.Error(ex, "写入完成事件处理异常: {Message} | Write completed event handler exception: {Message}", ex.Message);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 释放资源 | Dispose resources
|
|
/// </summary>
|
|
public void Dispose()
|
|
{
|
|
if (_disposed) return;
|
|
_disposed = true;
|
|
|
|
_logger.Info("正在释放 PlcWriteQueue 资源... | Disposing PlcWriteQueue resources...");
|
|
|
|
try { _cts.Cancel(); } catch { /* 静默处理 | Silent handling */ }
|
|
try { Stop(); } catch { /* 静默处理 | Silent handling */ }
|
|
try { Clear(); } catch { /* 静默处理 | Silent handling */ }
|
|
try { _queue.Dispose(); } catch { /* 静默处理 | Silent handling */ }
|
|
try { _cts.Dispose(); } catch { /* 静默处理 | Silent handling */ }
|
|
|
|
_logger.Info("PlcWriteQueue 资源已释放 | PlcWriteQueue resources disposed");
|
|
}
|
|
}
|
|
}
|