using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace IPCLib
{
    /// <summary>
    /// Duplex named-pipe endpoint (server or client) that exchanges newline-delimited
    /// JSON <c>PipeMessage</c> objects, sends periodic heartbeats, watches for heartbeat
    /// timeouts, and schedules a reconnect after a detected disconnect.
    /// </summary>
    public class PipeManager : IDisposable
    {
        private NamedPipeServerStream _serverStream;
        private NamedPipeClientStream _clientStream;
        private Stream _activeStream;
        private readonly bool _isServer;
        private readonly CancellationTokenSource _cts;

        // Cached once so background work never touches _cts.Token after _cts was disposed.
        private readonly CancellationToken _shutdownToken;

        // Read from timer, monitor and read-loop threads; writes happen under _stateLock.
        private volatile bool _isConnected;
        private int _disposed;

        private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1);
        private readonly object _stateLock = new object();

        // Heartbeat configuration.
        private readonly int _heartbeatIntervalMs = 2000;
        private readonly int _heartbeatTimeoutMs = 10000;
        private System.Timers.Timer _heartbeatTimer;
        private CancellationTokenSource _heartbeatCts;
        private Task _heartbeatMonitorTask;
        private long _heartbeatCheckCount;

        // UTC ticks of the last successful read. Stored as a long so it can be read and
        // written atomically, and in UTC so local clock/DST shifts cannot fake a timeout.
        private long _lastReceivedUtcTicks;

        // Read buffer size in bytes (2 KB).
        private const int BufferSize = 2048;

        /// <summary>Raised for every received message whose type is not a heartbeat.</summary>
        public event Action<PipeMessage> MessageReceived;

        /// <summary>Raised with the new state whenever the connected flag flips.</summary>
        public event Action<bool> ConnectionStatusChanged;

        /// <summary>
        /// Raised with an error message. Log lines are also forwarded here, prefixed with "[LOG] ".
        /// </summary>
        public event Action<string> ErrorOccurred;

        /// <summary>Raised when nothing was received within the heartbeat timeout.</summary>
        public event Action HeartbeatTimeout;

        /// <summary>Creates a manager for the given pipe name.</summary>
        /// <param name="pipeName">Name of the pipe to host or connect to.</param>
        /// <param name="isServer">True to host the pipe, false to connect to it as a client.</param>
        public PipeManager(string pipeName, bool isServer)
        {
            PipeName = pipeName;
            _isServer = isServer;
            _cts = new CancellationTokenSource();
            _shutdownToken = _cts.Token;
        }

        /// <summary>Name of the pipe this instance hosts or connects to.</summary>
        public string PipeName { get; private set; }

        /// <summary>
        /// Establishes the connection (waits for a client, or connects with retry), then starts
        /// the read loop and the heartbeat system. Failures are logged and reported through
        /// <see cref="ErrorOccurred"/> instead of being thrown.
        /// </summary>
        public async Task StartAsync()
        {
            try
            {
                Stream connected;
                if (_isServer)
                {
                    // Allow up to 2 instances to avoid connection conflicts.
                    _serverStream = new NamedPipeServerStream(
                        PipeName,
                        PipeDirection.InOut,
                        2,
                        PipeTransmissionMode.Byte,
                        PipeOptions.Asynchronous);
                    Log("等待客户端连接...");
                    await _serverStream.WaitForConnectionAsync(_shutdownToken);
                    connected = _serverStream;
                    Log("客户端已连接");
                }
                else
                {
                    connected = await ConnectClientAsync();
                }

                // Never publish a connection once shutdown has been requested.
                _shutdownToken.ThrowIfCancellationRequested();

                lock (_stateLock)
                {
                    _activeStream = connected;
                }

                SetConnected(true);
                MarkReceivedNow();
                _ = ReadLoopAsync(connected);
                StartHeartbeatSystem();
            }
            catch (OperationCanceledException)
            {
                Log("操作已取消");
            }
            catch (Exception ex)
            {
                Log($"启动失败:{ex.Message}");
                ErrorOccurred?.Invoke(ex.Message);
                HandleDisconnect();
            }
        }

        /// <summary>
        /// Connects to the server, retrying every 2 seconds on timeout or I/O failure until
        /// it succeeds or shutdown is requested.
        /// </summary>
        /// <returns>The connected client stream.</returns>
        /// <exception cref="OperationCanceledException">Shutdown was requested.</exception>
        private async Task<Stream> ConnectClientAsync()
        {
            while (!_shutdownToken.IsCancellationRequested)
            {
                NamedPipeClientStream attempt = null;
                try
                {
                    attempt = new NamedPipeClientStream(
                        ".",
                        PipeName,
                        PipeDirection.InOut,
                        PipeOptions.Asynchronous);
                    // Published immediately so Dispose() can tear it down mid-connect.
                    _clientStream = attempt;
                    Log("正在连接服务端...");
                    await attempt.ConnectAsync(5000, _shutdownToken);
                    Log("已连接服务端");
                    return attempt;
                }
                catch (Exception ex) when (ex is TimeoutException || ex is IOException)
                {
                    // Release the failed attempt's handle before creating a new stream.
                    attempt?.Dispose();
                    Log($"连接失败:{ex.Message},2 秒后重试...");
                    await Task.Delay(2000, _shutdownToken);
                }
            }

            _shutdownToken.ThrowIfCancellationRequested();
            throw new OperationCanceledException();
        }

        /// <summary>
        /// Starts the periodic heartbeat sender and the timeout monitor for the current connection.
        /// </summary>
        private void StartHeartbeatSystem()
        {
            // Make sure nothing from a previous connection is still running.
            StopHeartbeatSystem();

            // Per-connection token: cancelled on disconnect as well as on shutdown, so a
            // monitor from an old connection cannot outlive it.
            var heartbeatCts = CancellationTokenSource.CreateLinkedTokenSource(_shutdownToken);
            CancellationToken monitorToken = heartbeatCts.Token;

            var timer = new System.Timers.Timer(_heartbeatIntervalMs);
            timer.Elapsed += async (s, e) => await SendHeartbeatAsync();
            timer.AutoReset = true;

            _heartbeatCts = heartbeatCts;
            _heartbeatTimer = timer;
            timer.Enabled = true;

            Trace.WriteLine("start check");
            _heartbeatMonitorTask = Task.Run(() => MonitorHeartbeatAsync(monitorToken));
        }

        /// <summary>
        /// Checks once per second whether data was received within the heartbeat timeout;
        /// on timeout raises <see cref="HeartbeatTimeout"/> and tears the connection down.
        /// </summary>
        private async Task MonitorHeartbeatAsync(CancellationToken token)
        {
            try
            {
                while (!token.IsCancellationRequested && _isConnected)
                {
                    await Task.Delay(1000, token);
                    _heartbeatCheckCount++;

                    DateTime lastUtc = LastReceivedUtc;
                    DateTime lastLocal = lastUtc.ToLocalTime();
                    Trace.WriteLine($"心跳检查 {lastLocal.ToString()} server={_isServer} count={_heartbeatCheckCount}");

                    if ((DateTime.UtcNow - lastUtc).TotalMilliseconds > _heartbeatTimeoutMs)
                    {
                        Log($"心跳超时,判定连接断开-now={DateTime.Now.ToString()},last={lastLocal.ToString()}");
                        HeartbeatTimeout?.Invoke();
                        HandleDisconnect();
                        break;
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the connection is torn down or the manager is disposed.
            }
        }

        /// <summary>Sends one heartbeat message; failures are logged, never thrown.</summary>
        private async Task SendHeartbeatAsync()
        {
            if (!_isConnected || _activeStream == null)
            {
                return;
            }

            try
            {
                var msg = new PipeMessage { Type = MessageType.Heartbeat, Timestamp = DateTime.Now };
                await SendAsync(msg);
            }
            catch (Exception ex)
            {
                Log($"发送心跳失败:{ex.Message}");
            }
        }

        /// <summary>
        /// Reads from <paramref name="stream"/> until it closes, shutdown is requested, or the
        /// stream stops being the active one, dispatching each newline-terminated JSON message.
        /// </summary>
        /// <param name="stream">The connection this loop is bound to.</param>
        private async Task ReadLoopAsync(Stream stream)
        {
            var buffer = new byte[BufferSize];
            var chars = new char[Encoding.UTF8.GetMaxCharCount(BufferSize)];
            // A stateful decoder keeps the trailing bytes of a multi-byte UTF-8 sequence that
            // was split across two reads, instead of turning them into replacement characters.
            Decoder decoder = Encoding.UTF8.GetDecoder();
            var pending = new StringBuilder();

            try
            {
                while (!_shutdownToken.IsCancellationRequested)
                {
                    lock (_stateLock)
                    {
                        // Stop quietly once this connection has been replaced or torn down.
                        if (!ReferenceEquals(_activeStream, stream))
                        {
                            break;
                        }
                    }

                    int bytesRead;
                    try
                    {
                        bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, _shutdownToken);
                    }
                    catch (IOException ex) when (ex.Message.Contains("broken") || ex.Message.Contains("已关闭"))
                    {
                        Log($"管道断开:{ex.Message}");
                        throw;
                    }

                    if (bytesRead == 0)
                    {
                        Log("管道已关闭(读取 0 字节)");
                        throw new IOException("Pipe disconnected");
                    }

                    MarkReceivedNow();
                    Trace.WriteLine($"recv time {LastReceivedUtc.ToLocalTime().ToString()} server={_isServer}");

                    int charCount = decoder.GetChars(buffer, 0, bytesRead, chars, 0);
                    pending.Append(chars, 0, charCount);

                    // Only materialise the buffer when this chunk completed at least one line.
                    if (Array.IndexOf(chars, '\n', 0, charCount) >= 0)
                    {
                        DispatchCompleteLines(pending);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Log("读取循环已取消");
            }
            catch (IOException ex)
            {
                Log($"读取异常:{ex.Message}");
                HandleDisconnect();
            }
            catch (ObjectDisposedException)
            {
                Log("管道对象已释放");
                HandleDisconnect();
            }
            catch (Exception ex)
            {
                Log($"未知错误:{ex.Message}");
                ErrorOccurred?.Invoke(ex.Message);
                HandleDisconnect();
            }
        }

        /// <summary>
        /// Removes every complete (newline-terminated) line from <paramref name="pending"/>,
        /// deserializes it and raises <see cref="MessageReceived"/> for non-heartbeat messages.
        /// A trailing partial line is left in the buffer.
        /// </summary>
        private void DispatchCompleteLines(StringBuilder pending)
        {
            string text = pending.ToString();
            int start = 0;
            int newlineIndex;

            while ((newlineIndex = text.IndexOf('\n', start)) >= 0)
            {
                string line = text.Substring(start, newlineIndex - start).Trim();
                start = newlineIndex + 1;

                if (string.IsNullOrEmpty(line))
                {
                    continue;
                }

                try
                {
                    var msg = JsonSerializer.Deserialize<PipeMessage>(line);
                    if (msg != null && msg.Type != MessageType.Heartbeat)
                    {
                        MessageReceived?.Invoke(msg);
                    }
                }
                catch (JsonException ex)
                {
                    Log($"JSON 解析失败:{ex.Message},数据:{line}");
                }
            }

            if (start > 0)
            {
                pending.Remove(0, start);
            }
        }

        /// <summary>
        /// Tears down the current connection (at most once per connection) and schedules a
        /// reconnect attempt one second later unless shutdown was requested.
        /// </summary>
        private void HandleDisconnect()
        {
            Stream staleStream;
            lock (_stateLock)
            {
                if (!_isConnected)
                {
                    return;
                }

                _isConnected = false;
                StopHeartbeatSystem();
                staleStream = _activeStream;
                _activeStream = null;
            }

            // Subscribers are notified outside the lock so their handlers cannot block
            // other threads that need _stateLock.
            ConnectionStatusChanged?.Invoke(false);

            try
            {
                staleStream?.Dispose();
            }
            catch (Exception ex)
            {
                Log($"关闭流失败:{ex.Message}");
            }

            Log("连接已断开,准备重连...");

            // Reconnect logic.
            _ = Task.Run(async () =>
            {
                await Task.Delay(1000);
                if (!_shutdownToken.IsCancellationRequested)
                {
                    await StartAsync();
                }
            });
        }

        /// <summary>
        /// Stops the heartbeat timer and cancels the timeout monitor. Safe to call repeatedly
        /// and from several threads.
        /// </summary>
        private void StopHeartbeatSystem()
        {
            // Exchange hands each resource to exactly one caller, so it is disposed once.
            System.Timers.Timer timer = Interlocked.Exchange(ref _heartbeatTimer, null);
            if (timer != null)
            {
                timer.Stop();
                timer.Dispose();
            }

            CancellationTokenSource heartbeatCts = Interlocked.Exchange(ref _heartbeatCts, null);
            if (heartbeatCts != null)
            {
                heartbeatCts.Cancel();
                heartbeatCts.Dispose();
            }
        }

        /// <summary>
        /// Serializes <paramref name="message"/> as one JSON line and writes it to the active
        /// stream. Sends are serialized, so concurrent callers cannot interleave bytes.
        /// </summary>
        /// <param name="message">The message to send.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        /// <exception cref="IOException">There is no writable stream, or the write failed.</exception>
        /// <exception cref="ObjectDisposedException">The stream was disposed during the send.</exception>
        /// <exception cref="OperationCanceledException">Shutdown was requested.</exception>
        public async Task SendAsync(PipeMessage message)
        {
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            await _sendLock.WaitAsync(_shutdownToken);
            try
            {
                Stream currentStream;
                lock (_stateLock)
                {
                    currentStream = _activeStream;
                    if (currentStream == null || !currentStream.CanWrite)
                    {
                        throw new IOException("管道不可写");
                    }
                }

                string json = JsonSerializer.Serialize(message);
                byte[] data = Encoding.UTF8.GetBytes(json + "\n");

                // currentStream is a local snapshot, so a concurrent disconnect cannot null it.
                await currentStream.WriteAsync(data, 0, data.Length, _shutdownToken);
                await currentStream.FlushAsync(_shutdownToken);
                Trace.WriteLine($"send:{message.Timestamp.ToString()} {message.Content}");
            }
            // NOTE(review): matching on the exception text depends on the OS language;
            // consider matching on HResult instead.
            catch (IOException ex) when (ex.Message.Contains("broken") || ex.Message.Contains("已关闭"))
            {
                Log($"发送失败(管道断开):{ex.Message}");
                HandleDisconnect();
                throw;
            }
            catch (ObjectDisposedException)
            {
                Log("发送失败(对象已释放)");
                HandleDisconnect();
                throw;
            }
            catch (Exception ex)
            {
                Log($"发送异常:{ex.Message}");
                ErrorOccurred?.Invoke(ex.Message);
                throw;
            }
            finally
            {
                _sendLock.Release();
            }
        }

        /// <summary>
        /// Updates the connected flag and raises <see cref="ConnectionStatusChanged"/> when
        /// the value actually changes.
        /// </summary>
        private void SetConnected(bool connected)
        {
            lock (_stateLock)
            {
                if (_isConnected == connected)
                {
                    return;
                }

                _isConnected = connected;
            }

            ConnectionStatusChanged?.Invoke(connected);
        }

        /// <summary>Records "now" as the time of the last successful read.</summary>
        private void MarkReceivedNow()
        {
            Interlocked.Exchange(ref _lastReceivedUtcTicks, DateTime.UtcNow.Ticks);
        }

        /// <summary>UTC time of the last successful read.</summary>
        private DateTime LastReceivedUtc
        {
            get { return new DateTime(Interlocked.Read(ref _lastReceivedUtcTicks), DateTimeKind.Utc); }
        }

        /// <summary>
        /// Writes a timestamped line to <see cref="Trace"/> and forwards it to
        /// <see cref="ErrorOccurred"/> with a "[LOG] " prefix.
        /// </summary>
        private void Log(string message)
        {
            Trace.WriteLine($"[{DateTime.Now:HH:mm:ss}] {message}");
            ErrorOccurred?.Invoke($"[LOG] {message}");
        }

        /// <summary>
        /// Cancels all background work and releases the pipe streams. Calling it more than
        /// once has no further effect.
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            _cts.Cancel();
            StopHeartbeatSystem();

            // Best effort: Dispose must not throw, so stream failures are only traced.
            try
            {
                _serverStream?.Dispose();
            }
            catch (Exception ex)
            {
                Trace.WriteLine($"关闭流失败:{ex.Message}");
            }

            try
            {
                _clientStream?.Dispose();
            }
            catch (Exception ex)
            {
                Trace.WriteLine($"关闭流失败:{ex.Message}");
            }

            _sendLock.Dispose();
            _cts.Dispose();
        }
    }
}