using Prism.Events;
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
using Telerik.Windows.Documents.Selection;
using XP.Common.Logging.Interfaces;
using XP.Hardware.RaySource.Abstractions;
using XP.Hardware.RaySource.Abstractions.Enums;
using XP.Hardware.RaySource.Abstractions.Events;
using XP.Hardware.RaySource.Comet.Messages;
using XP.Hardware.RaySource.Comet.Messages.Commands;
using XP.Hardware.RaySource.Comet.Messages.Responses;
using XP.Hardware.RaySource.Config;
using XP.Hardware.RaySource.Services;
namespace XP.Hardware.RaySource.Implementations
{
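/// <summary>
/// IPC client.
/// Communicates with the Host process over NamedPipes and implements the IXRaySource interface,
/// so upper-layer code does not need to be aware that the device runs in a separate process.
/// </summary>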
public class CometIpcClient : XRaySourceBase
{
// Dependencies supplied through the constructor.
private readonly RaySourceConfig _config;
private readonly IEventAggregator _eventAggregator;
private readonly ILoggerService _logger;
// Command pipe (client writes) and response pipe (client reads); both are created in Connect.
private NamedPipeClientStream _cmdPipe;
private NamedPipeClientStream _rspPipe;
// Line reader over the response pipe, consumed by ReceiveLoop.
private StreamReader _reader;
// Background thread that runs ReceiveLoop.
private Thread _receiveThread;
// Loop condition of ReceiveLoop; set to true in Connect.
private volatile bool _isRunning;
/// <summary>
/// Pipe connection state flag.
/// Used after a Host crash or pipe disconnect to stop later SendCommand calls
/// from writing to a pipe that is already broken.
/// </summary>
private volatile bool _isPipeConnected;
/// <summary>
/// Synchronization primitive used to wait for a command response:
/// created by SendCommand, completed by ReceiveLoop.
/// </summary>
// NOTE(review): usages (Task.Result, TrySetResult(response)) imply a generic completion
// source typed on RaySourceResponse - confirm the type argument is present in the real file.
private TaskCompletionSource _pendingResponse;
// Serializes SendCommand callers so only one command is in flight at a time.
private readonly object _sendLock = new object();
/// <summary>
/// Command pipe name (client writes -> Host reads).
/// </summary>
private const string CmdPipeName = "XP.Hardware.RaySource.Comet.Cmd";
/// <summary>
/// Response pipe name (Host writes -> client reads).
/// </summary>
private const string RspPipeName = "XP.Hardware.RaySource.Comet.Rsp";
/// <summary>
/// Command timeout in milliseconds.
/// Must be larger than the Host-side PVI connection timeout (30 s), because the Initialize
/// command blocks on the Host until the PVI callback chain has completed.
/// </summary>
private const int CommandTimeoutMs = 35000;
/// <summary>
/// Display name of this X-ray source implementation.
/// </summary>
public override string SourceName => "Comet 225kV (IPC)";
/// <summary>
/// Creates the IPC client and stores its dependencies.
/// </summary>
/// <param name="config">X-ray source configuration (provides the pipe connection timeout).</param>
/// <param name="eventAggregator">Prism event aggregator used to publish pushed notifications.</param>
/// <param name="loggerService">Logger service; a module logger is obtained from it via ForModule.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when any argument is null, or when the module logger obtained from
/// <paramref name="loggerService"/> is null.
/// </exception>
public CometIpcClient(
    RaySourceConfig config,
    IEventAggregator eventAggregator,
    ILoggerService loggerService)
{
    if (config == null)
    {
        throw new ArgumentNullException(nameof(config));
    }
    _config = config;

    if (eventAggregator == null)
    {
        throw new ArgumentNullException(nameof(eventAggregator));
    }
    _eventAggregator = eventAggregator;

    // A null service and a null module logger are reported the same way.
    var moduleLogger = loggerService?.ForModule();
    if (moduleLogger == null)
    {
        throw new ArgumentNullException(nameof(loggerService));
    }
    _logger = moduleLogger;
}
#region 管道连接管理
/// <summary>
/// Connects to the Host process over two NamedPipes (command pipe for writing, response
/// pipe for reading) and starts the background receive thread.
/// </summary>
/// <remarks>
/// Any exception raised while connecting is logged and rethrown. On failure, every pipe
/// object created by this call is disposed and the connection flags are reset.
/// </remarks>
public void Connect()
{
    try
    {
        // Command pipe: client writes -> Host reads.
        _cmdPipe = new NamedPipeClientStream(".", CmdPipeName, PipeDirection.Out);
        _cmdPipe.Connect(_config.ConnectionTimeout);

        // Response pipe: Host writes -> client reads.
        _rspPipe = new NamedPipeClientStream(".", RspPipeName, PipeDirection.In);
        _rspPipe.Connect(_config.ConnectionTimeout);
        _reader = new StreamReader(_rspPipe);

        // Publish the connection state BEFORE the receive thread starts: the thread clears
        // _isPipeConnected when it sees a disconnect, and setting the flag afterwards could
        // overwrite that and report a dead pipe as connected.
        _isRunning = true;
        _isPipeConnected = true;

        _receiveThread = new Thread(ReceiveLoop)
        {
            IsBackground = true,
            Name = "CometIpcClient-Receiver"
        };
        _receiveThread.Start();

        _logger.Info("已连接到 Host 进程 NamedPipe(双管道模式)");
    }
    catch (Exception ex)
    {
        _logger.Error(ex, "连接 Host 进程 NamedPipe 失败:{Message}", ex.Message);

        // Release whatever was created before the failure so a failed attempt
        // does not leak pipe handles.
        _isRunning = false;
        _isPipeConnected = false;
        _reader?.Dispose();
        _reader = null;
        _rspPipe?.Dispose();
        _rspPipe = null;
        _cmdPipe?.Dispose();
        _cmdPipe = null;

        throw;
    }
}
/// <summary>
/// Writes one line of text to the command pipe: the text followed by a single
/// line-feed character, encoded as UTF-8.
/// </summary>
/// <param name="text">Text to send (a newline terminator is appended).</param>
private void WriteLineToHost(string text)
{
    var line = string.Concat(text, "\n");
    byte[] payload = System.Text.Encoding.UTF8.GetBytes(line);
    _cmdPipe.Write(payload, 0, payload.Length);
}
#endregion
#region 核心通信方法
/// <summary>
/// Sends a command to the Host over the command pipe and blocks until the receive thread
/// delivers a response, the pipe is reported as disconnected, or
/// <see cref="CommandTimeoutMs"/> elapses.
/// </summary>
/// <param name="command">Command to serialize and send.</param>
/// <returns>
/// The response delivered by the receive thread, or <c>null</c> when the pipe is not
/// connected, the command times out, the pipe drops while waiting, or sending fails.
/// </returns>
private RaySourceResponse SendCommand(RaySourceCommand command)
{
    // Fail fast when the pipe is already known to be broken.
    if (!_isPipeConnected)
    {
        _logger.Error(null, "管道未连接,无法发送命令:{CommandType}", command.CommandType);
        return null;
    }

    // Only one command may be in flight: the receive thread completes whatever
    // completion source is currently stored in _pendingResponse.
    lock (_sendLock)
    {
        try
        {
            _pendingResponse = new TaskCompletionSource();
            // Keep a local reference so this call only ever waits on (and cancels) its own
            // completion source, never one created by a later command.
            var pending = _pendingResponse;

            var json = MessageSerializer.Serialize(command);
            _logger.Debug("发送命令:{CommandType},JSON 长度={Length}", command.CommandType, json.Length);
            WriteLineToHost(json);
            _logger.Debug("命令已写入管道,开始等待响应:{CommandType}", command.CommandType);

            // Bounded wait for the response; Wait returns false when the timeout elapses.
            if (!pending.Task.Wait(CommandTimeoutMs))
            {
                // Mark the source as finished so a late response for this command is discarded.
                pending.TrySetCanceled();
                _logger.Error(null, "命令超时({TimeoutMs}ms):{CommandType}", CommandTimeoutMs, command.CommandType);
                return null;
            }

            var result = pending.Task.Result;
            _logger.Debug("收到命令响应:{CommandType},Success={Success}", command.CommandType, result?.Success);
            return result;
        }
        catch (IOException ex)
        {
            // A failed write means the pipe is unusable; record it so later calls fail fast
            // instead of writing to the broken pipe again.
            _isPipeConnected = false;
            _logger.Error(ex, "管道通信异常(Host 进程可能已崩溃):{Message}", ex.Message);
            return null;
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "发送命令异常:{Message}", ex.Message);
            return null;
        }
    }
}
#endregion
#region 后台消息接收线程
/// <summary>
/// Background receive loop.
/// Reads the response pipe line by line, routes push messages to
/// <see cref="HandlePushMessage"/>, and hands command responses to the SendCommand call
/// that is currently waiting.
/// </summary>
/// <remarks>
/// Whenever the loop ends because the pipe is gone or an error occurred, any waiting
/// SendCommand call is released with a null response so it does not block until its timeout.
/// </remarks>
private void ReceiveLoop()
{
    try
    {
        _logger.Debug("接收线程已启动");
        while (_isRunning)
        {
            var line = _reader.ReadLine();
            if (line == null)
            {
                // End of stream: the pipe was closed. Block further sends and release the waiter.
                _isPipeConnected = false;
                _logger.Warn("Host 进程管道已断开");
                _pendingResponse?.TrySetResult(null);
                break;
            }
            _logger.Debug("收到管道消息,长度={Length}", line.Length);

            // How DeserializeResponse reports malformed input is not visible here; if it throws,
            // treat the line as undecodable instead of letting one bad message end the thread.
            RaySourceResponse response;
            try
            {
                response = MessageSerializer.DeserializeResponse(line);
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "反序列化管道消息异常:{Message}", ex.Message);
                response = null;
            }

            if (response == null)
            {
                _logger.Warn("收到无法反序列化的消息,原始内容:{Raw}", line.Length > 200 ? line.Substring(0, 200) + "..." : line);
                continue;
            }

            if (response.IsPush)
            {
                _logger.Debug("收到推送消息:{PushType}", response.PushType);
                // Unsolicited push: route to the matching Prism event.
                HandlePushMessage(response);
            }
            else
            {
                _logger.Debug("收到命令响应,Success={Success}", response.Success);
                // Command response: wake the waiting SendCommand call.
                _pendingResponse?.TrySetResult(response);
            }
        }
    }
    catch (IOException ex)
    {
        _isPipeConnected = false;
        _logger.Warn("接收线程 IO 异常(管道可能已断开):{Message}", ex.Message);
        _pendingResponse?.TrySetResult(null);
    }
    catch (ObjectDisposedException)
    {
        // The pipe was disposed: normal shutdown path. No response can arrive any more,
        // so release a caller that may still be waiting.
        _pendingResponse?.TrySetResult(null);
    }
    catch (Exception ex)
    {
        // The receiver is terminating, so no further response can be delivered;
        // block later sends instead of letting them wait for the full timeout.
        _isPipeConnected = false;
        _logger.Error(ex, "接收线程异常:{Message}", ex.Message);
        _pendingResponse?.TrySetResult(null);
    }
}
#endregion
#region 推送消息路由
/// <summary>
/// Dispatches a push message to the handler that matches its PushType.
/// Unknown push types are logged as warnings; any exception raised while handling
/// is logged and not propagated.
/// </summary>
/// <param name="response">The push message received from the Host.</param>
private void HandlePushMessage(RaySourceResponse response)
{
    try
    {
        string pushType = response.PushType;

        if (pushType == "StatusChanged")
        {
            HandleStatusPush(response as StatusResponse);
        }
        else if (pushType == "XRayStateChanged")
        {
            HandleXRayStatePush(response as OperationResponse);
        }
        else if (pushType == "ErrorOccurred")
        {
            HandleErrorPush(response);
        }
        else if (pushType == "ConnectionStateChanged")
        {
            HandleConnectionStatePush(response as OperationResponse);
        }
        else if (pushType == "Log")
        {
            HandleLogPush(response as LogResponse);
        }
        else
        {
            _logger.Warn("收到未知推送类型:{PushType}", response.PushType);
        }
    }
    catch (Exception ex)
    {
        _logger.Error(ex, "处理推送消息异常:{Message}", ex.Message);
    }
}
/// <summary>
/// Copies the fields of a status push into a <see cref="SystemStatusData"/> instance
/// and publishes it through the event aggregator. A null status is ignored.
/// </summary>
/// <param name="status">Status push payload; may be null when the push was not a StatusResponse.</param>
private void HandleStatusPush(StatusResponse status)
{
    if (status == null)
    {
        return;
    }

    var snapshot = new SystemStatusData
    {
        // Voltage / current set-points and readings
        SetVoltage = status.SetVoltage,
        ActualVoltage = status.ActualVoltage,
        SetCurrent = status.SetCurrent,
        ActualCurrent = status.ActualCurrent,
        IsXRayOn = status.IsXRayOn,
        // Sub-system status values
        WarmUpStatus = status.WarmUpStatus,
        VacuumStatus = status.VacuumStatus,
        StartUpStatus = status.StartUpStatus,
        AutoCenterStatus = status.AutoCenterStatus,
        FilamentAdjustStatus = status.FilamentAdjustStatus,
        IsInterlockActive = status.IsInterlockActive,
        WatchdogStatus = status.WatchdogStatus,
        PowerMode = status.PowerMode,
        TxiStatus = status.TxiStatus
    };

    _eventAggregator.GetEvent().Publish(snapshot);
}
/// <summary>
/// Converts the Data of an X-ray state push to a boolean and publishes
/// <see cref="RaySourceStatus.Opened"/> (true) or <see cref="RaySourceStatus.Closed"/> (false).
/// A null response or null Data is ignored.
/// </summary>
/// <param name="response">Push payload whose Data carries the X-ray on/off state.</param>
private void HandleXRayStatePush(OperationResponse response)
{
    if (response == null || response.Data == null)
    {
        return;
    }

    RaySourceStatus publishedStatus;
    if (Convert.ToBoolean(response.Data))
    {
        publishedStatus = RaySourceStatus.Opened;
    }
    else
    {
        publishedStatus = RaySourceStatus.Closed;
    }

    _eventAggregator.GetEvent().Publish(publishedStatus);
}
/// <summary>
/// Publishes the error message carried by an error push through the event aggregator.
/// </summary>
/// <param name="response">Push payload whose ErrorMessage is published.</param>
private void HandleErrorPush(RaySourceResponse response)
{
    var errorText = response.ErrorMessage;
    _eventAggregator.GetEvent().Publish(errorText);
}
/// <summary>
/// Applies a connection-state push from the Host: updates the local connection flags,
/// signals the service-connected event, and publishes the matching events.
/// A null response or null Data is ignored; unrecognized states are logged as warnings.
/// </summary>
/// <param name="response">Push payload whose Data, converted to a string, names the new state.</param>
private void HandleConnectionStatePush(OperationResponse response)
{
    if (response?.Data == null) return;

    var stateStr = response.Data.ToString();
    switch (stateStr)
    {
        case "RaySourceConnected":
            _isConnected = true;
            _logger.Info("收到 RaySourceConnected 推送,射线源已完成全部连接流程,准备就绪");
            _eventAggregator.GetEvent().Publish(true);
            _logger.Info("射线源连接成功: {SourceName},状态更新为 Closed | X-ray source connected successfully: {SourceName}, status updated to Closed", SourceName);
            // Notify listeners that the source status is now Closed.
            _eventAggregator.GetEvent().Publish(RaySourceStatus.Closed);
            break;

        case "VariablesConnected":
            _isInitialized = true;
            _logger.Info("收到 VariablesConnected 推送,PVI 变量已创建、激活并绑定");
            break;

        case "ServiceConnected":
            _serviceConnectedEvent.Set();
            _logger.Info("收到 ServiceConnected 推送,PVI Service 和 CPU 已连接");
            break;

        case "Disconnected":
            _isConnected = false;
            _isInitialized = false;
            _logger.Info("收到 Disconnected 推送,PVI 未连接或已断开");
            _eventAggregator.GetEvent().Publish(false);
            break;

        default:
            // Mirrors the unknown-push-type warning so unexpected states are not dropped silently.
            _logger.Warn("收到未知连接状态推送:{State}", stateStr);
            break;
    }
}
#endregion
#region 日志传递
///
/// 处理日志推送消息
/// 根据 Level 字段映射到 ILoggerService 对应方法
///
private void HandleLogPush(LogResponse logResponse)
{
if (logResponse == null) return;
var message = logResponse.Message ?? "";
// 将 string[] 转换为 object[] 以匹配 ILoggerService 的 params object[] 签名
var args = logResponse.Args != null ? (object[])logResponse.Args : Array.Empty