You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kev/Drawer/IPCLib/PipeManager.cs

369 lines
12 KiB
C#

1 month ago
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace IPCLib
{
public class PipeManager : IDisposable
{
private NamedPipeServerStream _serverStream;
private NamedPipeClientStream _clientStream;
private Stream _activeStream;
private bool _isServer;
private CancellationTokenSource _cts;
private bool _isConnected;
// private readonly object _lock = new object();
private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1);
private readonly object _stateLock = new object();
// 心跳配置
private readonly int _heartbeatIntervalMs = 2000;
private readonly int _heartbeatTimeoutMs = 10000;
private System.Timers.Timer _heartbeatTimer;
private DateTime _lastReceivedTime;
private Task _heartbeatMonitorTask;
// 缓冲区大小(增大到 4KB
private const int BufferSize = 2048;
public event Action<PipeMessage> MessageReceived;
public event Action<bool> ConnectionStatusChanged;
public event Action<string> ErrorOccurred;
public event Action HeartbeatTimeout;
public PipeManager(string pipeName, bool isServer)
{
PipeName = pipeName;
_isServer = isServer;
_cts = new CancellationTokenSource();
}
public string PipeName { get; private set; }
public async Task StartAsync()
{
try
{
if (_isServer)
{
// 允许最多 2 个实例,防止连接冲突
_serverStream = new NamedPipeServerStream(
PipeName,
PipeDirection.InOut,
2,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous
);
Log("等待客户端连接...");
await _serverStream.WaitForConnectionAsync(_cts.Token);
_activeStream = _serverStream;
Log("客户端已连接");
}
else
{
while (!_cts.Token.IsCancellationRequested)
{
try
{
_clientStream = new NamedPipeClientStream(
".",
PipeName,
PipeDirection.InOut,
PipeOptions.Asynchronous
);
Log("正在连接服务端...");
await _clientStream.ConnectAsync(5000, _cts.Token);
_activeStream = _clientStream;
Log("已连接服务端");
break;
}
catch (Exception ex) when (ex is TimeoutException || ex is IOException)
{
Log($"连接失败:{ex.Message}2 秒后重试...");
await Task.Delay(2000, _cts.Token);
}
}
}
SetConnected(true);
_lastReceivedTime = DateTime.Now;
_ = ReadLoopAsync();
StartHeartbeatSystem();
}
catch (OperationCanceledException)
{
Log("操作已取消");
}
catch (Exception ex)
{
Log($"启动失败:{ex.Message}");
ErrorOccurred?.Invoke(ex.Message);
HandleDisconnect();
}
}
Int64 g_count = 0;
private void StartHeartbeatSystem()
{
_heartbeatTimer = new System.Timers.Timer(_heartbeatIntervalMs);
_heartbeatTimer.Elapsed += async (s, e) => await SendHeartbeatAsync();
_heartbeatTimer.AutoReset = true;
_heartbeatTimer.Enabled = true;
Trace.WriteLine("start check");
_heartbeatMonitorTask = Task.Run(async () =>
{
while (!_cts.Token.IsCancellationRequested && _isConnected)
{
await Task.Delay(1000, _cts.Token);
g_count++;
Trace.WriteLine($"心跳检查 {_lastReceivedTime.ToString()} server={_isServer} count={g_count}");
if ((DateTime.Now - _lastReceivedTime).TotalMilliseconds > _heartbeatTimeoutMs)
{
Log($"心跳超时,判定连接断开-now={DateTime.Now.ToString()},last={_lastReceivedTime.ToString()}");
HeartbeatTimeout?.Invoke();
HandleDisconnect();
break;
}
}
});
}
private async Task SendHeartbeatAsync()
{
if (!_isConnected || _activeStream == null) return;
try
{
var msg = new PipeMessage { Type = MessageType.Heartbeat, Timestamp = DateTime.Now };
await SendAsync(msg);
}
catch (Exception ex)
{
Log($"发送心跳失败:{ex.Message}");
}
}
private async Task ReadLoopAsync()
{
var buffer = new byte[BufferSize];
var sb = new StringBuilder();
try
{
while (!_cts.Token.IsCancellationRequested /*&& _activeStream != null*/)
{
Stream currentStream;
lock (_stateLock)
{
currentStream = _activeStream;
if (currentStream == null) break;
}
int bytesRead = 0;
try
{
bytesRead = await _activeStream.ReadAsync(buffer, 0, buffer.Length, _cts.Token);
}
catch (IOException ex) when (ex.Message.Contains("broken") || ex.Message.Contains("已关闭"))
{
Log($"管道断开:{ex.Message}");
throw;
}
if (bytesRead == 0)
{
Log("管道已关闭(读取 0 字节)");
throw new IOException("Pipe disconnected");
}
_lastReceivedTime = DateTime.Now;
Trace.WriteLine($"recv time {_lastReceivedTime.ToString()} server={_isServer}");
string text = Encoding.UTF8.GetString(buffer, 0, bytesRead);
sb.Append(text);
// 处理完整消息(以换行符为界)
while (sb.ToString().Contains("\n"))
{
int newlineIndex = sb.ToString().IndexOf("\n");
string line = sb.ToString().Substring(0, newlineIndex).Trim();
sb.Remove(0, newlineIndex + 1);
if (!string.IsNullOrEmpty(line))
{
try
{
var msg = JsonSerializer.Deserialize<PipeMessage>(line);
if (msg != null && msg.Type != MessageType.Heartbeat)
{
MessageReceived?.Invoke(msg);
}
}
catch (JsonException ex)
{
Log($"JSON 解析失败:{ex.Message},数据:{line}");
}
}
}
}
}
catch (OperationCanceledException)
{
Log("读取循环已取消");
}
catch (IOException ex)
{
Log($"读取异常:{ex.Message}");
HandleDisconnect();
}
catch (ObjectDisposedException)
{
Log("管道对象已释放");
HandleDisconnect();
}
catch (Exception ex)
{
Log($"未知错误:{ex.Message}");
ErrorOccurred?.Invoke(ex.Message);
HandleDisconnect();
}
}
private void HandleDisconnect()
{
lock (_stateLock)
{
if (!_isConnected) return;
SetConnected(false);
StopHeartbeatSystem();
try
{
_activeStream?.Dispose();
}
catch (Exception ex)
{
Log($"关闭流失败:{ex.Message}");
}
_activeStream = null;
Log("连接已断开,准备重连...");
// 重连逻辑
Task.Run(async () =>
{
await Task.Delay(1000);
if (!_cts.Token.IsCancellationRequested)
{
await StartAsync();
}
});
}
}
private void StopHeartbeatSystem()
{
if (_heartbeatTimer != null)
{
_heartbeatTimer.Stop();
_heartbeatTimer.Dispose();
_heartbeatTimer = null;
}
}
public async Task SendAsync(PipeMessage message)
{
await _sendLock.WaitAsync(_cts.Token);
try
{
Stream currentStream;
lock (_stateLock)
{
currentStream = _activeStream;
if (currentStream == null || !currentStream.CanWrite)
{
throw new IOException("管道不可写");
}
}
string json = JsonSerializer.Serialize(message);
byte[] data = Encoding.UTF8.GetBytes(json + "\n");
await currentStream.WriteAsync(data, 0, data.Length, _cts.Token); // currentStream此时它是局部变量不会改变
await currentStream.FlushAsync(_cts.Token);
Trace.WriteLine($"send:{message.Timestamp.ToString()} {message.Content}");
}
catch (IOException ex) when (ex.Message.Contains("broken") || ex.Message.Contains("已关闭"))
{
Log($"发送失败(管道断开):{ex.Message}");
HandleDisconnect();
throw;
}
catch (ObjectDisposedException)
{
Log("发送失败(对象已释放)");
HandleDisconnect();
throw;
}
catch (Exception ex)
{
Log($"发送异常:{ex.Message}");
ErrorOccurred?.Invoke(ex.Message);
throw;
}
finally
{
_sendLock.Release();
}
}
private void SetConnected(bool connected)
{
if (_isConnected != connected)
{
_isConnected = connected;
ConnectionStatusChanged?.Invoke(connected);
}
}
private void Log(string message)
{
Trace.WriteLine($"[{DateTime.Now:HH:mm:ss}] {message}");
ErrorOccurred?.Invoke($"[LOG] {message}");
}
public void Dispose()
{
_cts?.Cancel();
StopHeartbeatSystem();
try
{
_serverStream?.Dispose();
}
catch { }
try
{
_clientStream?.Dispose();
}
catch { }
_sendLock?.Dispose();
_cts?.Dispose();
}
}
}