// NOTE: The text captured above the code was web-viewer residue (file-browser header,
// "669 lines / 25 KiB / C#", and a warning that the file contains ambiguous Unicode
// characters). It is not part of the program source.
using Prism.Events;
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
using Telerik.Windows.Documents.Selection;
using XP.Common.Logging.Interfaces;
using XP.Hardware.RaySource.Abstractions;
using XP.Hardware.RaySource.Abstractions.Enums;
using XP.Hardware.RaySource.Abstractions.Events;
using XP.Hardware.RaySource.Comet.Messages;
using XP.Hardware.RaySource.Comet.Messages.Commands;
using XP.Hardware.RaySource.Comet.Messages.Responses;
using XP.Hardware.RaySource.Config;
using XP.Hardware.RaySource.Services;
namespace XP.Hardware.RaySource.Implementations
{
/// <summary>
/// IPC 客户端
/// 通过 NamedPipe 与 Host 进程通信,实现 IXRaySource 接口
/// 上层代码无需感知进程隔离的存在
/// </summary>
public class CometIpcClient : XRaySourceBase
{
// Collaborators supplied through the constructor.
private readonly RaySourceConfig _config;
private readonly IEventAggregator _eventAggregator;
private readonly ILoggerService _logger;
// Command pipe, opened in Connect with PipeDirection.Out (client writes).
private NamedPipeClientStream _cmdPipe;
// Response pipe, opened in Connect with PipeDirection.In (client reads).
private NamedPipeClientStream _rspPipe;
// Line reader wrapped around _rspPipe; consumed by ReceiveLoop.
private StreamReader _reader;
// Background thread that runs ReceiveLoop.
private Thread _receiveThread;
// Loop condition of ReceiveLoop; set to true in Connect and to false in Dispose.
private volatile bool _isRunning;
/// <summary>
/// Pipe connection state flag.
/// After the Host process crashes or the pipe breaks, this flag stops later
/// SendCommand calls from attempting to write to the disconnected pipe.
/// </summary>
private volatile bool _isPipeConnected;
/// <summary>
/// Synchronization object used to wait for a command response:
/// SendCommand creates it and blocks on its task, ReceiveLoop completes it.
/// </summary>
private TaskCompletionSource<RaySourceResponse> _pendingResponse;
// Held by SendCommand for the whole write-and-wait sequence, so only one command is in flight at a time.
private readonly object _sendLock = new object();
/// <summary>
/// Command pipe name (client writes → Host reads).
/// </summary>
private const string CmdPipeName = "XP.Hardware.RaySource.Comet.Cmd";
/// <summary>
/// Response pipe name (Host writes → client reads).
/// </summary>
private const string RspPipeName = "XP.Hardware.RaySource.Comet.Rsp";
/// <summary>
/// Command timeout in milliseconds.
/// Must be larger than the Host-side PVI connection timeout (30 s), because the Initialize
/// command blocks on the Host until the PVI callback chain has completed.
/// </summary>
private const int CommandTimeoutMs = 35000;
/// <summary>
/// Display name of this ray source implementation.
/// </summary>
public override string SourceName => "Comet 225kV (IPC)";
/// <summary>
/// Creates the IPC client and stores its collaborators. No pipe is opened here;
/// the connection is established later by <see cref="Connect"/>.
/// </summary>
/// <param name="config">Ray source configuration.</param>
/// <param name="eventAggregator">Prism event aggregator used to publish push notifications.</param>
/// <param name="loggerService">Logger service; a logger scoped to this class is obtained via <c>ForModule</c>.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when any argument is null, or when <c>ForModule</c> yields null.
/// </exception>
public CometIpcClient(
    RaySourceConfig config,
    IEventAggregator eventAggregator,
    ILoggerService loggerService)
{
    if (config is null)
    {
        throw new ArgumentNullException(nameof(config));
    }
    _config = config;

    if (eventAggregator is null)
    {
        throw new ArgumentNullException(nameof(eventAggregator));
    }
    _eventAggregator = eventAggregator;

    var moduleLogger = loggerService?.ForModule<CometIpcClient>();
    if (moduleLogger is null)
    {
        throw new ArgumentNullException(nameof(loggerService));
    }
    _logger = moduleLogger;
}
#region
/// <summary>
/// Connects to the Host process over two named pipes (one for commands, one for
/// responses) and starts the background receiver thread.
/// </summary>
/// <remarks>
/// The pipes are built in locals and only published to the fields once both are
/// connected. If any step fails, everything created by this call is disposed, so a
/// failed attempt neither leaks a half-open pipe nor overwrites the existing fields.
/// NOTE(review): a second successful call still replaces the fields without closing
/// the pipes of an earlier connection — confirm whether reconnecting is a supported scenario.
/// </remarks>
/// <exception cref="Exception">Any connection failure is logged and rethrown unchanged.</exception>
public void Connect()
{
    NamedPipeClientStream cmdPipe = null;
    NamedPipeClientStream rspPipe = null;
    StreamReader reader = null;
    var published = false;
    try
    {
        // Command pipe: client writes → Host reads.
        cmdPipe = new NamedPipeClientStream(".", CmdPipeName, PipeDirection.Out);
        cmdPipe.Connect(_config.ConnectionTimeout);

        // Response pipe: Host writes → client reads.
        rspPipe = new NamedPipeClientStream(".", RspPipeName, PipeDirection.In);
        rspPipe.Connect(_config.ConnectionTimeout);
        reader = new StreamReader(rspPipe);

        _cmdPipe = cmdPipe;
        _rspPipe = rspPipe;
        _reader = reader;
        published = true;

        // Both flags are raised BEFORE the receiver starts: if the Host drops the pipe
        // immediately, the receiver's "_isPipeConnected = false" must not be overwritten
        // by a later "= true" from this method.
        _isRunning = true;
        _isPipeConnected = true;

        // Start the background message receiver.
        _receiveThread = new Thread(ReceiveLoop)
        {
            IsBackground = true,
            Name = "CometIpcClient-Receiver"
        };
        _receiveThread.Start();
        _logger.Info("已连接到 Host 进程 NamedPipe(双管道模式)");
    }
    catch (Exception ex)
    {
        if (published)
        {
            // The fields already point at the objects disposed below; reset them.
            _isPipeConnected = false;
            _isRunning = false;
            _reader = null;
            _cmdPipe = null;
            _rspPipe = null;
        }

        // Release whatever this attempt managed to create.
        reader?.Dispose();
        rspPipe?.Dispose();
        cmdPipe?.Dispose();

        _logger.Error(ex, "连接 Host 进程 NamedPipe 失败:{Message}", ex.Message);
        throw;
    }
}
/// <summary>
/// Writes one line of text to the command pipe: the text followed by a single
/// '\n', encoded as UTF-8 and written in one call.
/// </summary>
/// <param name="text">The line content, without the trailing newline.</param>
private void WriteLineToHost(string text)
{
    byte[] payload = System.Text.Encoding.UTF8.GetBytes(string.Concat(text, "\n"));
    _cmdPipe.Write(payload, 0, payload.Length);
}
#endregion
#region
/// <summary>
/// Serializes a command, writes it to the command pipe and blocks until the receiver
/// thread delivers the matching response or <see cref="CommandTimeoutMs"/> elapses.
/// </summary>
/// <param name="command">The command to send.</param>
/// <returns>
/// The Host response, or null when the pipe is not connected, the write fails,
/// the pipe drops while waiting, or the wait times out.
/// </returns>
/// <remarks>
/// Only one command is in flight at a time (guarded by <c>_sendLock</c>).
/// NOTE(review): responses carry no correlation id in this file, so a response that
/// arrives after its command timed out would be handed to the next command — confirm
/// against the Host protocol.
/// </remarks>
private RaySourceResponse SendCommand(RaySourceCommand command)
{
    // Fast path: do not even queue for the lock when the pipe is known to be down.
    if (!_isPipeConnected)
    {
        _logger.Error(null, "管道未连接,无法发送命令:{CommandType}", command.CommandType);
        return null;
    }
    lock (_sendLock)
    {
        // Re-check: the pipe may have dropped while this call waited for the lock
        // behind a long-running command.
        if (!_isPipeConnected)
        {
            _logger.Error(null, "管道未连接,无法发送命令:{CommandType}", command.CommandType);
            return null;
        }
        try
        {
            // Keep a local reference so the timeout handling below always acts on THIS
            // command's completion source, never on a later one stored in the field.
            var pending = new TaskCompletionSource<RaySourceResponse>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _pendingResponse = pending;

            var json = MessageSerializer.Serialize(command);
            _logger.Debug("发送命令:{CommandType}JSON 长度={Length}", command.CommandType, json.Length);
            WriteLineToHost(json);
            _logger.Debug("命令已写入管道,开始等待响应:{CommandType}", command.CommandType);

            // Wait for the response with a timeout. If the wait expires, try to cancel;
            // when cancelling fails, the response arrived in that very instant and is used.
            if (!pending.Task.Wait(CommandTimeoutMs) && pending.TrySetCanceled())
            {
                _logger.Error(null, "命令超时({TimeoutMs}ms):{CommandType}", CommandTimeoutMs, command.CommandType);
                return null;
            }

            var result = pending.Task.Result;
            _logger.Debug("收到命令响应:{CommandType}Success={Success}", command.CommandType, result?.Success);
            return result;
        }
        catch (IOException ex)
        {
            // A failed pipe write means the connection is gone; stop later commands
            // from writing to the broken pipe.
            _isPipeConnected = false;
            _logger.Error(ex, "管道通信异常(Host 进程可能已崩溃):{Message}", ex.Message);
            return null;
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "发送命令异常:{Message}", ex.Message);
            return null;
        }
    }
}
#endregion
#region Receive thread
/// <summary>
/// Background receive loop. Reads the response pipe line by line and separates
/// push notifications (routed to <c>HandlePushMessage</c>) from command responses
/// (handed to the caller blocked in <c>SendCommand</c>).
/// </summary>
/// <remarks>
/// Whenever the loop ends because of a failure, the pipe is marked disconnected and a
/// pending command is released with a null response, so callers fail fast instead of
/// waiting for the full command timeout on a receiver that no longer runs.
/// </remarks>
private void ReceiveLoop()
{
    // Capture the reader once: Dispose nulls the field, and re-reading it on every
    // iteration could turn an orderly shutdown into a NullReferenceException.
    var reader = _reader;
    try
    {
        _logger.Debug("接收线程已启动");
        while (_isRunning)
        {
            var line = reader.ReadLine();
            if (line == null)
            {
                // End of stream: the pipe was closed. Block further SendCommand writes.
                _isPipeConnected = false;
                _logger.Warn("Host 进程管道已断开");
                _pendingResponse?.TrySetResult(null);
                break;
            }
            _logger.Debug("收到管道消息,长度={Length}", line.Length);
            var response = MessageSerializer.DeserializeResponse(line);
            if (response == null)
            {
                // Log at most the first 200 characters of an unreadable message.
                _logger.Warn("收到无法反序列化的消息,原始内容:{Raw}", line.Length > 200 ? line.Substring(0, 200) + "..." : line);
                continue;
            }
            if (response.IsPush)
            {
                _logger.Debug("收到推送消息:{PushType}", response.PushType);
                // Unsolicited push: route it to the matching Prism event.
                HandlePushMessage(response);
            }
            else
            {
                _logger.Debug("收到命令响应,Success={Success}", response.Success);
                // Command response: wake the waiting SendCommand call.
                _pendingResponse?.TrySetResult(response);
            }
        }
    }
    catch (IOException ex)
    {
        _isPipeConnected = false;
        _logger.Warn("接收线程 IO 异常(管道可能已断开):{Message}", ex.Message);
        _pendingResponse?.TrySetResult(null);
    }
    catch (ObjectDisposedException)
    {
        // The pipe was disposed: a normal exit, but a caller may still be waiting.
        _isPipeConnected = false;
        _pendingResponse?.TrySetResult(null);
    }
    catch (Exception ex)
    {
        // The receiver is gone after this point, so no response can arrive any more.
        _isPipeConnected = false;
        _logger.Error(ex, "接收线程异常:{Message}", ex.Message);
        _pendingResponse?.TrySetResult(null);
    }
}
#endregion
#region
/// <summary>
/// Dispatches a push message to the handler that matches its <c>PushType</c>.
/// Unknown types are logged as a warning. Exceptions thrown by a handler are logged
/// and swallowed here.
/// </summary>
/// <param name="response">The push message received from the Host.</param>
private void HandlePushMessage(RaySourceResponse response)
{
    try
    {
        var pushType = response.PushType;
        switch (pushType)
        {
            case "StatusChanged":
                HandleStatusPush(response as StatusResponse);
                return;

            case "XRayStateChanged":
                HandleXRayStatePush(response as OperationResponse);
                return;

            case "ErrorOccurred":
                HandleErrorPush(response);
                return;

            case "ConnectionStateChanged":
                HandleConnectionStatePush(response as OperationResponse);
                return;

            case "Log":
                HandleLogPush(response as LogResponse);
                return;
        }

        // None of the known push types matched.
        _logger.Warn("收到未知推送类型:{PushType}", pushType);
    }
    catch (Exception ex)
    {
        _logger.Error(ex, "处理推送消息异常:{Message}", ex.Message);
    }
}
/// <summary>
/// Copies a status push into a <c>SystemStatusData</c> instance and publishes it
/// through <c>StatusUpdatedEvent</c>. A null argument is ignored.
/// </summary>
/// <param name="status">The status push, or null when the message was of another response type.</param>
private void HandleStatusPush(StatusResponse status)
{
    if (status != null)
    {
        var snapshot = new SystemStatusData
        {
            SetVoltage = status.SetVoltage,
            ActualVoltage = status.ActualVoltage,
            SetCurrent = status.SetCurrent,
            ActualCurrent = status.ActualCurrent,
            IsXRayOn = status.IsXRayOn,
            WarmUpStatus = status.WarmUpStatus,
            VacuumStatus = status.VacuumStatus,
            StartUpStatus = status.StartUpStatus,
            AutoCenterStatus = status.AutoCenterStatus,
            FilamentAdjustStatus = status.FilamentAdjustStatus,
            IsInterlockActive = status.IsInterlockActive,
            WatchdogStatus = status.WatchdogStatus,
            PowerMode = status.PowerMode,
            TxiStatus = status.TxiStatus
        };
        var statusEvent = _eventAggregator.GetEvent<StatusUpdatedEvent>();
        statusEvent.Publish(snapshot);
    }
}
/// <summary>
/// Converts the pushed payload to a boolean and publishes
/// <c>RaySourceStatus.Opened</c> (true) or <c>RaySourceStatus.Closed</c> (false)
/// through <c>RaySourceStatusChangedEvent</c>. A null response or null payload is ignored.
/// </summary>
/// <param name="response">The push message whose <c>Data</c> carries the X-ray on/off state.</param>
private void HandleXRayStatePush(OperationResponse response)
{
    if (response?.Data == null)
    {
        return;
    }

    RaySourceStatus newStatus;
    if (Convert.ToBoolean(response.Data))
    {
        newStatus = RaySourceStatus.Opened;
    }
    else
    {
        newStatus = RaySourceStatus.Closed;
    }

    _eventAggregator.GetEvent<RaySourceStatusChangedEvent>().Publish(newStatus);
}
/// <summary>
/// Publishes the error message of a push through <c>ErrorOccurredEvent</c>.
/// </summary>
/// <param name="response">The error push received from the Host.</param>
private void HandleErrorPush(RaySourceResponse response)
{
    var errorEvent = _eventAggregator.GetEvent<ErrorOccurredEvent>();
    errorEvent.Publish(response.ErrorMessage);
}
/// <summary>
/// Applies a connection-state push to the local flags and events. The state is the
/// string form of <c>response.Data</c>:
/// <list type="bullet">
/// <item><description><c>RaySourceConnected</c>: sets <c>_isConnected</c>, publishes <c>VariablesConnectedEvent(true)</c> and <c>RaySourceStatus.Closed</c>.</description></item>
/// <item><description><c>VariablesConnected</c>: sets <c>_isInitialized</c>.</description></item>
/// <item><description><c>ServiceConnected</c>: signals <c>_serviceConnectedEvent</c>, which <c>Initialize</c> waits on.</description></item>
/// <item><description><c>Disconnected</c>: clears both flags and publishes <c>VariablesConnectedEvent(false)</c>.</description></item>
/// </list>
/// Any other value, a null response or a null payload is ignored.
/// </summary>
/// <param name="response">The push message whose <c>Data</c> names the new connection state.</param>
private void HandleConnectionStatePush(OperationResponse response)
{
    if (response?.Data == null) return;

    switch (response.Data.ToString())
    {
        case "RaySourceConnected":
            _isConnected = true;
            _logger.Info("收到 RaySourceConnected 推送,射线源已完成全部连接流程,准备就绪");
            _eventAggregator.GetEvent<VariablesConnectedEvent>().Publish(true);
            // The template contains {SourceName} twice (Chinese and English halves), so
            // the value is passed once per placeholder. The English half previously read
            // "connectzed".
            _logger.Info("射线源连接成功: {SourceName},状态更新为 Closed | X-ray source connected successfully: {SourceName}, status updated to Closed", SourceName, SourceName);
            // Notify status changed.
            _eventAggregator.GetEvent<RaySourceStatusChangedEvent>().Publish(RaySourceStatus.Closed);
            break;

        case "VariablesConnected":
            _isInitialized = true;
            _logger.Info("收到 VariablesConnected 推送,PVI 变量已创建、激活并绑定");
            break;

        case "ServiceConnected":
            _serviceConnectedEvent.Set();
            _logger.Info("收到 ServiceConnected 推送,PVI Service 和 CPU 已连接");
            break;

        case "Disconnected":
            _isConnected = false;
            _isInitialized = false;
            _logger.Info("收到 Disconnected 推送,PVI 未连接或已断开");
            _eventAggregator.GetEvent<VariablesConnectedEvent>().Publish(false);
            break;
    }
}
#endregion
#region
/// <summary>
/// Replays a log push from the Host through the local logger, choosing the logger
/// method from the <c>Level</c> field. Unrecognized levels are written as Debug.
/// A null argument is ignored.
/// </summary>
/// <param name="logResponse">The log push, or null when the message was of another response type.</param>
private void HandleLogPush(LogResponse logResponse)
{
    if (logResponse == null)
    {
        return;
    }

    string template = logResponse.Message ?? "";

    // The logger methods take params object[], so the pushed argument array is
    // passed on as object[] (empty when the push carried none).
    object[] templateArgs;
    if (logResponse.Args != null)
    {
        templateArgs = (object[])logResponse.Args;
    }
    else
    {
        templateArgs = Array.Empty<object>();
    }

    switch (logResponse.Level)
    {
        case "Info":
            _logger.Info(template, templateArgs);
            break;
        case "Warn":
            _logger.Warn(template, templateArgs);
            break;
        case "Error":
            _logger.Error(null, template, templateArgs);
            break;
        case "Fatal":
            _logger.Fatal(null, template, templateArgs);
            break;
        case "Debug":
        default:
            _logger.Debug(template, templateArgs);
            break;
    }
}
#endregion
#region IXRaySource
/// <summary>
/// Signal used to wait for the ServiceConnected push.
/// The Host-side HandleInitialize only returns its response after the PVI callback
/// chain has completed; as a second safeguard the client also waits for the
/// ServiceConnected push to arrive. Reset and awaited in Initialize, set in
/// HandleConnectionStatePush.
/// </summary>
private readonly ManualResetEventSlim _serviceConnectedEvent = new ManualResetEventSlim(false);
/// <summary>
/// Timeout in milliseconds for waiting on the ServiceConnected push.
/// Should be slightly larger than the Host-side PVI connection timeout, because
/// pipe communication adds latency on top.
/// </summary>
private const int ServiceConnectedTimeoutMs = 35000;
/// <summary>
/// Initializes the ray source in IPC mode: connects the pipes, sends an
/// <c>InitializeCommand</c> built from the configuration, and, when the Host reports
/// success, waits for the ServiceConnected push.
/// </summary>
/// <returns>
/// The result derived from the Host response. If the Host reported success but the
/// push does not arrive within <see cref="ServiceConnectedTimeoutMs"/>, a warning is
/// logged, <c>_isInitialized</c> is set anyway, and the successful result is returned.
/// Any exception is logged and converted into an error result.
/// </returns>
public override XRayResult Initialize()
{
    _logger.Info("初始化 Comet225 射线源(IPC 模式)");
    try
    {
        // Clear the signal left over from any earlier run, then open the pipes.
        _serviceConnectedEvent.Reset();
        Connect();

        // The Host only answers once its PVI callback chain has completed.
        var initResponse = SendCommand(new InitializeCommand
        {
            IpAddress = _config.PlcIpAddress,
            Port = _config.PlcPort,
            CpuName = _config.CpuName,
            SourcePort = _config.PortNumber,
            StationNumber = _config.StationNumber
        });
        var outcome = ProcessResponse(initResponse, "Initialize");

        // Second safeguard: on success the ServiceConnected push should follow shortly.
        if (outcome.Success && !_serviceConnectedEvent.Wait(ServiceConnectedTimeoutMs))
        {
            _logger.Warn("Initialize 命令成功但未收到 ServiceConnected 推送,可能推送丢失");
            // The Host confirmed success, so the flag is set even without the push.
            _isInitialized = true;
        }

        return outcome;
    }
    catch (Exception ex)
    {
        _logger.Error(ex, "Initialize 异常:{Message}", ex.Message);
        return XRayResult.Error($"Initialize 异常:{ex.Message}");
    }
}
/// <summary>
/// Sends a <c>ConnectVariablesCommand</c> to the Host and converts its response into an <c>XRayResult</c>.
/// </summary>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult ConnectVariables() =>
    ProcessResponse(SendCommand(new ConnectVariablesCommand()), "ConnectVariables");
/// <summary>
/// Sends a <c>TurnOnCommand</c> to the Host and converts its response into an <c>XRayResult</c>.
/// </summary>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult TurnOn() =>
    ProcessResponse(SendCommand(new TurnOnCommand()), "TurnOn");
/// <summary>
/// Sends a <c>TurnOffCommand</c> to the Host and converts its response into an <c>XRayResult</c>.
/// </summary>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult TurnOff() =>
    ProcessResponse(SendCommand(new TurnOffCommand()), "TurnOff");
/// <summary>
/// Sends a <c>SetVoltageCommand</c> carrying the requested value to the Host.
/// </summary>
/// <param name="voltage">The voltage value placed in the command.</param>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult SetVoltage(float voltage)
{
    var request = new SetVoltageCommand { Voltage = voltage };
    return ProcessResponse(SendCommand(request), "SetVoltage");
}
/// <summary>
/// Sends a <c>SetCurrentCommand</c> carrying the requested value to the Host.
/// </summary>
/// <param name="current">The current value placed in the command.</param>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult SetCurrent(float current)
{
    var request = new SetCurrentCommand { Current = current };
    return ProcessResponse(SendCommand(request), "SetCurrent");
}
/// <summary>
/// Focus adjustment is not supported by this source. The call is logged and a
/// successful result carrying an explanatory message is returned; nothing is sent
/// to the Host.
/// </summary>
/// <param name="focus">Ignored.</param>
/// <returns>A successful result whose payload is the "not supported" message.</returns>
public override XRayResult SetFocus(float focus)
{
    const string notSupported = "Comet 225kV 射线源不支持焦点设置";
    _logger.Info("SetFocus 被调用,Comet 225kV 射线源不支持焦点设置");
    return XRayResult.Ok(notSupported);
}
/// <summary>
/// Asks the Host for the voltage reading.
/// </summary>
/// <returns>
/// A successful result carrying the response's <c>Data</c> when the Host answers with a
/// successful <c>OperationResponse</c>; otherwise the result of the generic response handling.
/// </returns>
public override XRayResult ReadVoltage()
{
    var response = SendCommand(new ReadVoltageCommand());
    return response is OperationResponse reading && reading.Success
        ? XRayResult.Ok(reading.Data)
        : ProcessResponse(response, "ReadVoltage");
}
/// <summary>
/// Asks the Host for the current reading.
/// </summary>
/// <returns>
/// A successful result carrying the response's <c>Data</c> when the Host answers with a
/// successful <c>OperationResponse</c>; otherwise the result of the generic response handling.
/// </returns>
public override XRayResult ReadCurrent()
{
    var response = SendCommand(new ReadCurrentCommand());
    return response is OperationResponse reading && reading.Success
        ? XRayResult.Ok(reading.Data)
        : ProcessResponse(response, "ReadCurrent");
}
/// <summary>
/// Asks the Host for the full system status.
/// </summary>
/// <returns>
/// A successful result carrying a <c>SystemStatusData</c> copy of the response when the
/// Host answers with a successful <c>StatusResponse</c>; otherwise the result of the
/// generic response handling.
/// </returns>
public override XRayResult ReadSystemStatus()
{
    var response = SendCommand(new ReadSystemStatusCommand());

    // Anything other than a successful StatusResponse goes through the generic path.
    if (!(response is StatusResponse hostStatus) || !hostStatus.Success)
    {
        return ProcessResponse(response, "ReadSystemStatus");
    }

    var snapshot = new SystemStatusData
    {
        SetVoltage = hostStatus.SetVoltage,
        ActualVoltage = hostStatus.ActualVoltage,
        SetCurrent = hostStatus.SetCurrent,
        ActualCurrent = hostStatus.ActualCurrent,
        IsXRayOn = hostStatus.IsXRayOn,
        WarmUpStatus = hostStatus.WarmUpStatus,
        VacuumStatus = hostStatus.VacuumStatus,
        StartUpStatus = hostStatus.StartUpStatus,
        AutoCenterStatus = hostStatus.AutoCenterStatus,
        FilamentAdjustStatus = hostStatus.FilamentAdjustStatus,
        IsInterlockActive = hostStatus.IsInterlockActive,
        WatchdogStatus = hostStatus.WatchdogStatus,
        PowerMode = hostStatus.PowerMode,
        TxiStatus = hostStatus.TxiStatus
    };
    return XRayResult.Ok(snapshot);
}
/// <summary>
/// Asks the Host for its error data by sending a <c>ReadErrorsCommand</c>.
/// </summary>
/// <returns>
/// A successful result carrying the <c>ErrorDataResponse</c> itself when the Host answers
/// with a successful one; otherwise the result of the generic response handling.
/// </returns>
public override XRayResult CheckErrors()
{
    var response = SendCommand(new ReadErrorsCommand());
    return response is ErrorDataResponse errorData && errorData.Success
        ? XRayResult.Ok(errorData)
        : ProcessResponse(response, "CheckErrors");
}
/// <summary>
/// Sends a <c>TxiOnCommand</c> to the Host and converts its response into an <c>XRayResult</c>.
/// </summary>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult TxiOn() =>
    ProcessResponse(SendCommand(new TxiOnCommand()), "TxiOn");
/// <summary>
/// Sends a <c>TxiOffCommand</c> to the Host and converts its response into an <c>XRayResult</c>.
/// </summary>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult TxiOff() =>
    ProcessResponse(SendCommand(new TxiOffCommand()), "TxiOff");
/// <summary>
/// Sends a <c>WarmUpCommand</c> to the Host and converts its response into an <c>XRayResult</c>.
/// </summary>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult WarmUp() =>
    ProcessResponse(SendCommand(new WarmUpCommand()), "WarmUp");
/// <summary>
/// Sends a <c>TrainingCommand</c> to the Host and converts its response into an <c>XRayResult</c>.
/// </summary>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult Training() =>
    ProcessResponse(SendCommand(new TrainingCommand()), "Training");
/// <summary>
/// Sends a <c>FilamentCalibrationCommand</c> to the Host and converts its response into an <c>XRayResult</c>.
/// </summary>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult FilamentCalibration() =>
    ProcessResponse(SendCommand(new FilamentCalibrationCommand()), "FilamentCalibration");
/// <summary>
/// Sends an <c>AutoCenterCommand</c> to the Host and converts its response into an <c>XRayResult</c>.
/// </summary>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult AutoCenter() =>
    ProcessResponse(SendCommand(new AutoCenterCommand()), "AutoCenter");
/// <summary>
/// Sends a <c>SetPowerModeCommand</c> carrying the requested mode to the Host.
/// </summary>
/// <param name="mode">The mode value placed in the command.</param>
/// <returns>Success when the Host reports success; otherwise an error result.</returns>
public override XRayResult SetPowerMode(int mode)
{
    var request = new SetPowerModeCommand { Mode = mode };
    return ProcessResponse(SendCommand(request), "SetPowerMode");
}
/// <summary>
/// Sends a <c>DisconnectCommand</c> to the Host and clears the local
/// initialized/connected flags, whether or not the command succeeds.
/// </summary>
/// <returns>
/// The result derived from the Host response, or an error result when an exception
/// occurs during the operation.
/// </returns>
public override XRayResult CloseOff()
{
    // Both exit paths leave the client marked as neither initialized nor connected.
    void ClearConnectionFlags()
    {
        _isInitialized = false;
        _isConnected = false;
    }

    _logger.Info("执行 CloseOff 操作(IPC 模式)");
    try
    {
        var disconnectResponse = SendCommand(new DisconnectCommand());
        ClearConnectionFlags();
        return ProcessResponse(disconnectResponse, "CloseOff");
    }
    catch (Exception ex)
    {
        _logger.Warn("CloseOff 过程中发生异常:{Message}", ex.Message);
        ClearConnectionFlags();
        return XRayResult.Error($"CloseOff 异常:{ex.Message}");
    }
}
#endregion
#region
/// <summary>
/// Converts a Host response into an <c>XRayResult</c>.
/// </summary>
/// <param name="response">The response returned by <c>SendCommand</c>; null when none was received.</param>
/// <param name="operationName">Operation name used in the log and error messages.</param>
/// <returns>
/// An empty success result when the response reports success; an error result built
/// from a "no response" message when it is null; otherwise an error result carrying
/// the response's error message. Both failure cases are logged.
/// </returns>
private XRayResult ProcessResponse(RaySourceResponse response, string operationName)
{
    if (response != null && response.Success)
    {
        return XRayResult.Ok();
    }

    if (response == null)
    {
        // SendCommand returns null on disconnect, write failure or timeout.
        var noResponseMessage = $"{operationName} 操作失败:未收到 Host 响应(管道可能已断开或超时)";
        _logger.Error(null, noResponseMessage);
        return XRayResult.Error(noResponseMessage);
    }

    _logger.Error(null, "{Operation} 操作失败:{ErrorMessage}", operationName, response.ErrorMessage);
    return XRayResult.Error(response.ErrorMessage);
}
#endregion
#region
/// <summary>
/// Stops the receiver, releases a command that may still be waiting for a response,
/// and disposes the pipes and the wait handle. The base implementation always runs,
/// even if releasing one of the members throws.
/// </summary>
/// <param name="disposing">True when called from <c>Dispose()</c>; managed members are only released in that case.</param>
protected override void Dispose(bool disposing)
{
    try
    {
        if (!_isDisposed && disposing)
        {
            _isRunning = false;
            _isPipeConnected = false;

            // A caller blocked in SendCommand would otherwise sit out the full command
            // timeout; a null response makes it return at once.
            _pendingResponse?.TrySetResult(null);

            _reader?.Dispose();
            _cmdPipe?.Dispose();
            _rspPipe?.Dispose();
            _reader = null;
            _cmdPipe = null;
            _rspPipe = null;

            // Disposed after the pipes, to shorten the window in which the receiver
            // thread could still signal an already-disposed event.
            _serviceConnectedEvent?.Dispose();
        }
    }
    finally
    {
        base.Dispose(disposing);
    }
}
#endregion
}
}