|
|
using System;
|
|
|
using System.Collections.Generic;
|
|
|
using System.Diagnostics;
|
|
|
using System.IO;
|
|
|
using System.IO.Pipes;
|
|
|
using System.Linq;
|
|
|
using System.Text;
|
|
|
using System.Text.Json;
|
|
|
using System.Threading;
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
namespace IPCLib
|
|
|
{
|
|
|
public class PipeManager : IDisposable
|
|
|
{
|
|
|
// Pipe endpoints. StartAsync creates the server stream when _isServer is true and the
// client stream otherwise; _activeStream refers to whichever one is currently connected
// and is set back to null when the connection is torn down.
private NamedPipeServerStream _serverStream;
private NamedPipeClientStream _clientStream;
private Stream _activeStream;

// Role of this endpoint; assigned once in the constructor.
private readonly bool _isServer;

// Cancelled by Dispose to stop connecting, reading, sending and heartbeat monitoring.
private readonly CancellationTokenSource _cts;

// Written on connect/disconnect and read by the heartbeat timer callback and the
// heartbeat monitor task, so it is volatile to keep those reads up to date.
private volatile bool _isConnected;

// Serializes SendAsync callers so that two messages are never interleaved on the pipe.
private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1);

// Held while _activeStream is snapshotted or torn down.
private readonly object _stateLock = new object();

// Heartbeat configuration.
private readonly int _heartbeatIntervalMs = 2000;  // period of outgoing heartbeats
private readonly int _heartbeatTimeoutMs = 10000;  // silence after which the link is declared lost
private System.Timers.Timer _heartbeatTimer;
private DateTime _lastReceivedTime;                // local time of the last successful read
private Task _heartbeatMonitorTask;

// Size in bytes of the buffer used for each pipe read (2 KB).
private const int BufferSize = 2048;

/// <summary>Raised for every received message whose type is not a heartbeat.</summary>
public event Action<PipeMessage> MessageReceived;

/// <summary>Raised with the new state whenever the connected flag changes.</summary>
public event Action<bool> ConnectionStatusChanged;

/// <summary>
/// Raised with an error description. Diagnostic log lines are also delivered here,
/// prefixed with "[LOG] ".
/// </summary>
public event Action<string> ErrorOccurred;

/// <summary>Raised when nothing has been received for longer than the heartbeat timeout.</summary>
public event Action HeartbeatTimeout;
|
|
|
|
|
|
/// <summary>
/// Creates a pipe endpoint. No pipe is opened here; the connection is established by StartAsync.
/// </summary>
/// <param name="pipeName">Name of the named pipe to serve or to connect to.</param>
/// <param name="isServer">true to act as the pipe server, false to act as the client.</param>
/// <exception cref="ArgumentNullException"><paramref name="pipeName"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="pipeName"/> is empty.</exception>
public PipeManager(string pipeName, bool isServer)
{
    // Fail fast on an unusable name instead of surfacing the problem later,
    // when the pipe stream is constructed during start-up.
    if (pipeName == null)
    {
        throw new ArgumentNullException(nameof(pipeName));
    }

    if (pipeName.Length == 0)
    {
        throw new ArgumentException("Pipe name must not be empty.", nameof(pipeName));
    }

    PipeName = pipeName;
    _isServer = isServer;
    _cts = new CancellationTokenSource();
}
|
|
|
|
|
|
/// <summary>Name of the named pipe this endpoint uses; fixed at construction.</summary>
public string PipeName { get; }
|
|
|
|
|
|
/// <summary>
/// Establishes the pipe connection and, once connected, starts the read loop and the heartbeat.
/// </summary>
/// <remarks>
/// A server waits for one client. A client tries to connect with a 5 second timeout and,
/// on <see cref="TimeoutException"/> or <see cref="IOException"/>, retries after 2 seconds
/// until it succeeds or the manager is cancelled. Failures are not thrown to the caller:
/// cancellation is logged, any other exception is logged and reported through ErrorOccurred.
/// </remarks>
public async Task StartAsync()
{
    try
    {
        if (_isServer)
        {
            // Allow up to 2 instances to avoid connection conflicts.
            var server = new NamedPipeServerStream(
                PipeName,
                PipeDirection.InOut,
                2,
                PipeTransmissionMode.Byte,
                PipeOptions.Asynchronous);
            _serverStream = server;

            try
            {
                Log("等待客户端连接...");
                await server.WaitForConnectionAsync(_cts.Token);
            }
            catch
            {
                // A server instance that never got a client must not stay open:
                // it would keep occupying one of the pipe's instance slots.
                server.Dispose();
                throw;
            }

            lock (_stateLock)
            {
                _activeStream = server;
            }

            Log("客户端已连接");
        }
        else
        {
            while (true)
            {
                // Without this check an already-cancelled token would fall out of the
                // retry loop and be reported as "connected" with no stream behind it.
                _cts.Token.ThrowIfCancellationRequested();

                var client = new NamedPipeClientStream(
                    ".",
                    PipeName,
                    PipeDirection.InOut,
                    PipeOptions.Asynchronous);
                _clientStream = client;

                try
                {
                    Log("正在连接服务端...");
                    await client.ConnectAsync(5000, _cts.Token);
                }
                catch (Exception ex) when (ex is TimeoutException || ex is IOException)
                {
                    // Each attempt creates a fresh stream, so release the failed one.
                    client.Dispose();
                    Log($"连接失败:{ex.Message},2 秒后重试...");
                    await Task.Delay(2000, _cts.Token);
                    continue;
                }
                catch
                {
                    client.Dispose();
                    throw;
                }

                lock (_stateLock)
                {
                    _activeStream = client;
                }

                Log("已连接服务端");
                break;
            }
        }

        SetConnected(true);
        _lastReceivedTime = DateTime.Now;

        // Fire and forget: the read loop handles and reports its own failures.
        _ = ReadLoopAsync();
        StartHeartbeatSystem();
    }
    catch (OperationCanceledException)
    {
        Log("操作已取消");
    }
    catch (Exception ex)
    {
        Log($"启动失败:{ex.Message}");
        ErrorOccurred?.Invoke(ex.Message);
        HandleDisconnect();
    }
}
|
|
|
|
|
|
// Number of heartbeat checks performed so far; only written to the trace output.
private long _heartbeatCheckCount = 0;

/// <summary>
/// Starts the periodic heartbeat sender and a background monitor that declares the
/// connection lost when nothing has been received for longer than the heartbeat timeout.
/// </summary>
private void StartHeartbeatSystem()
{
    // Capture the token up front: reading _cts.Token inside the background task would
    // throw ObjectDisposedException once the manager has been disposed.
    CancellationToken token = _cts.Token;

    var timer = new System.Timers.Timer(_heartbeatIntervalMs);
    timer.Elapsed += async (s, e) => await SendHeartbeatAsync();
    timer.AutoReset = true;
    _heartbeatTimer = timer;
    timer.Enabled = true;

    Trace.WriteLine("start check");

    _heartbeatMonitorTask = Task.Run(async () =>
    {
        try
        {
            // The monitor is tied to the timer it was started with. After a reconnect a new
            // timer is installed, so a monitor left over from the previous connection exits
            // here instead of running alongside the new one.
            while (!token.IsCancellationRequested
                   && _isConnected
                   && ReferenceEquals(_heartbeatTimer, timer))
            {
                await Task.Delay(1000, token);

                _heartbeatCheckCount++;
                Trace.WriteLine($"心跳检查 {_lastReceivedTime.ToString()} server={_isServer} count={_heartbeatCheckCount}");

                if ((DateTime.Now - _lastReceivedTime).TotalMilliseconds > _heartbeatTimeoutMs)
                {
                    Log($"心跳超时,判定连接断开-now={DateTime.Now.ToString()},last={_lastReceivedTime.ToString()}");
                    HeartbeatTimeout?.Invoke();
                    HandleDisconnect();
                    break;
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the manager is cancelled during the delay; nothing to report.
        }
    });
}
|
|
|
|
|
|
/// <summary>
/// Sends a single heartbeat message when a connection is up.
/// Any failure is logged and swallowed so the caller never observes an exception.
/// </summary>
private async Task SendHeartbeatAsync()
{
    bool linkUp = _isConnected && _activeStream != null;
    if (!linkUp)
    {
        return;
    }

    try
    {
        var heartbeat = new PipeMessage
        {
            Type = MessageType.Heartbeat,
            Timestamp = DateTime.Now
        };

        Task pendingSend = SendAsync(heartbeat);
        await pendingSend;
    }
    catch (Exception sendError)
    {
        Log($"发送心跳失败:{sendError.Message}");
    }
}
|
|
|
|
|
|
/// <summary>
/// Reads newline-delimited JSON messages from the active stream until the manager is
/// cancelled, the stream is cleared, or the pipe fails.
/// </summary>
/// <remarks>
/// Every successful read refreshes the last-received time used by the heartbeat monitor.
/// A zero-byte read, an <see cref="IOException"/>, an <see cref="ObjectDisposedException"/>
/// or any other exception ends the loop and triggers disconnect handling; cancellation
/// ends it with a log entry only.
/// </remarks>
private async Task ReadLoopAsync()
{
    var buffer = new byte[BufferSize];

    // A stateful decoder keeps the leading bytes of a multi-byte UTF-8 character that is
    // split across two reads; decoding each chunk on its own would corrupt such characters.
    Decoder decoder = Encoding.UTF8.GetDecoder();
    var chars = new char[Encoding.UTF8.GetMaxCharCount(BufferSize)];
    var pending = new StringBuilder();

    try
    {
        while (!_cts.Token.IsCancellationRequested)
        {
            Stream currentStream;
            lock (_stateLock)
            {
                currentStream = _activeStream;
            }

            if (currentStream == null)
            {
                break;
            }

            int bytesRead;
            try
            {
                // Read from the snapshot, not the field: the field can be set to null by
                // disconnect handling between the check above and this call.
                bytesRead = await currentStream.ReadAsync(buffer, 0, buffer.Length, _cts.Token);
            }
            catch (IOException ex) when (ex.Message.Contains("broken") || ex.Message.Contains("已关闭"))
            {
                Log($"管道断开:{ex.Message}");
                throw;
            }

            if (bytesRead == 0)
            {
                Log("管道已关闭(读取 0 字节)");
                throw new IOException("Pipe disconnected");
            }

            _lastReceivedTime = DateTime.Now;
            Trace.WriteLine($"recv time {_lastReceivedTime.ToString()} server={_isServer}");

            int charCount = decoder.GetChars(buffer, 0, bytesRead, chars, 0);
            pending.Append(chars, 0, charCount);

            // Materialize the buffered text once per read and walk it with an ordinal
            // character search. NOTE(review): the culture-sensitive string overload of
            // IndexOf may not report a '\n' that Contains does see, which would produce
            // a negative index - confirm against the target runtime.
            string text = pending.ToString();
            int consumed = 0;
            int newlineIndex;
            while ((newlineIndex = text.IndexOf('\n', consumed)) >= 0)
            {
                string line = text.Substring(consumed, newlineIndex - consumed).Trim();
                consumed = newlineIndex + 1;

                if (line.Length > 0)
                {
                    DispatchLine(line);
                }
            }

            // Keep only the trailing partial message for the next read.
            if (consumed > 0)
            {
                pending.Remove(0, consumed);
            }
        }
    }
    catch (OperationCanceledException)
    {
        Log("读取循环已取消");
    }
    catch (IOException ex)
    {
        Log($"读取异常:{ex.Message}");
        HandleDisconnect();
    }
    catch (ObjectDisposedException)
    {
        Log("管道对象已释放");
        HandleDisconnect();
    }
    catch (Exception ex)
    {
        Log($"未知错误:{ex.Message}");
        ErrorOccurred?.Invoke(ex.Message);
        HandleDisconnect();
    }
}

// Deserializes one message line and raises MessageReceived unless it is a heartbeat;
// malformed JSON is logged and skipped.
private void DispatchLine(string line)
{
    try
    {
        var msg = JsonSerializer.Deserialize<PipeMessage>(line);
        if (msg != null && msg.Type != MessageType.Heartbeat)
        {
            MessageReceived?.Invoke(msg);
        }
    }
    catch (JsonException ex)
    {
        Log($"JSON 解析失败:{ex.Message},数据:{line}");
    }
}
|
|
|
|
|
|
/// <summary>
/// Tears down the current connection and schedules a reconnect attempt one second later.
/// Does nothing when the manager is not currently connected.
/// </summary>
private void HandleDisconnect()
{
    Stream staleStream;

    // Only the state change happens under the lock. Subscriber callbacks (status change,
    // log output) run after it is released so a handler that blocks on another thread
    // which needs this lock cannot deadlock the manager.
    lock (_stateLock)
    {
        if (!_isConnected)
        {
            return;
        }

        _isConnected = false;
        StopHeartbeatSystem();

        staleStream = _activeStream;
        _activeStream = null;
    }

    try
    {
        staleStream?.Dispose();
    }
    catch (Exception ex)
    {
        Log($"关闭流失败:{ex.Message}");
    }

    ConnectionStatusChanged?.Invoke(false);

    Log("连接已断开,准备重连...");

    // Reconnect logic.
    CancellationToken token;
    try
    {
        token = _cts.Token;
    }
    catch (ObjectDisposedException)
    {
        // The manager has been disposed; there is nothing left to reconnect.
        return;
    }

    _ = Task.Run(async () =>
    {
        try
        {
            // Tying the delay to the token makes shutdown cancel the pending reconnect.
            await Task.Delay(1000, token);
            await StartAsync();
        }
        catch (OperationCanceledException)
        {
            // Cancelled while waiting to reconnect; nothing to do.
        }
    });
}
|
|
|
|
|
|
/// <summary>Stops and disposes the heartbeat timer, if one is running.</summary>
private void StopHeartbeatSystem()
{
    // Detach the timer atomically: this method is called both from Dispose and from
    // disconnect handling, and a check-then-use on the field could let one caller
    // dereference null after the other has already cleared it.
    System.Timers.Timer timer = Interlocked.Exchange(ref _heartbeatTimer, null);
    if (timer == null)
    {
        return;
    }

    timer.Stop();
    timer.Dispose();
}
|
|
|
|
|
|
/// <summary>
/// Serializes <paramref name="message"/> as one JSON line and writes it to the active pipe.
/// Concurrent callers are serialized so messages are never interleaved.
/// </summary>
/// <param name="message">Message to send; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
/// <exception cref="IOException">
/// There is no writable pipe or the write failed; disconnect handling has been triggered.
/// </exception>
/// <exception cref="ObjectDisposedException">The pipe was disposed while sending.</exception>
public async Task SendAsync(PipeMessage message)
{
    // Reject null before touching the pipe; otherwise the literal "null" would be sent
    // to the peer and the trace line below would then fail on the null reference.
    if (message == null)
    {
        throw new ArgumentNullException(nameof(message));
    }

    await _sendLock.WaitAsync(_cts.Token);
    try
    {
        Stream currentStream;
        lock (_stateLock)
        {
            currentStream = _activeStream;
        }

        if (currentStream == null || !currentStream.CanWrite)
        {
            throw new IOException("管道不可写");
        }

        string json = JsonSerializer.Serialize(message);
        byte[] data = Encoding.UTF8.GetBytes(json + "\n");

        // currentStream is a local snapshot, so it cannot change during the write.
        await currentStream.WriteAsync(data, 0, data.Length, _cts.Token);
        await currentStream.FlushAsync(_cts.Token);

        Trace.WriteLine($"send:{message.Timestamp.ToString()} {message.Content}");
    }
    catch (IOException ex)
    {
        // Every I/O failure is treated as a lost connection. Matching on the exception
        // message text would depend on the wording and UI language of that text, which
        // is not a stable contract.
        Log($"发送失败(管道断开):{ex.Message}");
        HandleDisconnect();
        throw;
    }
    catch (ObjectDisposedException)
    {
        Log("发送失败(对象已释放)");
        HandleDisconnect();
        throw;
    }
    catch (Exception ex)
    {
        Log($"发送异常:{ex.Message}");
        ErrorOccurred?.Invoke(ex.Message);
        throw;
    }
    finally
    {
        _sendLock.Release();
    }
}
|
|
|
|
|
|
/// <summary>
/// Records the new connection state and notifies ConnectionStatusChanged subscribers,
/// but only when the state actually differs from the current one.
/// </summary>
/// <param name="connected">The connection state to record.</param>
private void SetConnected(bool connected)
{
    if (_isConnected == connected)
    {
        return;
    }

    _isConnected = connected;

    Action<bool> subscribers = ConnectionStatusChanged;
    if (subscribers != null)
    {
        subscribers(connected);
    }
}
|
|
|
|
|
|
/// <summary>
/// Writes a time-stamped line to the trace output and forwards the same text,
/// prefixed with "[LOG] ", to ErrorOccurred subscribers.
/// </summary>
/// <param name="message">Text to log.</param>
private void Log(string message)
{
    string traceLine = $"[{DateTime.Now:HH:mm:ss}] {message}";
    Trace.WriteLine(traceLine);

    Action<string> subscribers = ErrorOccurred;
    if (subscribers != null)
    {
        subscribers($"[LOG] {message}");
    }
}
|
|
|
|
|
|
// 0 until Dispose has run; flipped atomically so repeated or concurrent calls are no-ops.
private int _disposed;

/// <summary>
/// Cancels all background work, stops the heartbeat and releases the pipe streams,
/// the send lock and the cancellation source. Safe to call more than once.
/// </summary>
public void Dispose()
{
    // A second call must not reach Cancel(): it throws ObjectDisposedException
    // on a cancellation source that has already been disposed.
    if (Interlocked.Exchange(ref _disposed, 1) != 0)
    {
        return;
    }

    _cts?.Cancel();
    StopHeartbeatSystem();

    try
    {
        _serverStream?.Dispose();
    }
    catch
    {
        // Best effort: a failure while closing the pipe must not escape Dispose.
    }

    try
    {
        _clientStream?.Dispose();
    }
    catch
    {
        // Best effort: a failure while closing the pipe must not escape Dispose.
    }

    _sendLock?.Dispose();
    _cts?.Dispose();
}
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|