将Feature/XP.Common和Feature/XP.Hardware分支合并至Develop/XP.forHardwareAndCommon,完善XPapp注册和相关硬件类库通用类库功能。

This commit is contained in:
QI Mingxuan
2026-04-16 17:31:13 +08:00
parent 6ec4c3ddaa
commit 2bd6e566c3
581 changed files with 74600 additions and 222 deletions
@@ -0,0 +1,668 @@
using Prism.Events;
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
using Telerik.Windows.Documents.Selection;
using XP.Common.Logging.Interfaces;
using XP.Hardware.RaySource.Abstractions;
using XP.Hardware.RaySource.Abstractions.Enums;
using XP.Hardware.RaySource.Abstractions.Events;
using XP.Hardware.RaySource.Comet.Messages;
using XP.Hardware.RaySource.Comet.Messages.Commands;
using XP.Hardware.RaySource.Comet.Messages.Responses;
using XP.Hardware.RaySource.Config;
using XP.Hardware.RaySource.Services;
namespace XP.Hardware.RaySource.Implementations
{
/// <summary>
/// IPC 客户端
/// 通过 NamedPipe 与 Host 进程通信,实现 IXRaySource 接口
/// 上层代码无需感知进程隔离的存在
/// </summary>
public class CometIpcClient : XRaySourceBase
{
private readonly RaySourceConfig _config;
private readonly IEventAggregator _eventAggregator;
private readonly ILoggerService _logger;
private NamedPipeClientStream _cmdPipe;
private NamedPipeClientStream _rspPipe;
private StreamReader _reader;
private Thread _receiveThread;
private volatile bool _isRunning;
/// <summary>
/// 管道连接状态标志位
/// 用于在 Host 进程崩溃或管道断开后,阻止后续 SendCommand 尝试写入已断开的管道
/// </summary>
private volatile bool _isPipeConnected;
/// <summary>
/// 用于等待命令响应的同步机制
/// </summary>
private TaskCompletionSource<RaySourceResponse> _pendingResponse;
private readonly object _sendLock = new object();
/// <summary>
/// 命令管道名称(客户端写 → Host 读)
/// </summary>
private const string CmdPipeName = "XP.Hardware.RaySource.Comet.Cmd";
/// <summary>
/// 响应管道名称(Host 写 → 客户端读)
/// </summary>
private const string RspPipeName = "XP.Hardware.RaySource.Comet.Rsp";
/// <summary>
/// 命令超时时间(毫秒)
/// 需要大于 Host 端 PVI 连接超时时间(30s),因为 Initialize 命令会在 Host 端阻塞等待 PVI 回调链完成
/// </summary>
private const int CommandTimeoutMs = 35000;
public override string SourceName => "Comet 225kV (IPC)";
/// <summary>
/// 构造函数
/// </summary>
public CometIpcClient(
RaySourceConfig config,
IEventAggregator eventAggregator,
ILoggerService loggerService)
{
_config = config ?? throw new ArgumentNullException(nameof(config));
_eventAggregator = eventAggregator ?? throw new ArgumentNullException(nameof(eventAggregator));
_logger = loggerService?.ForModule<CometIpcClient>() ?? throw new ArgumentNullException(nameof(loggerService));
}
#region
/// <summary>
/// 连接到 Host 进程的 NamedPipe
/// </summary>
public void Connect()
{
try
{
// 命令管道:客户端写 → Host 读
_cmdPipe = new NamedPipeClientStream(".", CmdPipeName, PipeDirection.Out);
_cmdPipe.Connect(_config.ConnectionTimeout);
// 响应管道:Host 写 → 客户端读
_rspPipe = new NamedPipeClientStream(".", RspPipeName, PipeDirection.In);
_rspPipe.Connect(_config.ConnectionTimeout);
_reader = new StreamReader(_rspPipe);
// 启动后台消息接收线程
_isRunning = true;
_receiveThread = new Thread(ReceiveLoop)
{
IsBackground = true,
Name = "CometIpcClient-Receiver"
};
_receiveThread.Start();
_isPipeConnected = true;
_logger.Info("已连接到 Host 进程 NamedPipe(双管道模式)");
}
catch (Exception ex)
{
_logger.Error(ex, "连接 Host 进程 NamedPipe 失败:{Message}", ex.Message);
throw;
}
}
/// <summary>
/// 直接写入一行文本到命令管道
/// </summary>
private void WriteLineToHost(string text)
{
var bytes = System.Text.Encoding.UTF8.GetBytes(text + "\n");
_cmdPipe.Write(bytes, 0, bytes.Length);
}
#endregion
#region
/// <summary>
/// 发送命令并等待响应
/// </summary>
private RaySourceResponse SendCommand(RaySourceCommand command)
{
// 检查管道连接状态,如果已断开则直接返回 null
if (!_isPipeConnected)
{
_logger.Error(null, "管道未连接,无法发送命令:{CommandType}", command.CommandType);
return null;
}
lock (_sendLock)
{
try
{
_pendingResponse = new TaskCompletionSource<RaySourceResponse>();
var json = MessageSerializer.Serialize(command);
_logger.Debug("发送命令:{CommandType}JSON 长度={Length}", command.CommandType, json.Length);
WriteLineToHost(json);
_logger.Debug("命令已写入管道,开始等待响应:{CommandType}", command.CommandType);
// 等待响应,带超时
using (var cts = new CancellationTokenSource(CommandTimeoutMs))
{
cts.Token.Register(() => _pendingResponse?.TrySetCanceled());
try
{
_pendingResponse.Task.Wait();
var result = _pendingResponse.Task.Result;
_logger.Debug("收到命令响应:{CommandType}Success={Success}", command.CommandType, result?.Success);
return result;
}
catch (AggregateException ae) when (ae.InnerException is TaskCanceledException)
{
_logger.Error(null, "命令超时({TimeoutMs}ms):{CommandType}", CommandTimeoutMs, command.CommandType);
return null;
}
}
}
catch (IOException ex)
{
_logger.Error(ex, "管道通信异常(Host 进程可能已崩溃):{Message}", ex.Message);
return null;
}
catch (Exception ex)
{
_logger.Error(ex, "发送命令异常:{Message}", ex.Message);
return null;
}
}
}
#endregion
#region 线
/// <summary>
/// 后台消息接收循环
/// 持续读取管道消息,区分命令响应和主动推送
/// </summary>
private void ReceiveLoop()
{
try
{
_logger.Debug("接收线程已启动");
while (_isRunning)
{
var line = _reader.ReadLine();
if (line == null)
{
// 管道断开,设置标志位阻止后续 SendCommand 写入
_isPipeConnected = false;
_logger.Warn("Host 进程管道已断开");
_pendingResponse?.TrySetResult(null);
break;
}
_logger.Debug("收到管道消息,长度={Length}", line.Length);
var response = MessageSerializer.DeserializeResponse(line);
if (response == null)
{
_logger.Warn("收到无法反序列化的消息,原始内容:{Raw}", line.Length > 200 ? line.Substring(0, 200) + "..." : line);
continue;
}
if (response.IsPush)
{
_logger.Debug("收到推送消息:{PushType}", response.PushType);
// 主动推送消息,路由到对应的 Prism 事件
HandlePushMessage(response);
}
else
{
_logger.Debug("收到命令响应,Success={Success}", response.Success);
// 命令响应,通知等待的 SendCommand 调用
_pendingResponse?.TrySetResult(response);
}
}
}
catch (IOException ex)
{
_isPipeConnected = false;
_logger.Warn("接收线程 IO 异常(管道可能已断开):{Message}", ex.Message);
_pendingResponse?.TrySetResult(null);
}
catch (ObjectDisposedException)
{
// 管道已释放,正常退出
}
catch (Exception ex)
{
_logger.Error(ex, "接收线程异常:{Message}", ex.Message);
_pendingResponse?.TrySetResult(null);
}
}
#endregion
#region
/// <summary>
/// 处理推送消息,根据 PushType 路由到对应的 Prism 事件
/// </summary>
private void HandlePushMessage(RaySourceResponse response)
{
try
{
switch (response.PushType)
{
case "StatusChanged":
HandleStatusPush(response as StatusResponse);
break;
case "XRayStateChanged":
HandleXRayStatePush(response as OperationResponse);
break;
case "ErrorOccurred":
HandleErrorPush(response);
break;
case "ConnectionStateChanged":
HandleConnectionStatePush(response as OperationResponse);
break;
case "Log":
HandleLogPush(response as LogResponse);
break;
default:
_logger.Warn("收到未知推送类型:{PushType}", response.PushType);
break;
}
}
catch (Exception ex)
{
_logger.Error(ex, "处理推送消息异常:{Message}", ex.Message);
}
}
private void HandleStatusPush(StatusResponse status)
{
if (status == null) return;
var statusData = new SystemStatusData
{
SetVoltage = status.SetVoltage,
ActualVoltage = status.ActualVoltage,
SetCurrent = status.SetCurrent,
ActualCurrent = status.ActualCurrent,
IsXRayOn = status.IsXRayOn,
WarmUpStatus = status.WarmUpStatus,
VacuumStatus = status.VacuumStatus,
StartUpStatus = status.StartUpStatus,
AutoCenterStatus = status.AutoCenterStatus,
FilamentAdjustStatus = status.FilamentAdjustStatus,
IsInterlockActive = status.IsInterlockActive,
WatchdogStatus = status.WatchdogStatus,
PowerMode = status.PowerMode,
TxiStatus = status.TxiStatus
};
_eventAggregator.GetEvent<StatusUpdatedEvent>().Publish(statusData);
}
private void HandleXRayStatePush(OperationResponse response)
{
if (response?.Data == null) return;
var isXRayOn = Convert.ToBoolean(response.Data);
var rayStatus = isXRayOn ? RaySourceStatus.Opened : RaySourceStatus.Closed;
_eventAggregator.GetEvent<RaySourceStatusChangedEvent>().Publish(rayStatus);
}
private void HandleErrorPush(RaySourceResponse response)
{
_eventAggregator.GetEvent<ErrorOccurredEvent>().Publish(response.ErrorMessage);
}
private void HandleConnectionStatePush(OperationResponse response)
{
if (response?.Data == null) return;
var stateStr = response.Data.ToString();
if (stateStr == "RaySourceConnected")
{
_isConnected = true;
_logger.Info("收到 RaySourceConnected 推送,射线源已完成全部连接流程,准备就绪");
_eventAggregator.GetEvent<VariablesConnectedEvent>().Publish(true);
_logger.Info("射线源连接成功: {SourceName},状态更新为 Closed | X-ray source connectzed successfully: {SourceName}, status updated to Closed", SourceName);
// 通知状态变更 | Notify status changed
_eventAggregator.GetEvent<RaySourceStatusChangedEvent>().Publish(RaySourceStatus.Closed);
}
else if (stateStr == "VariablesConnected")
{
_isInitialized = true;
_logger.Info("收到 VariablesConnected 推送,PVI 变量已创建、激活并绑定");
}
else if (stateStr == "ServiceConnected")
{
_serviceConnectedEvent.Set();
_logger.Info("收到 ServiceConnected 推送,PVI Service 和 CPU 已连接");
}
else if (stateStr == "Disconnected")
{
_isConnected = false;
_isInitialized = false;
_logger.Info("收到 Disconnected 推送,PVI 未连接或已断开");
_eventAggregator.GetEvent<VariablesConnectedEvent>().Publish(false);
}
}
#endregion
#region
/// <summary>
/// 处理日志推送消息
/// 根据 Level 字段映射到 ILoggerService 对应方法
/// </summary>
private void HandleLogPush(LogResponse logResponse)
{
if (logResponse == null) return;
var message = logResponse.Message ?? "";
// 将 string[] 转换为 object[] 以匹配 ILoggerService 的 params object[] 签名
var args = logResponse.Args != null ? (object[])logResponse.Args : Array.Empty<object>();
switch (logResponse.Level)
{
case "Debug":
_logger.Debug(message, args);
break;
case "Info":
_logger.Info(message, args);
break;
case "Warn":
_logger.Warn(message, args);
break;
case "Error":
_logger.Error(null, message, args);
break;
case "Fatal":
_logger.Fatal(null, message, args);
break;
default:
_logger.Debug(message, args);
break;
}
}
#endregion
#region IXRaySource
/// <summary>
/// 等待 ServiceConnected 推送的信号量
/// Host 端 HandleInitialize 会等待 PVI 回调链完成后才返回响应
/// 但作为双重保险,客户端也等待 ServiceConnected 推送到达
/// </summary>
private readonly ManualResetEventSlim _serviceConnectedEvent = new ManualResetEventSlim(false);
/// <summary>
/// 等待 ServiceConnected 推送的超时时间(毫秒)
/// 应略大于 Host 端的 PVI 连接超时时间,因为还有管道通信延迟
/// </summary>
private const int ServiceConnectedTimeoutMs = 35000;
public override XRayResult Initialize()
{
_logger.Info("初始化 Comet225 射线源(IPC 模式)");
try
{
// 重置等待信号
_serviceConnectedEvent.Reset();
// 连接管道
Connect();
var command = new InitializeCommand
{
IpAddress = _config.PlcIpAddress,
Port = _config.PlcPort,
CpuName = _config.CpuName,
SourcePort = _config.PortNumber,
StationNumber = _config.StationNumber
};
// Host 端 HandleInitialize 会等待 PVI 回调链完成后才返回响应
var response = SendCommand(command);
var result = ProcessResponse(response, "Initialize");
if (!result.Success)
{
return result;
}
// 双重保险:等待 ServiceConnected 推送到达
// Host 返回成功意味着 PVI 回调链已完成,推送应该很快到达
if (!_serviceConnectedEvent.Wait(ServiceConnectedTimeoutMs))
{
_logger.Warn("Initialize 命令成功但未收到 ServiceConnected 推送,可能推送丢失");
// Host 已确认成功,即使推送未到达也设置初始化标志
_isInitialized = true;
}
return result;
}
catch (Exception ex)
{
_logger.Error(ex, "Initialize 异常:{Message}", ex.Message);
return XRayResult.Error($"Initialize 异常:{ex.Message}");
}
}
public override XRayResult ConnectVariables()
{
var response = SendCommand(new ConnectVariablesCommand());
return ProcessResponse(response, "ConnectVariables");
}
public override XRayResult TurnOn()
{
var response = SendCommand(new TurnOnCommand());
return ProcessResponse(response, "TurnOn");
}
public override XRayResult TurnOff()
{
var response = SendCommand(new TurnOffCommand());
return ProcessResponse(response, "TurnOff");
}
public override XRayResult SetVoltage(float voltage)
{
var response = SendCommand(new SetVoltageCommand { Voltage = voltage });
return ProcessResponse(response, "SetVoltage");
}
public override XRayResult SetCurrent(float current)
{
var response = SendCommand(new SetCurrentCommand { Current = current });
return ProcessResponse(response, "SetCurrent");
}
public override XRayResult SetFocus(float focus)
{
_logger.Info("SetFocus 被调用,Comet 225kV 射线源不支持焦点设置");
return XRayResult.Ok("Comet 225kV 射线源不支持焦点设置");
}
public override XRayResult ReadVoltage()
{
var response = SendCommand(new ReadVoltageCommand());
if (response is OperationResponse opResp && opResp.Success)
{
return XRayResult.Ok(opResp.Data);
}
return ProcessResponse(response, "ReadVoltage");
}
public override XRayResult ReadCurrent()
{
var response = SendCommand(new ReadCurrentCommand());
if (response is OperationResponse opResp && opResp.Success)
{
return XRayResult.Ok(opResp.Data);
}
return ProcessResponse(response, "ReadCurrent");
}
public override XRayResult ReadSystemStatus()
{
var response = SendCommand(new ReadSystemStatusCommand());
if (response is StatusResponse statusResp && statusResp.Success)
{
var statusData = new SystemStatusData
{
SetVoltage = statusResp.SetVoltage,
ActualVoltage = statusResp.ActualVoltage,
SetCurrent = statusResp.SetCurrent,
ActualCurrent = statusResp.ActualCurrent,
IsXRayOn = statusResp.IsXRayOn,
WarmUpStatus = statusResp.WarmUpStatus,
VacuumStatus = statusResp.VacuumStatus,
StartUpStatus = statusResp.StartUpStatus,
AutoCenterStatus = statusResp.AutoCenterStatus,
FilamentAdjustStatus = statusResp.FilamentAdjustStatus,
IsInterlockActive = statusResp.IsInterlockActive,
WatchdogStatus = statusResp.WatchdogStatus,
PowerMode = statusResp.PowerMode,
TxiStatus = statusResp.TxiStatus
};
return XRayResult.Ok(statusData);
}
return ProcessResponse(response, "ReadSystemStatus");
}
public override XRayResult CheckErrors()
{
var response = SendCommand(new ReadErrorsCommand());
if (response is ErrorDataResponse errorResp && errorResp.Success)
{
return XRayResult.Ok(errorResp);
}
return ProcessResponse(response, "CheckErrors");
}
public override XRayResult TxiOn()
{
var response = SendCommand(new TxiOnCommand());
return ProcessResponse(response, "TxiOn");
}
public override XRayResult TxiOff()
{
var response = SendCommand(new TxiOffCommand());
return ProcessResponse(response, "TxiOff");
}
public override XRayResult WarmUp()
{
var response = SendCommand(new WarmUpCommand());
return ProcessResponse(response, "WarmUp");
}
public override XRayResult Training()
{
var response = SendCommand(new TrainingCommand());
return ProcessResponse(response, "Training");
}
public override XRayResult FilamentCalibration()
{
var response = SendCommand(new FilamentCalibrationCommand());
return ProcessResponse(response, "FilamentCalibration");
}
public override XRayResult AutoCenter()
{
var response = SendCommand(new AutoCenterCommand());
return ProcessResponse(response, "AutoCenter");
}
public override XRayResult SetPowerMode(int mode)
{
var response = SendCommand(new SetPowerModeCommand { Mode = mode });
return ProcessResponse(response, "SetPowerMode");
}
public override XRayResult CloseOff()
{
_logger.Info("执行 CloseOff 操作(IPC 模式)");
try
{
var response = SendCommand(new DisconnectCommand());
_isInitialized = false;
_isConnected = false;
return ProcessResponse(response, "CloseOff");
}
catch (Exception ex)
{
_logger.Warn("CloseOff 过程中发生异常:{Message}", ex.Message);
_isInitialized = false;
_isConnected = false;
return XRayResult.Error($"CloseOff 异常:{ex.Message}");
}
}
#endregion
#region
/// <summary>
/// 处理响应,将 Host 返回的响应转换为 XRayResult
/// </summary>
private XRayResult ProcessResponse(RaySourceResponse response, string operationName)
{
if (response == null)
{
var errorMsg = $"{operationName} 操作失败:未收到 Host 响应(管道可能已断开或超时)";
_logger.Error(null, errorMsg);
return XRayResult.Error(errorMsg);
}
if (!response.Success)
{
_logger.Error(null, "{Operation} 操作失败:{ErrorMessage}", operationName, response.ErrorMessage);
return XRayResult.Error(response.ErrorMessage);
}
return XRayResult.Ok();
}
#endregion
#region
protected override void Dispose(bool disposing)
{
if (!_isDisposed && disposing)
{
_isRunning = false;
_isPipeConnected = false;
_serviceConnectedEvent?.Dispose();
_reader?.Dispose();
_cmdPipe?.Dispose();
_rspPipe?.Dispose();
_reader = null;
_cmdPipe = null;
_rspPipe = null;
}
base.Dispose(disposing);
}
#endregion
}
}