using Prism.Events;
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
using Telerik.Windows.Documents.Selection;
using XP.Common.Logging.Interfaces;
using XP.Hardware.RaySource.Abstractions;
using XP.Hardware.RaySource.Abstractions.Enums;
using XP.Hardware.RaySource.Abstractions.Events;
using XP.Hardware.RaySource.Comet.Messages;
using XP.Hardware.RaySource.Comet.Messages.Commands;
using XP.Hardware.RaySource.Comet.Messages.Responses;
using XP.Hardware.RaySource.Config;
using XP.Hardware.RaySource.Services;

namespace XP.Hardware.RaySource.Implementations
{
    /// <summary>
    /// IPC client for the Comet X-ray source.
    /// Talks to the Host process over two named pipes (one for commands, one for
    /// responses/pushes) so that upper layers do not need to be aware of the
    /// process isolation.
    /// </summary>
    /// <remarks>
    /// NOTE(review): in the text under review all generic type arguments had been
    /// stripped. Those that are fixed by usage were restored
    /// (<c>TaskCompletionSource&lt;RaySourceResponse&gt;</c>, <c>Array.Empty&lt;object&gt;()</c>);
    /// <c>ForModule&lt;CometIpcClient&gt;()</c> is assumed by convention - confirm.
    /// The Prism event types passed to <c>GetEvent</c> could not be determined and
    /// are left exactly as they appeared; restore them from the repository version.
    /// </remarks>
    public class CometIpcClient : XRaySourceBase
    {
        private readonly RaySourceConfig _config;
        private readonly IEventAggregator _eventAggregator;
        private readonly ILoggerService _logger;

        private NamedPipeClientStream _cmdPipe;
        private NamedPipeClientStream _rspPipe;
        private StreamReader _reader;
        private Thread _receiveThread;
        private volatile bool _isRunning;

        /// <summary>
        /// Pipe connection flag. Once the Host process has crashed or the pipe has
        /// broken, it stops later <see cref="SendCommand"/> calls from writing to
        /// the dead pipe.
        /// </summary>
        private volatile bool _isPipeConnected;

        /// <summary>
        /// Completion source of the command currently waiting for its response.
        /// Written by the sending thread and completed by the receive thread,
        /// hence volatile.
        /// </summary>
        private volatile TaskCompletionSource<RaySourceResponse> _pendingResponse;

        /// <summary>Serializes commands: only one command is in flight at a time.</summary>
        private readonly object _sendLock = new object();

        /// <summary>Command pipe name (client writes, Host reads).</summary>
        private const string CmdPipeName = "XP.Hardware.RaySource.Comet.Cmd";

        /// <summary>Response pipe name (Host writes, client reads).</summary>
        private const string RspPipeName = "XP.Hardware.RaySource.Comet.Rsp";

        /// <summary>
        /// Command timeout in milliseconds.
        /// Per the original author it must exceed the Host-side PVI connection
        /// timeout (30 s), because the Initialize command blocks on the Host until
        /// the PVI callback chain has completed.
        /// </summary>
        private const int CommandTimeoutMs = 35000;

        /// <summary>
        /// Signalled when the "ServiceConnected" push arrives.
        /// The Host only answers Initialize after its PVI callback chain completed;
        /// waiting for the push as well is a second safety net on the client side.
        /// </summary>
        private readonly ManualResetEventSlim _serviceConnectedEvent = new ManualResetEventSlim(false);

        /// <summary>
        /// How long <see cref="Initialize"/> waits for the "ServiceConnected" push,
        /// in milliseconds.
        /// </summary>
        private const int ServiceConnectedTimeoutMs = 35000;

        /// <inheritdoc />
        public override string SourceName => "Comet 225kV (IPC)";

        /// <summary>
        /// Creates the client. No pipe is opened until <see cref="Connect"/> (or
        /// <see cref="Initialize"/>, which calls it) runs.
        /// </summary>
        /// <param name="config">Source configuration (connection timeout, PLC address, ...).</param>
        /// <param name="eventAggregator">Prism event aggregator used to publish Host pushes.</param>
        /// <param name="loggerService">Logger factory; a module logger is derived from it.</param>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public CometIpcClient(
            RaySourceConfig config,
            IEventAggregator eventAggregator,
            ILoggerService loggerService)
        {
            _config = config ?? throw new ArgumentNullException(nameof(config));
            _eventAggregator = eventAggregator ?? throw new ArgumentNullException(nameof(eventAggregator));
            // NOTE(review): type argument assumed (it was stripped from the reviewed text) - confirm.
            _logger = loggerService?.ForModule<CometIpcClient>() ?? throw new ArgumentNullException(nameof(loggerService));
        }

        #region Pipe connection management

        /// <summary>
        /// Connects both named pipes to the Host process and starts the background
        /// receive thread.
        /// </summary>
        /// <remarks>
        /// Any failure is logged and rethrown. Pipes created by a failed attempt are
        /// disposed so that they do not leak.
        /// </remarks>
        public void Connect()
        {
            NamedPipeClientStream cmdPipe = null;
            NamedPipeClientStream rspPipe = null;
            var published = false;

            try
            {
                // Command pipe: client writes, Host reads.
                cmdPipe = new NamedPipeClientStream(".", CmdPipeName, PipeDirection.Out);
                cmdPipe.Connect(_config.ConnectionTimeout);

                // Response pipe: Host writes, client reads.
                rspPipe = new NamedPipeClientStream(".", RspPipeName, PipeDirection.In);
                rspPipe.Connect(_config.ConnectionTimeout);

                _cmdPipe = cmdPipe;
                _rspPipe = rspPipe;
                _reader = new StreamReader(rspPipe);
                published = true;

                // Both flags are raised BEFORE the receiver starts: the receiver clears
                // _isPipeConnected when it detects a disconnect, and that must not be
                // overwritten afterwards by this method.
                _isRunning = true;
                _isPipeConnected = true;

                _receiveThread = new Thread(ReceiveLoop)
                {
                    IsBackground = true,
                    Name = "CometIpcClient-Receiver"
                };
                _receiveThread.Start();

                _logger.Info("已连接到 Host 进程 NamedPipe(双管道模式)");
            }
            catch (Exception ex)
            {
                // Roll back only what this attempt created.
                if (published)
                {
                    _isRunning = false;
                    _isPipeConnected = false;
                    _reader = null;
                    _cmdPipe = null;
                    _rspPipe = null;
                }

                rspPipe?.Dispose();
                cmdPipe?.Dispose();

                _logger.Error(ex, "连接 Host 进程 NamedPipe 失败:{Message}", ex.Message);
                throw;
            }
        }

        /// <summary>
        /// Writes one UTF-8 encoded, newline-terminated line to the command pipe.
        /// </summary>
        /// <param name="text">Line content without the trailing newline.</param>
        private void WriteLineToHost(string text)
        {
            var bytes = System.Text.Encoding.UTF8.GetBytes(text + "\n");
            _cmdPipe.Write(bytes, 0, bytes.Length);
        }

        #endregion

        #region Core communication

        /// <summary>
        /// Sends a command and blocks until its response arrives, the pipe breaks or
        /// <see cref="CommandTimeoutMs"/> elapses.
        /// </summary>
        /// <param name="command">Command to serialize and send.</param>
        /// <returns>
        /// The Host response, or null when the pipe is not connected, the command
        /// timed out, the pipe broke while waiting, or sending failed.
        /// </returns>
        /// <remarks>
        /// NOTE(review): responses carry no correlation with the command in this code;
        /// the receive thread hands any non-push message to whichever command is
        /// pending, so a response arriving after a timeout can be attributed to the
        /// next command. Fixing that needs a protocol change.
        /// </remarks>
        private RaySourceResponse SendCommand(RaySourceCommand command)
        {
            // Do not touch a pipe that is already known to be broken.
            if (!_isPipeConnected)
            {
                _logger.Error(null, "管道未连接,无法发送命令:{CommandType}", command.CommandType);
                return null;
            }

            lock (_sendLock)
            {
                // Work on a local: the field is only the hand-over slot for the receiver.
                var pending = new TaskCompletionSource<RaySourceResponse>(
                    TaskCreationOptions.RunContinuationsAsynchronously);
                _pendingResponse = pending;

                try
                {
                    var json = MessageSerializer.Serialize(command);
                    _logger.Debug("发送命令:{CommandType},JSON 长度={Length}", command.CommandType, json.Length);

                    WriteLineToHost(json);
                    _logger.Debug("命令已写入管道,开始等待响应:{CommandType}", command.CommandType);

                    // The task is only ever completed with TrySetResult, so a bounded
                    // Wait is all that is needed to implement the timeout.
                    if (!pending.Task.Wait(CommandTimeoutMs))
                    {
                        _logger.Error(null, "命令超时({TimeoutMs}ms):{CommandType}", CommandTimeoutMs, command.CommandType);
                        return null;
                    }

                    var result = pending.Task.Result;
                    _logger.Debug("收到命令响应:{CommandType},Success={Success}", command.CommandType, result?.Success);
                    return result;
                }
                catch (IOException ex)
                {
                    // The command pipe is broken: stop later commands from writing to it.
                    _isPipeConnected = false;
                    _logger.Error(ex, "管道通信异常(Host 进程可能已崩溃):{Message}", ex.Message);
                    return null;
                }
                catch (Exception ex)
                {
                    _logger.Error(ex, "发送命令异常:{Message}", ex.Message);
                    return null;
                }
                finally
                {
                    // No command is pending any more; messages arriving now are dropped
                    // by the receiver instead of touching a finished completion source.
                    _pendingResponse = null;
                }
            }
        }

        #endregion

        #region Background receive thread

        /// <summary>
        /// Receive loop run on the background thread. Reads the response pipe line by
        /// line and dispatches each message either as a push or as the response of
        /// the pending command.
        /// </summary>
        /// <remarks>
        /// Whenever the loop ends, for whatever reason, nobody can deliver responses
        /// any more; the pipe is therefore marked disconnected and a sender that is
        /// still waiting is released with a null response.
        /// </remarks>
        private void ReceiveLoop()
        {
            // Capture once: Dispose sets the field to null, and a disposed reader then
            // throws ObjectDisposedException (quiet exit) rather than a null reference.
            var reader = _reader;
            if (reader == null)
            {
                return;
            }

            try
            {
                _logger.Debug("接收线程已启动");

                while (_isRunning)
                {
                    var line = reader.ReadLine();
                    if (line == null)
                    {
                        // End of stream: the Host side of the pipe is gone.
                        _isPipeConnected = false;
                        _logger.Warn("Host 进程管道已断开");
                        break;
                    }

                    _logger.Debug("收到管道消息,长度={Length}", line.Length);

                    var response = MessageSerializer.DeserializeResponse(line);
                    if (response == null)
                    {
                        // Log at most the first 200 characters of the raw payload.
                        _logger.Warn("收到无法反序列化的消息,原始内容:{Raw}",
                            line.Length > 200 ? line.Substring(0, 200) + "..." : line);
                        continue;
                    }

                    if (response.IsPush)
                    {
                        _logger.Debug("收到推送消息:{PushType}", response.PushType);
                        // Unsolicited push: route it to the matching Prism event.
                        HandlePushMessage(response);
                    }
                    else
                    {
                        _logger.Debug("收到命令响应,Success={Success}", response.Success);
                        // Command response: wake up the waiting SendCommand call.
                        _pendingResponse?.TrySetResult(response);
                    }
                }
            }
            catch (IOException ex)
            {
                _logger.Warn("接收线程 IO 异常(管道可能已断开):{Message}", ex.Message);
            }
            catch (ObjectDisposedException)
            {
                // The pipe was disposed: normal shutdown.
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "接收线程异常:{Message}", ex.Message);
            }
            finally
            {
                _isPipeConnected = false;
                _pendingResponse?.TrySetResult(null);
            }
        }

        #endregion

        #region Push message routing

        /// <summary>
        /// Routes a push message to its handler based on <c>PushType</c>.
        /// Exceptions thrown by a handler are logged and swallowed so that they do
        /// not terminate the receive loop.
        /// </summary>
        /// <param name="response">The push message.</param>
        private void HandlePushMessage(RaySourceResponse response)
        {
            try
            {
                switch (response.PushType)
                {
                    case "StatusChanged":
                        HandleStatusPush(response as StatusResponse);
                        break;
                    case "XRayStateChanged":
                        HandleXRayStatePush(response as OperationResponse);
                        break;
                    case "ErrorOccurred":
                        HandleErrorPush(response);
                        break;
                    case "ConnectionStateChanged":
                        HandleConnectionStatePush(response as OperationResponse);
                        break;
                    case "Log":
                        HandleLogPush(response as LogResponse);
                        break;
                    default:
                        _logger.Warn("收到未知推送类型:{PushType}", response.PushType);
                        break;
                }
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "处理推送消息异常:{Message}", ex.Message);
            }
        }

        /// <summary>
        /// Copies the status fields of a Host response into a new
        /// <see cref="SystemStatusData"/>.
        /// </summary>
        /// <param name="status">Non-null status response.</param>
        private static SystemStatusData ToStatusData(StatusResponse status)
        {
            return new SystemStatusData
            {
                SetVoltage = status.SetVoltage,
                ActualVoltage = status.ActualVoltage,
                SetCurrent = status.SetCurrent,
                ActualCurrent = status.ActualCurrent,
                IsXRayOn = status.IsXRayOn,
                WarmUpStatus = status.WarmUpStatus,
                VacuumStatus = status.VacuumStatus,
                StartUpStatus = status.StartUpStatus,
                AutoCenterStatus = status.AutoCenterStatus,
                FilamentAdjustStatus = status.FilamentAdjustStatus,
                IsInterlockActive = status.IsInterlockActive,
                WatchdogStatus = status.WatchdogStatus,
                PowerMode = status.PowerMode,
                TxiStatus = status.TxiStatus
            };
        }

        /// <summary>Publishes a status push as a status event; ignores a null payload.</summary>
        private void HandleStatusPush(StatusResponse status)
        {
            if (status == null)
            {
                return;
            }

            // NOTE(review): GetEvent type argument not visible in the reviewed text - restore from repository.
            _eventAggregator.GetEvent().Publish(ToStatusData(status));
        }

        /// <summary>
        /// Converts the boolean payload of an X-ray state push into
        /// <see cref="RaySourceStatus.Opened"/> / <see cref="RaySourceStatus.Closed"/>
        /// and publishes it. A missing payload is ignored.
        /// </summary>
        private void HandleXRayStatePush(OperationResponse response)
        {
            if (response?.Data == null)
            {
                return;
            }

            var isXRayOn = Convert.ToBoolean(response.Data);
            var rayStatus = isXRayOn ? RaySourceStatus.Opened : RaySourceStatus.Closed;

            // NOTE(review): GetEvent type argument not visible in the reviewed text - restore from repository.
            _eventAggregator.GetEvent().Publish(rayStatus);
        }

        /// <summary>Publishes the error message of an error push.</summary>
        private void HandleErrorPush(RaySourceResponse response)
        {
            // NOTE(review): GetEvent type argument not visible in the reviewed text - restore from repository.
            _eventAggregator.GetEvent().Publish(response.ErrorMessage);
        }

        /// <summary>
        /// Applies a connection-state push: updates the connected/initialized flags,
        /// signals <see cref="_serviceConnectedEvent"/> and publishes the matching
        /// events. Unknown state strings and a missing payload are ignored.
        /// </summary>
        private void HandleConnectionStatePush(OperationResponse response)
        {
            if (response?.Data == null)
            {
                return;
            }

            switch (response.Data.ToString())
            {
                case "RaySourceConnected":
                    _isConnected = true;
                    _logger.Info("收到 RaySourceConnected 推送,射线源已完成全部连接流程,准备就绪");
                    // NOTE(review): GetEvent type argument not visible in the reviewed text - restore from repository.
                    _eventAggregator.GetEvent().Publish(true);
                    _logger.Info("射线源连接成功: {SourceName},状态更新为 Closed | X-ray source connected successfully: {SourceName}, status updated to Closed", SourceName);
                    // Notify listeners of the status change.
                    // NOTE(review): GetEvent type argument not visible in the reviewed text - restore from repository.
                    _eventAggregator.GetEvent().Publish(RaySourceStatus.Closed);
                    break;

                case "VariablesConnected":
                    _isInitialized = true;
                    _logger.Info("收到 VariablesConnected 推送,PVI 变量已创建、激活并绑定");
                    break;

                case "ServiceConnected":
                    _serviceConnectedEvent.Set();
                    _logger.Info("收到 ServiceConnected 推送,PVI Service 和 CPU 已连接");
                    break;

                case "Disconnected":
                    _isConnected = false;
                    _isInitialized = false;
                    _logger.Info("收到 Disconnected 推送,PVI 未连接或已断开");
                    // NOTE(review): GetEvent type argument not visible in the reviewed text - restore from repository.
                    _eventAggregator.GetEvent().Publish(false);
                    break;
            }
        }

        #endregion

        #region Log forwarding

        /// <summary>
        /// Forwards a log push from the Host to the local logger, mapping the
        /// <c>Level</c> field to the corresponding logger method. Unknown levels are
        /// logged as Debug; a null payload is ignored.
        /// </summary>
        private void HandleLogPush(LogResponse logResponse)
        {
            if (logResponse == null)
            {
                return;
            }

            var message = logResponse.Message ?? "";

            // The logger takes params object[]; per the original comment Args is a
            // string[], which is reinterpreted as object[] here.
            var args = logResponse.Args != null
                ? (object[])logResponse.Args
                : Array.Empty<object>();

            switch (logResponse.Level)
            {
                case "Debug":
                    _logger.Debug(message, args);
                    break;
                case "Info":
                    _logger.Info(message, args);
                    break;
                case "Warn":
                    _logger.Warn(message, args);
                    break;
                case "Error":
                    _logger.Error(null, message, args);
                    break;
                case "Fatal":
                    _logger.Fatal(null, message, args);
                    break;
                default:
                    _logger.Debug(message, args);
                    break;
            }
        }

        #endregion

        #region X-ray source operations

        /// <summary>
        /// Connects the pipes, sends the Initialize command built from the
        /// configuration and then waits for the "ServiceConnected" push.
        /// </summary>
        /// <returns>
        /// The result of the Initialize command; an error result when connecting or
        /// sending throws.
        /// </returns>
        public override XRayResult Initialize()
        {
            _logger.Info("初始化 Comet225 射线源(IPC 模式)");

            try
            {
                // Re-arm the wait handle before anything can signal it.
                _serviceConnectedEvent.Reset();

                Connect();

                var command = new InitializeCommand
                {
                    IpAddress = _config.PlcIpAddress,
                    Port = _config.PlcPort,
                    CpuName = _config.CpuName,
                    SourcePort = _config.PortNumber,
                    StationNumber = _config.StationNumber
                };

                // Per the original author the Host answers only after its PVI callback
                // chain has completed.
                var response = SendCommand(command);
                var result = ProcessResponse(response, "Initialize");
                if (!result.Success)
                {
                    return result;
                }

                // Second safety net: wait for the ServiceConnected push as well.
                if (!_serviceConnectedEvent.Wait(ServiceConnectedTimeoutMs))
                {
                    _logger.Warn("Initialize 命令成功但未收到 ServiceConnected 推送,可能推送丢失");
                    // The Host confirmed success, so mark as initialized even though
                    // the push never arrived.
                    _isInitialized = true;
                }

                return result;
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "Initialize 异常:{Message}", ex.Message);
                return XRayResult.Error($"Initialize 异常:{ex.Message}");
            }
        }

        /// <summary>Sends the ConnectVariables command to the Host.</summary>
        public override XRayResult ConnectVariables()
            => ProcessResponse(SendCommand(new ConnectVariablesCommand()), "ConnectVariables");

        /// <summary>Sends the TurnOn command to the Host.</summary>
        public override XRayResult TurnOn()
            => ProcessResponse(SendCommand(new TurnOnCommand()), "TurnOn");

        /// <summary>Sends the TurnOff command to the Host.</summary>
        public override XRayResult TurnOff()
            => ProcessResponse(SendCommand(new TurnOffCommand()), "TurnOff");

        /// <summary>Sends a SetVoltage command carrying <paramref name="voltage"/>.</summary>
        public override XRayResult SetVoltage(float voltage)
            => ProcessResponse(SendCommand(new SetVoltageCommand { Voltage = voltage }), "SetVoltage");

        /// <summary>Sends a SetCurrent command carrying <paramref name="current"/>.</summary>
        public override XRayResult SetCurrent(float current)
            => ProcessResponse(SendCommand(new SetCurrentCommand { Current = current }), "SetCurrent");

        /// <summary>
        /// Focus is not sent to the Host: the call is logged and an Ok result with an
        /// explanatory message is returned.
        /// </summary>
        public override XRayResult SetFocus(float focus)
        {
            _logger.Info("SetFocus 被调用,Comet 225kV 射线源不支持焦点设置");
            return XRayResult.Ok("Comet 225kV 射线源不支持焦点设置");
        }

        /// <summary>
        /// Sends ReadVoltage; on success the result carries the response's
        /// <c>Data</c> payload.
        /// </summary>
        public override XRayResult ReadVoltage()
        {
            var response = SendCommand(new ReadVoltageCommand());
            if (response is OperationResponse opResp && opResp.Success)
            {
                return XRayResult.Ok(opResp.Data);
            }

            return ProcessResponse(response, "ReadVoltage");
        }

        /// <summary>
        /// Sends ReadCurrent; on success the result carries the response's
        /// <c>Data</c> payload.
        /// </summary>
        public override XRayResult ReadCurrent()
        {
            var response = SendCommand(new ReadCurrentCommand());
            if (response is OperationResponse opResp && opResp.Success)
            {
                return XRayResult.Ok(opResp.Data);
            }

            return ProcessResponse(response, "ReadCurrent");
        }

        /// <summary>
        /// Sends ReadSystemStatus; on success the result carries a
        /// <see cref="SystemStatusData"/> built from the response.
        /// </summary>
        public override XRayResult ReadSystemStatus()
        {
            var response = SendCommand(new ReadSystemStatusCommand());
            if (response is StatusResponse statusResp && statusResp.Success)
            {
                return XRayResult.Ok(ToStatusData(statusResp));
            }

            return ProcessResponse(response, "ReadSystemStatus");
        }

        /// <summary>
        /// Sends ReadErrors; on success the result carries the
        /// <see cref="ErrorDataResponse"/> itself.
        /// </summary>
        public override XRayResult CheckErrors()
        {
            var response = SendCommand(new ReadErrorsCommand());
            if (response is ErrorDataResponse errorResp && errorResp.Success)
            {
                return XRayResult.Ok(errorResp);
            }

            return ProcessResponse(response, "CheckErrors");
        }

        /// <summary>Sends the TxiOn command to the Host.</summary>
        public override XRayResult TxiOn()
            => ProcessResponse(SendCommand(new TxiOnCommand()), "TxiOn");

        /// <summary>Sends the TxiOff command to the Host.</summary>
        public override XRayResult TxiOff()
            => ProcessResponse(SendCommand(new TxiOffCommand()), "TxiOff");

        /// <summary>Sends the WarmUp command to the Host.</summary>
        public override XRayResult WarmUp()
            => ProcessResponse(SendCommand(new WarmUpCommand()), "WarmUp");

        /// <summary>Sends the Training command to the Host.</summary>
        public override XRayResult Training()
            => ProcessResponse(SendCommand(new TrainingCommand()), "Training");

        /// <summary>Sends the FilamentCalibration command to the Host.</summary>
        public override XRayResult FilamentCalibration()
            => ProcessResponse(SendCommand(new FilamentCalibrationCommand()), "FilamentCalibration");

        /// <summary>Sends the AutoCenter command to the Host.</summary>
        public override XRayResult AutoCenter()
            => ProcessResponse(SendCommand(new AutoCenterCommand()), "AutoCenter");

        /// <summary>Sends a SetPowerMode command carrying <paramref name="mode"/>.</summary>
        public override XRayResult SetPowerMode(int mode)
            => ProcessResponse(SendCommand(new SetPowerModeCommand { Mode = mode }), "SetPowerMode");

        /// <summary>
        /// Sends the Disconnect command and clears the connected/initialized flags,
        /// whether or not the command succeeded.
        /// </summary>
        public override XRayResult CloseOff()
        {
            _logger.Info("执行 CloseOff 操作(IPC 模式)");

            try
            {
                var response = SendCommand(new DisconnectCommand());
                _isInitialized = false;
                _isConnected = false;
                return ProcessResponse(response, "CloseOff");
            }
            catch (Exception ex)
            {
                _logger.Warn("CloseOff 过程中发生异常:{Message}", ex.Message);
                _isInitialized = false;
                _isConnected = false;
                return XRayResult.Error($"CloseOff 异常:{ex.Message}");
            }
        }

        #endregion

        #region Helpers

        /// <summary>
        /// Converts a Host response into an <see cref="XRayResult"/>.
        /// </summary>
        /// <param name="response">Response from <see cref="SendCommand"/>; may be null.</param>
        /// <param name="operationName">Operation name used in log and error messages.</param>
        /// <returns>
        /// An error result when the response is null or reports failure (both are
        /// logged); otherwise a payload-free Ok result.
        /// </returns>
        private XRayResult ProcessResponse(RaySourceResponse response, string operationName)
        {
            if (response == null)
            {
                var errorMsg = $"{operationName} 操作失败:未收到 Host 响应(管道可能已断开或超时)";
                _logger.Error(null, errorMsg);
                return XRayResult.Error(errorMsg);
            }

            if (!response.Success)
            {
                _logger.Error(null, "{Operation} 操作失败:{ErrorMessage}", operationName, response.ErrorMessage);
                return XRayResult.Error(response.ErrorMessage);
            }

            return XRayResult.Ok();
        }

        #endregion

        #region Resource release

        /// <summary>
        /// Stops the receive loop, releases a sender that is still waiting for a
        /// response and disposes the pipes, the reader and the wait handle.
        /// </summary>
        /// <param name="disposing">True when called from Dispose rather than a finalizer.</param>
        protected override void Dispose(bool disposing)
        {
            if (!_isDisposed && disposing)
            {
                _isRunning = false;
                _isPipeConnected = false;

                // Do not leave a caller blocked in SendCommand until its timeout.
                _pendingResponse?.TrySetResult(null);

                _serviceConnectedEvent?.Dispose();
                _reader?.Dispose();
                _cmdPipe?.Dispose();
                _rspPipe?.Dispose();

                _reader = null;
                _cmdPipe = null;
                _rspPipe = null;
            }

            base.Dispose(disposing);
        }

        #endregion
    }
}