You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kev/Drawer/IPCLib/PipeManager.cs

369 lines
12 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace IPCLib
{
/// <summary>
/// Duplex named-pipe endpoint (server or client) that exchanges newline-delimited JSON
/// <see cref="PipeMessage"/> objects, sends periodic heartbeats, and reconnects automatically
/// after the connection is lost.
/// </summary>
public class PipeManager : IDisposable
{
    // Size in bytes of one read chunk. Messages are framed by '\n', so this bounds a single
    // read, not the length of a message.
    private const int BufferSize = 4096;

    private readonly bool _isServer;
    private readonly CancellationTokenSource _cts;

    // Captured once in the constructor: reading _cts.Token after _cts.Dispose() throws
    // ObjectDisposedException, whereas a token obtained earlier stays usable by the
    // background loops that may still be winding down.
    private readonly CancellationToken _shutdownToken;

    // Serializes writers so that two messages are never interleaved on the pipe.
    private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1);

    // Guards _activeStream and _isConnected transitions. Events are never raised while it is held.
    private readonly object _stateLock = new object();

    // Heartbeat configuration.
    private readonly int _heartbeatIntervalMs = 2000;
    private readonly int _heartbeatTimeoutMs = 10000;

    private NamedPipeServerStream _serverStream;
    private NamedPipeClientStream _clientStream;
    private Stream _activeStream;
    private volatile bool _isConnected;
    private System.Timers.Timer _heartbeatTimer;
    private Task _heartbeatMonitorTask;

    // UTC ticks of the last successful read; accessed through Interlocked so that the
    // reader loop and the heartbeat monitor never observe a torn value.
    private long _lastReceivedUtcTicks;

    // Number of heartbeat checks performed so far (diagnostic output only).
    private long _heartbeatCheckCount;

    // 0 = live, 1 = Dispose() has run.
    private int _disposed;

    /// <summary>Raised for every received message whose type is not <c>Heartbeat</c>.</summary>
    public event Action<PipeMessage> MessageReceived;

    /// <summary>Raised with <c>true</c> when a connection is established and <c>false</c> when it is lost.</summary>
    public event Action<bool> ConnectionStatusChanged;

    /// <summary>
    /// Raised with an error description. Every log line is also forwarded here, prefixed with "[LOG] ".
    /// </summary>
    public event Action<string> ErrorOccurred;

    /// <summary>Raised when nothing has been received for longer than the heartbeat timeout.</summary>
    public event Action HeartbeatTimeout;

    /// <summary>Creates a manager for the given pipe; no I/O happens until <see cref="StartAsync"/>.</summary>
    /// <param name="pipeName">Name of the pipe to create (server) or to connect to (client).</param>
    /// <param name="isServer"><c>true</c> to act as the pipe server, <c>false</c> to act as the client.</param>
    public PipeManager(string pipeName, bool isServer)
    {
        PipeName = pipeName;
        _isServer = isServer;
        _cts = new CancellationTokenSource();
        _shutdownToken = _cts.Token;
    }

    /// <summary>Name of the pipe this instance serves or connects to.</summary>
    public string PipeName { get; private set; }

    /// <summary>
    /// Establishes the connection (waits for a client, or keeps retrying the server), then starts
    /// the read loop and the heartbeat. Failures are reported through <see cref="ErrorOccurred"/>
    /// rather than thrown.
    /// </summary>
    public async Task StartAsync()
    {
        try
        {
            Stream connected;
            if (_isServer)
            {
                // Allow up to 2 instances to avoid a connection conflict.
                var server = new NamedPipeServerStream(
                    PipeName,
                    PipeDirection.InOut,
                    2,
                    PipeTransmissionMode.Byte,
                    PipeOptions.Asynchronous);
                _serverStream = server;
                Log("等待客户端连接...");
                await server.WaitForConnectionAsync(_shutdownToken);
                lock (_stateLock)
                {
                    _activeStream = server;
                }
                Log("客户端已连接");
                connected = server;
            }
            else
            {
                NamedPipeClientStream client = await ConnectClientAsync();
                _clientStream = client;
                lock (_stateLock)
                {
                    _activeStream = client;
                }
                Log("已连接服务端");
                connected = client;
            }

            SetConnected(true);
            MarkReceived();
            _ = ReadLoopAsync(connected);
            StartHeartbeatSystem(connected);
        }
        catch (OperationCanceledException)
        {
            Log("操作已取消");
        }
        catch (Exception ex)
        {
            Log($"启动失败:{ex.Message}");
            ErrorOccurred?.Invoke(ex.Message);
            // Only has an effect when the failure happened after the connection was established.
            HandleDisconnect();
        }
    }

    /// <summary>
    /// Connects to the server, retrying every 2 seconds on timeout or I/O failure until it
    /// succeeds or shutdown is requested (in which case OperationCanceledException is thrown).
    /// </summary>
    private async Task<NamedPipeClientStream> ConnectClientAsync()
    {
        while (true)
        {
            // Never fall through to "connected" without a stream when shutdown was requested.
            _shutdownToken.ThrowIfCancellationRequested();

            var candidate = new NamedPipeClientStream(
                ".",
                PipeName,
                PipeDirection.InOut,
                PipeOptions.Asynchronous);
            try
            {
                Log("正在连接服务端...");
                await candidate.ConnectAsync(5000, _shutdownToken);
                return candidate;
            }
            catch (Exception ex) when (ex is TimeoutException || ex is IOException)
            {
                // Release the failed attempt before creating the next one.
                candidate.Dispose();
                Log($"连接失败:{ex.Message},2 秒后重试...");
                await Task.Delay(2000, _shutdownToken);
            }
            catch
            {
                candidate.Dispose();
                throw;
            }
        }
    }

    /// <summary>Starts the periodic heartbeat sender and the timeout monitor for <paramref name="session"/>.</summary>
    private void StartHeartbeatSystem(Stream session)
    {
        var timer = new System.Timers.Timer(_heartbeatIntervalMs);
        timer.Elapsed += async (s, e) => await SendHeartbeatAsync();
        timer.AutoReset = true;

        // If a timer from an earlier session is still installed, retire it instead of leaking it.
        System.Timers.Timer previous = Interlocked.Exchange(ref _heartbeatTimer, timer);
        if (previous != null)
        {
            previous.Stop();
            previous.Dispose();
        }

        timer.Enabled = true;
        Trace.WriteLine("start check");
        _heartbeatMonitorTask = Task.Run(() => MonitorHeartbeatAsync(session));
    }

    /// <summary>
    /// Checks once per second whether data has been received recently; on timeout raises
    /// <see cref="HeartbeatTimeout"/> and tears the session down. Ends as soon as
    /// <paramref name="session"/> is no longer the active connection.
    /// </summary>
    private async Task MonitorHeartbeatAsync(Stream session)
    {
        try
        {
            while (IsSessionAlive(session))
            {
                await Task.Delay(1000, _shutdownToken);
                if (!IsSessionAlive(session))
                {
                    break;
                }

                long count = Interlocked.Increment(ref _heartbeatCheckCount);
                DateTime lastReceived = LastReceivedUtc;
                Trace.WriteLine($"心跳检查 {lastReceived.ToLocalTime().ToString()} server={_isServer} count={count}");

                // Compared in UTC so that a DST or time-zone change cannot fake a timeout.
                DateTime now = DateTime.UtcNow;
                if ((now - lastReceived).TotalMilliseconds > _heartbeatTimeoutMs)
                {
                    Log($"心跳超时,判定连接断开-now={now.ToLocalTime().ToString()},last={lastReceived.ToLocalTime().ToString()}");
                    HeartbeatTimeout?.Invoke();
                    HandleDisconnect(session);
                    break;
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Shutdown was requested while waiting; nothing to report.
        }
    }

    /// <summary>Sends one heartbeat message; failures are logged, never thrown.</summary>
    private async Task SendHeartbeatAsync()
    {
        if (!_isConnected || _activeStream == null) return;
        try
        {
            var msg = new PipeMessage { Type = MessageType.Heartbeat, Timestamp = DateTime.Now };
            await SendAsync(msg);
        }
        catch (Exception ex)
        {
            Log($"发送心跳失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Reads from <paramref name="stream"/> until it fails, is replaced, or shutdown is requested,
    /// dispatching every complete newline-terminated message.
    /// </summary>
    private async Task ReadLoopAsync(Stream stream)
    {
        var buffer = new byte[BufferSize];
        var chars = new char[Encoding.UTF8.GetMaxCharCount(BufferSize)];
        // A stateful decoder keeps the leading bytes of a multi-byte character that was split
        // across two reads; decoding each chunk independently would turn it into U+FFFD.
        Decoder decoder = Encoding.UTF8.GetDecoder();
        var pending = new StringBuilder();
        try
        {
            while (!_shutdownToken.IsCancellationRequested && IsActiveStream(stream))
            {
                int bytesRead;
                try
                {
                    bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, _shutdownToken);
                }
                catch (IOException ex) when (ex.Message.Contains("broken") || ex.Message.Contains("已关闭"))
                {
                    Log($"管道断开:{ex.Message}");
                    throw;
                }

                if (bytesRead == 0)
                {
                    Log("管道已关闭(读取 0 字节)");
                    throw new IOException("Pipe disconnected");
                }

                MarkReceived();
                Trace.WriteLine($"recv time {LastReceivedUtc.ToLocalTime().ToString()} server={_isServer}");

                int charCount = decoder.GetChars(buffer, 0, bytesRead, chars, 0);
                pending.Append(chars, 0, charCount);

                // Everything before this chunk is an incomplete line, so a new complete line
                // can only exist if this chunk brought a '\n'.
                if (Array.IndexOf(chars, '\n', 0, charCount) >= 0)
                {
                    DispatchCompleteLines(pending);
                }
            }
        }
        catch (OperationCanceledException)
        {
            Log("读取循环已取消");
        }
        catch (IOException ex)
        {
            Log($"读取异常:{ex.Message}");
            HandleDisconnect(stream);
        }
        catch (ObjectDisposedException)
        {
            Log("管道对象已释放");
            HandleDisconnect(stream);
        }
        catch (Exception ex)
        {
            Log($"未知错误:{ex.Message}");
            ErrorOccurred?.Invoke(ex.Message);
            HandleDisconnect(stream);
        }
    }

    /// <summary>
    /// Removes every complete (newline-terminated) line from <paramref name="pending"/>,
    /// deserializes it and raises <see cref="MessageReceived"/> for non-heartbeat messages.
    /// Lines that are not valid JSON are logged and skipped.
    /// </summary>
    private void DispatchCompleteLines(StringBuilder pending)
    {
        // Materialize the buffer once per call instead of once per line.
        string text = pending.ToString();
        int start = 0;
        int newlineIndex;
        while ((newlineIndex = text.IndexOf('\n', start)) >= 0)
        {
            string line = text.Substring(start, newlineIndex - start).Trim();
            start = newlineIndex + 1;
            if (string.IsNullOrEmpty(line))
            {
                continue;
            }

            try
            {
                var msg = JsonSerializer.Deserialize<PipeMessage>(line);
                if (msg != null && msg.Type != MessageType.Heartbeat)
                {
                    MessageReceived?.Invoke(msg);
                }
            }
            catch (JsonException ex)
            {
                Log($"JSON 解析失败:{ex.Message},数据:{line}");
            }
        }

        if (start > 0)
        {
            pending.Remove(0, start);
        }
    }

    /// <summary>
    /// Marks the connection as lost, releases the stream, and schedules a reconnect after 1 second.
    /// Does nothing when already disconnected, or when <paramref name="expected"/> is given and is
    /// no longer the active stream (a late failure from an old session must not tear down a new one).
    /// </summary>
    private void HandleDisconnect(Stream expected = null)
    {
        Stream stale;
        lock (_stateLock)
        {
            if (!_isConnected) return;
            if (expected != null && !ReferenceEquals(expected, _activeStream)) return;
            _isConnected = false;
            stale = _activeStream;
            _activeStream = null;
        }

        // Subscribers run outside the lock so that a handler which blocks on another thread
        // (for example a UI thread that is itself waiting to send) cannot deadlock against it.
        ConnectionStatusChanged?.Invoke(false);
        StopHeartbeatSystem();
        try
        {
            stale?.Dispose();
        }
        catch (Exception ex)
        {
            Log($"关闭流失败:{ex.Message}");
        }

        Log("连接已断开,准备重连...");

        // Reconnect logic.
        Task.Run(async () =>
        {
            try
            {
                await Task.Delay(1000, _shutdownToken);
            }
            catch (OperationCanceledException)
            {
                return;
            }

            if (!_shutdownToken.IsCancellationRequested)
            {
                await StartAsync();
            }
        });
    }

    /// <summary>Stops and disposes the heartbeat timer, if one is installed. Safe to call concurrently.</summary>
    private void StopHeartbeatSystem()
    {
        System.Timers.Timer timer = Interlocked.Exchange(ref _heartbeatTimer, null);
        if (timer != null)
        {
            timer.Stop();
            timer.Dispose();
        }
    }

    /// <summary>Serializes <paramref name="message"/> as one JSON line and writes it to the pipe.</summary>
    /// <param name="message">Message to send.</param>
    /// <exception cref="IOException">There is no writable connection, or the write failed.</exception>
    /// <exception cref="ObjectDisposedException">The stream or this instance was disposed.</exception>
    /// <exception cref="OperationCanceledException">Shutdown was requested.</exception>
    public async Task SendAsync(PipeMessage message)
    {
        await _sendLock.WaitAsync(_shutdownToken);
        Stream currentStream = null;
        bool writing = false;
        try
        {
            lock (_stateLock)
            {
                currentStream = _activeStream;
                if (currentStream == null || !currentStream.CanWrite)
                {
                    throw new IOException("管道不可写");
                }
            }

            string json = JsonSerializer.Serialize(message);
            byte[] data = Encoding.UTF8.GetBytes(json + "\n");

            writing = true;
            // currentStream is a local snapshot, so it cannot change underneath the write.
            await currentStream.WriteAsync(data, 0, data.Length, _shutdownToken);
            await currentStream.FlushAsync(_shutdownToken);
            Trace.WriteLine($"send:{message.Timestamp.ToString()} {message.Content}");
        }
        catch (IOException ex) when (writing)
        {
            // Any I/O failure during the write means the pipe is unusable. This is decided by
            // where the exception came from, not by its (localized) message text.
            Log($"发送失败(管道断开):{ex.Message}");
            HandleDisconnect(currentStream);
            throw;
        }
        catch (ObjectDisposedException)
        {
            Log("发送失败(对象已释放)");
            HandleDisconnect(currentStream);
            throw;
        }
        catch (Exception ex)
        {
            Log($"发送异常:{ex.Message}");
            ErrorOccurred?.Invoke(ex.Message);
            throw;
        }
        finally
        {
            try
            {
                _sendLock.Release();
            }
            catch (ObjectDisposedException)
            {
                // Dispose() ran while this send was in flight; do not mask the original outcome.
            }
        }
    }

    /// <summary>Updates the connected flag and raises <see cref="ConnectionStatusChanged"/> when it actually changes.</summary>
    private void SetConnected(bool connected)
    {
        lock (_stateLock)
        {
            if (_isConnected == connected) return;
            _isConnected = connected;
        }

        ConnectionStatusChanged?.Invoke(connected);
    }

    /// <summary>Writes a timestamped trace line and forwards the text to <see cref="ErrorOccurred"/> with a "[LOG] " prefix.</summary>
    private void Log(string message)
    {
        Trace.WriteLine($"[{DateTime.Now:HH:mm:ss}] {message}");
        ErrorOccurred?.Invoke($"[LOG] {message}");
    }

    /// <summary>UTC time of the most recent successful read (or of connection establishment).</summary>
    private DateTime LastReceivedUtc
    {
        get { return new DateTime(Interlocked.Read(ref _lastReceivedUtcTicks), DateTimeKind.Utc); }
    }

    /// <summary>Records "now" as the time data was last received.</summary>
    private void MarkReceived()
    {
        Interlocked.Exchange(ref _lastReceivedUtcTicks, DateTime.UtcNow.Ticks);
    }

    /// <summary>Returns whether <paramref name="stream"/> is still the stream of the current connection.</summary>
    private bool IsActiveStream(Stream stream)
    {
        lock (_stateLock)
        {
            return ReferenceEquals(_activeStream, stream);
        }
    }

    /// <summary>Returns whether the session bound to <paramref name="session"/> is connected and not shutting down.</summary>
    private bool IsSessionAlive(Stream session)
    {
        return !_shutdownToken.IsCancellationRequested && _isConnected && IsActiveStream(session);
    }

    /// <summary>Disposes <paramref name="resource"/>, tracing (instead of propagating) any failure.</summary>
    private static void DisposeQuietly(IDisposable resource)
    {
        try
        {
            resource?.Dispose();
        }
        catch (Exception ex)
        {
            Trace.WriteLine($"[{DateTime.Now:HH:mm:ss}] 释放资源失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Cancels all background work and releases the pipe, the heartbeat timer and the
    /// synchronization primitives. Calling it more than once is harmless.
    /// </summary>
    public void Dispose()
    {
        // Cancel() throws on an already disposed CancellationTokenSource, so later calls must not reach it.
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

        _cts.Cancel();
        StopHeartbeatSystem();
        DisposeQuietly(_serverStream);
        DisposeQuietly(_clientStream);
        _sendLock.Dispose();
        _cts.Dispose();
    }
}
}