Files
XplorePlane/XplorePlane/Services/MainViewport/DetectorFramePipelineService.cs
T
2026-05-06 23:18:28 +08:00

179 lines
6.4 KiB
C#

using Prism.Events;
using System;
using System.Collections.Concurrent;
using System.Configuration;
using System.Threading;
using System.Threading.Tasks;
using XP.Common.Converters;
using XP.Common.Logging.Interfaces;
using XP.Hardware.Detector.Abstractions;
using XP.Hardware.Detector.Abstractions.Events;
namespace XplorePlane.Services.MainViewport
{
public sealed class DetectorFramePipelineService : IDetectorFramePipelineService
{
private readonly ConcurrentQueue<DetectorFrame> _acquireQueue = new();
private readonly ConcurrentQueue<DetectorFrame> _processQueue = new();
private readonly SemaphoreSlim _processSignal = new(0);
private readonly CancellationTokenSource _shutdown = new();
private readonly IMainViewportService _mainViewportService;
private readonly ILoggerService _logger;
private readonly Task _processConsumerTask;
private int _acquireQueueCount;
private int _processQueueCount;
private long _receivedFrameCount;
private bool _disposed;
public DetectorFramePipelineService(
IEventAggregator eventAggregator,
IMainViewportService mainViewportService,
ILoggerService logger)
: this(eventAggregator, mainViewportService, logger,
ReadInt("DetectorPipeline:AcquireQueueCapacity", 16, 1),
ReadInt("DetectorPipeline:ProcessQueueCapacity", 8, 1),
ReadInt("DetectorPipeline:ProcessEveryNFrames", 1, 1))
{
}
/// <summary>
/// Internal constructor for testing: accepts capacity and sampling values directly,
/// bypassing App.config reads.
/// </summary>
internal DetectorFramePipelineService(
IEventAggregator eventAggregator,
IMainViewportService mainViewportService,
ILoggerService logger,
int acquireQueueCapacity,
int processQueueCapacity,
int processEveryNFrames)
{
ArgumentNullException.ThrowIfNull(eventAggregator);
_mainViewportService = mainViewportService ?? throw new ArgumentNullException(nameof(mainViewportService));
_logger = logger?.ForModule<DetectorFramePipelineService>() ?? throw new ArgumentNullException(nameof(logger));
AcquireQueueCapacity = Math.Max(1, acquireQueueCapacity);
ProcessQueueCapacity = Math.Max(1, processQueueCapacity);
ProcessEveryNFrames = Math.Max(1, processEveryNFrames);
eventAggregator.GetEvent<ImageCapturedEvent>()
.Subscribe(OnImageCaptured, ThreadOption.BackgroundThread);
_processConsumerTask = Task.Run(ProcessLoopAsync);
}
public int AcquireQueueCount => Volatile.Read(ref _acquireQueueCount);
public int ProcessQueueCount => Volatile.Read(ref _processQueueCount);
public int AcquireQueueCapacity { get; }
public int ProcessQueueCapacity { get; }
public int ProcessEveryNFrames { get; }
public event EventHandler<DetectorFrame> ProcessFrameDequeued;
private void OnImageCaptured(ImageCapturedEventArgs args)
{
if (_disposed || args?.ImageData == null || args.Width <= 0 || args.Height <= 0)
return;
try
{
var rawPixels = new ushort[args.ImageData.Length];
Array.Copy(args.ImageData, rawPixels, rawPixels.Length);
var bitmap = XP.Common.Converters.ImageConverter.ConvertGray16ToBitmapSource(rawPixels, (int)args.Width, (int)args.Height);
bitmap.Freeze();
var frame = new DetectorFrame(
frameId: args.FrameNumber,
captureTime: args.CaptureTime,
width: (int)args.Width,
height: (int)args.Height,
rawPixels: rawPixels,
previewImage: bitmap);
EnqueueBounded(_acquireQueue, frame, AcquireQueueCapacity, ref _acquireQueueCount);
_mainViewportService.UpdateDetectorFrame(frame);
var sequence = Interlocked.Increment(ref _receivedFrameCount);
if ((sequence - 1) % ProcessEveryNFrames == 0)
{
EnqueueBounded(_processQueue, frame, ProcessQueueCapacity, ref _processQueueCount);
_processSignal.Release();
}
}
catch (Exception ex)
{
_logger.Error(ex, "探测器帧进入主界面流水线失败");
}
}
private async Task ProcessLoopAsync()
{
try
{
while (!_shutdown.IsCancellationRequested)
{
await _processSignal.WaitAsync(_shutdown.Token).ConfigureAwait(false);
while (_processQueue.TryDequeue(out var frame))
{
Interlocked.Decrement(ref _processQueueCount);
ProcessFrameDequeued?.Invoke(this, frame);
}
}
}
catch (OperationCanceledException)
{
}
catch (Exception ex)
{
_logger.Error(ex, "探测器处理队列后台消费者异常退出");
}
}
private static void EnqueueBounded(
ConcurrentQueue<DetectorFrame> queue,
DetectorFrame frame,
int capacity,
ref int queueCount)
{
queue.Enqueue(frame);
var count = Interlocked.Increment(ref queueCount);
while (count > capacity && queue.TryDequeue(out _))
{
count = Interlocked.Decrement(ref queueCount);
}
}
private static int ReadInt(string key, int defaultValue, int minValue)
{
var raw = ConfigurationManager.AppSettings[key];
if (!int.TryParse(raw, out var parsed))
return defaultValue;
return parsed < minValue ? minValue : parsed;
}
public void Dispose()
{
if (_disposed)
return;
_disposed = true;
_shutdown.Cancel();
try
{
_processConsumerTask.Wait(TimeSpan.FromSeconds(2));
}
catch
{
}
_processSignal.Dispose();
_shutdown.Dispose();
}
}
}