using System; using System.Collections.Generic; using System.Data; using System.Diagnostics; using System.IO; using System.IO.MemoryMappedFiles; using System.Linq; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace IPCLib { // MmfCommunication.cs // MmfCommunication.cs - 完整修复版(包含 Disconnected 事件触发) public class MmfCommunication : IDisposable { const string kepId = "KEP"; private MemoryMappedFile _mmf; private MemoryMappedViewAccessor _accessor; // ✅ 修复:使用两个独立的 Mutex private Mutex _heartbeatMutex; // 心跳专用锁 private Mutex _dataMutex; // 数据专用锁 private CancellationTokenSource _cts; private Task _heartbeatTask; private Task _receiveTask; private readonly string _mapName; private readonly bool _isServer; private readonly string _identity; private int _heartbeatInterval = 1000; private int _heartbeatTimeout = 20000; private bool _isConnected = false; private bool _isDisconnected = false; private bool _isPeerFrozen = false; private long _sessionId; private long _sequenceId = 0; private ConnectionStatus _currentStatus = ConnectionStatus.Connecting; public event Action DataReceived; public event Action Connected; public event Action Disconnected; public event Action PeerFrozen; public event Action PeerRecovered; public event Action ErrorOccurred; public event Action StatusChanged; public event Action ConnectionRecovered; public bool IsConnected => _isConnected; public bool IsDisconnected => _isDisconnected; public long SessionId => _sessionId; public MmfCommunication(string mapName, bool isServer, string identity) { _mapName = mapName; _isServer = isServer; _identity = identity; _sessionId = DateTime.Now.Ticks; } public async Task StartAsync() { try { if (_isServer) { CleanupExistingMemory(); _mmf = MemoryMappedFile.CreateNew(_mapName, MemoryLayout.TotalSize); InitializeMemory(); Log($"共享内存已创建 (SessionId: {_sessionId})"); } else { for (int i = 0; i < 50; i++) { try { _mmf = MemoryMappedFile.OpenExisting(_mapName); break; } catch (FileNotFoundException) { await Task.Delay(100); } } if (_mmf == null) throw new TimeoutException("等待共享内存创建超时"); Log($"共享内存已打开 (SessionId: {_sessionId})"); } _accessor = _mmf.CreateViewAccessor(); // ✅ 修复:创建两个独立的 Mutex _heartbeatMutex = new Mutex(false, _mapName + "_HeartbeatMutex"); _dataMutex = new Mutex(false, _mapName + "_DataMutex"); await RegisterSessionIdAsync(); _isConnected = true; _isDisconnected = false; _cts = new CancellationTokenSource(); _heartbeatTask = HeartbeatLoopAsync(_cts.Token); _receiveTask = ReceiveLoopAsync(_cts.Token); Connected?.Invoke(); Log("通讯模块已启动"); } catch (Exception ex) { _isConnected = false; _isDisconnected = true; ErrorOccurred?.Invoke($"启动失败:{ex.Message}"); throw; } } /// /// ✅ 心跳循环(使用独立的心跳锁) /// private async Task HeartbeatLoopAsync(CancellationToken ct) { Log($"❤️ 心跳任务启动"); while (!ct.IsCancellationRequested && _isConnected) { try { await Task.Delay(_heartbeatInterval, ct); // ✅ 使用心跳专用锁,不与数据操作竞争 if (_heartbeatMutex?.WaitOne(500) == true) { try { SendHeartbeatInternal(); } finally { try { _heartbeatMutex?.ReleaseMutex(); } catch { } } } else { Log($"⚠️ 心跳锁超时(可忽略)"); } CheckHeartbeatTimeout(); } catch (OperationCanceledException) { break; } catch (Exception ex) { ErrorOccurred?.Invoke($"心跳错误:{ex.Message}"); } } Log($"❤️ 心跳任务退出"); } /// /// ✅ 内部心跳发送(不获取锁,调用者负责) /// private void SendHeartbeatInternal() { if (_accessor == null) return; try { var header = ReadHeader(); long now = DateTime.Now.Ticks; if (_identity == kepId) { header.A_LastHeartbeat = now; header.A_SequenceId++; } else { header.B_LastHeartbeat = now; header.B_SequenceId++; } header.ConnectionStatus = (int)_currentStatus; WriteHeader(header); } catch (Exception ex) { ErrorOccurred?.Invoke($"发送心跳失败:{ex.Message}"); } } /// /// ✅ 检查心跳超时(使用心跳锁) /// private void CheckHeartbeatTimeout() { if (_accessor == null) return; try { if (_heartbeatMutex?.WaitOne(500) != true) return; try { var header = ReadHeader(); long peerHeartbeatTicks = _identity == kepId ? header.B_LastHeartbeat : header.A_LastHeartbeat; if (peerHeartbeatTicks == 0) return; DateTime peerHeartbeat = new DateTime(peerHeartbeatTicks); var timeSincePeer = DateTime.Now - peerHeartbeat; double seconds = timeSincePeer.TotalSeconds; if (seconds > _heartbeatTimeout * 3) { if (_currentStatus != ConnectionStatus.Disconnected) { Log($"❌ 硬断开:对端 {seconds:F0} 秒无心跳"); TriggerHardDisconnect($"硬断开:对端 {seconds:F0} 秒无心跳"); } } else if (seconds > _heartbeatTimeout * 2) { if (_currentStatus != ConnectionStatus.SoftDisconnected) { _currentStatus = ConnectionStatus.SoftDisconnected; Log($"🔄 软断开:对端 {seconds:F0} 秒无心跳(可恢复)"); StatusChanged?.Invoke($"软断开:{seconds:F0}秒"); } } else if (seconds > _heartbeatTimeout) { if (_currentStatus == ConnectionStatus.Connected) { _currentStatus = ConnectionStatus.Warning; Log($"⚠️ 心跳警告:对端 {seconds:F0} 秒无心跳"); StatusChanged?.Invoke($"警告:{seconds:F0}秒"); } } else { if (_currentStatus != ConnectionStatus.Connected) { Log($"✅ 心跳恢复:对端恢复正常 ({seconds:F1}秒)"); var previousStatus = _currentStatus; _currentStatus = ConnectionStatus.Connected; if (previousStatus == ConnectionStatus.SoftDisconnected) { ConnectionRecovered?.Invoke(); StatusChanged?.Invoke("连接已恢复"); } header.ConnectionStatus = (int)ConnectionStatus.Connected; WriteHeader(header); } } } finally { try { _heartbeatMutex?.ReleaseMutex(); } catch { } } } catch (Exception ex) { ErrorOccurred?.Invoke($"心跳检查失败:{ex.Message}"); } } /// /// ✅ 接收数据循环(使用独立的数据锁) /// private async Task ReceiveLoopAsync(CancellationToken ct) { Log($"📥 接收任务启动"); int lockSuccessCount = 0; int lockTimeoutCount = 0; DateTime lastLogTime = DateTime.Now; while (!ct.IsCancellationRequested && _isConnected) { try { await Task.Delay(100, ct); if (_accessor == null || _dataMutex == null) continue; // ✅ 使用数据专用锁,不与心跳竞争 if (_dataMutex.WaitOne(1000)) { lockSuccessCount++; try { var header = ReadHeader(); bool hasData = _identity == kepId ? header.B_HasData == 1 : header.A_HasData == 1; if (hasData) { int offset = _identity == kepId ? MemoryLayout.BToAOffset : MemoryLayout.AToBOffset; int dataLength = _identity == kepId ? header.B_DataLength : header.A_DataLength; if (dataLength > 0 && dataLength <= 1000) { string data = ReadPacketData(offset, dataLength); if (_identity == kepId) header.B_HasData = 0; else header.A_HasData = 0; WriteHeader(header); DataReceived?.Invoke(data); Log($"📥 收到数据:{data}"); } } } catch (Exception ex) { Log($"❌ 读取数据异常:{ex.Message}"); } finally { try { _dataMutex?.ReleaseMutex(); } catch { } } } else { lockTimeoutCount++; if ((DateTime.Now - lastLogTime).TotalSeconds > 2) { Log($"⚠️ 数据锁超时 (成功:{lockSuccessCount}, 超时:{lockTimeoutCount})"); lastLogTime = DateTime.Now; } await Task.Delay(50, ct); } } catch (OperationCanceledException) { break; } catch (IOException ex) { Log($"IO 异常:{ex.Message}"); TriggerDisconnect($"共享内存访问错误:{ex.Message}"); break; } catch (Exception ex) { ErrorOccurred?.Invoke($"接收错误:{ex.Message}"); } } Log($"📊 接收任务结束 (成功:{lockSuccessCount}, 超时:{lockTimeoutCount})"); } private void TriggerDisconnect(string reason) { if (_isDisconnected) return; // 避免重复触发 _isDisconnected = true; _isConnected = false; _isPeerFrozen = false; Log($"触发断开事件:{reason}"); Disconnected?.Invoke(reason); // 更新共享内存状态 //UpdateConnectionStatus(0); Task.Run(() => UpdateConnectionStatus(0)); } /// /// ✅ 发送数据(使用数据锁) /// public unsafe void SendData(string data) { if (_accessor == null || _dataMutex == null || !_isConnected) throw new InvalidOperationException("未连接"); if (!_dataMutex.WaitOne(2000)) { ErrorOccurred?.Invoke("获取数据锁超时(2 秒)"); TriggerDisconnect("获取数据锁超时"); return; } try { var header = ReadHeader(); bool peerRead = _identity == kepId ? header.B_HasData == 0 : header.A_HasData == 0; if (!peerRead) { ErrorOccurred?.Invoke("对端未读取上一条数据"); return; } int offset = _identity == kepId ? MemoryLayout.AToBOffset : MemoryLayout.BToAOffset; DataPacket packet = new DataPacket { Timestamp = DateTime.Now.Ticks, DataType = 2, DataLength = Math.Min(data.Length, 1000), SequenceId = ++_sequenceId }; SharedMemoryHelper.WriteToFixedBuffer(packet.Data, data, 1000); WritePacket(offset, packet); if (_identity == kepId) { header.A_HasData = 1; header.A_DataLength = packet.DataLength; } else { header.B_HasData = 1; header.B_DataLength = packet.DataLength; } WriteHeader(header); Log($"📤 发送数据:{data}"); } catch (IOException ex) { TriggerDisconnect($"发送数据失败:{ex.Message}"); } catch (Exception ex) { ErrorOccurred?.Invoke($"发送失败:{ex.Message}"); } finally { try { _dataMutex?.ReleaseMutex(); } catch { } } } private void TriggerHardDisconnect(string reason) { if (_currentStatus == ConnectionStatus.Disconnected) return; _currentStatus = ConnectionStatus.Disconnected; _isConnected = false; _isDisconnected = true; _isPeerFrozen = false; Log($"❌ 硬断开:{reason}"); Disconnected?.Invoke(reason); UpdateConnectionStatus(0); } private void UpdateConnectionStatus(int status) { if (_accessor == null || _heartbeatMutex == null) return; try { if (_heartbeatMutex.WaitOne(500)) { try { var header = ReadHeader(); header.ConnectionStatus = status; WriteHeader(header); } finally { try { _heartbeatMutex?.ReleaseMutex(); } catch { } } } } catch { } } private unsafe string ReadPacketData(int offset, int dataLength) { DataPacket packet = new DataPacket(); if(_accessor != null) _accessor.Read(offset, out packet); return SharedMemoryHelper.ReadFromFixedBuffer(packet.Data, dataLength); } private SharedMemoryHeader ReadHeader() { SharedMemoryHeader header = new SharedMemoryHeader(); if (_accessor != null) _accessor.Read(0, out header); return header; } private void WriteHeader(SharedMemoryHeader header) { if (_accessor != null) _accessor.Write(0, ref header); } private unsafe void WritePacket(int offset, DataPacket packet) { if (_accessor != null) _accessor.Write(offset, ref packet); } private void CleanupExistingMemory() { try { var existing = MemoryMappedFile.OpenExisting(_mapName); Log("⚠️ 发现残留共享内存,已清理"); existing.Dispose(); GC.Collect(); GC.WaitForPendingFinalizers(); Thread.Sleep(100); } catch (FileNotFoundException) { } } private void InitializeMemory() { if (_accessor == null) return; var header = new SharedMemoryHeader { IsLocked = 0, A_LastHeartbeat = DateTime.Now.Ticks, B_LastHeartbeat = 0, A_SessionId = _sessionId, B_SessionId = 0, A_HasData = 0, B_HasData = 0, ConnectionStatus = 1, Reserved1 = 0, Reserved2 = 0, Reserved3 = 0, Reserved4 = 0 }; _accessor.Write(0, ref header); } private async Task RegisterSessionIdAsync() { if (_accessor == null || _heartbeatMutex == null) return; try { if (_heartbeatMutex.WaitOne(1000)) { try { var header = ReadHeader(); if (_identity == kepId) { header.A_SessionId = _sessionId; header.A_LastHeartbeat = DateTime.Now.Ticks; } else { header.B_SessionId = _sessionId; header.B_LastHeartbeat = DateTime.Now.Ticks; } WriteHeader(header); } finally { try { _heartbeatMutex?.ReleaseMutex(); } catch { } } } } catch (Exception ex) { ErrorOccurred?.Invoke($"注册会话 ID 失败:{ex.Message}"); } } private void Log(string msg) => System.Diagnostics.Debug.WriteLine($"[{_identity}] {msg}"); public void Dispose() { if (!_isDisconnected) { TriggerHardDisconnect("本地关闭"); } _cts?.Cancel(); _heartbeatTask?.Wait(1000); _receiveTask?.Wait(1000); _accessor?.Dispose(); _mmf?.Dispose(); _heartbeatMutex?.Dispose(); _dataMutex?.Dispose(); _cts?.Dispose(); } } }