// kev/Drawer/IPCLib/MemoryCommunication.cs
// (Repository web-page header captured with this file: 608 lines, 20 KiB, C#, last changed 1 month ago.)
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.IO.MemoryMappedFiles;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace IPCLib
{
// MmfCommunication.cs - complete fixed version (includes raising the Disconnected event)
public class MmfCommunication : IDisposable
{
// Identity string of the endpoint that owns the A-side header fields
// (A_LastHeartbeat, A_HasData, ...); any other identity uses the B-side fields.
const string kepId = "KEP";
// Shared memory mapping and the view used for every header/packet read and write.
private MemoryMappedFile _mmf;
private MemoryMappedViewAccessor _accessor;
// Fix: use two independent named mutexes.
private Mutex _heartbeatMutex; // lock dedicated to heartbeat/status updates
private Mutex _dataMutex; // lock dedicated to data send/receive
// Cancels the two background loops started by StartAsync.
private CancellationTokenSource _cts;
private Task _heartbeatTask;
private Task _receiveTask;
private readonly string _mapName;
private readonly bool _isServer;
private readonly string _identity;
// Delay between heartbeats; passed to Task.Delay, i.e. milliseconds.
private int _heartbeatInterval = 1000;
// Base threshold used by CheckHeartbeatTimeout (warning at 1x, soft disconnect at 2x,
// hard disconnect at 3x). Presumably milliseconds like _heartbeatInterval - TODO confirm.
private int _heartbeatTimeout = 20000;
private bool _isConnected = false;
private bool _isDisconnected = false;
// NOTE(review): only ever assigned false in the code visible here.
private bool _isPeerFrozen = false;
// Taken from DateTime.Now.Ticks in the constructor and written into the shared header.
private long _sessionId;
// Incremented for every packet built by SendData.
private long _sequenceId = 0;
private ConnectionStatus _currentStatus = ConnectionStatus.Connecting;
// Raised by the receive loop with the text of each packet read from the peer.
public event Action<string> DataReceived;
// Raised once at the end of a successful StartAsync.
public event Action Connected;
// Raised with a reason string when the connection is torn down.
public event Action<string> Disconnected;
// NOTE(review): PeerFrozen and PeerRecovered are not raised by any code visible here.
public event Action<string> PeerFrozen;
public event Action<string> PeerRecovered;
public event Action<string> ErrorOccurred;
public event Action<string> StatusChanged;
// Raised when the heartbeat check returns from SoftDisconnected to Connected.
public event Action ConnectionRecovered;
public bool IsConnected => _isConnected;
public bool IsDisconnected => _isDisconnected;
public long SessionId => _sessionId;
/// <summary>
/// Creates an endpoint description. No shared memory or mutex is opened until
/// <see cref="StartAsync"/> is called.
/// </summary>
/// <param name="mapName">Name of the memory-mapped file; also the prefix of the two named mutexes.</param>
/// <param name="isServer">True to create the shared memory, false to wait for and open an existing one.</param>
/// <param name="identity">Endpoint identity; "KEP" selects the A-side header fields, anything else the B-side.</param>
public MmfCommunication(string mapName, bool isServer, string identity)
{
    // The session id is simply the local tick count at construction time.
    _sessionId = DateTime.Now.Ticks;
    _identity = identity;
    _isServer = isServer;
    _mapName = mapName;
}
/// <summary>
/// Creates (server) or opens (client) the shared memory, creates the two named mutexes,
/// registers this endpoint's session id and starts the heartbeat and receive loops.
/// </summary>
/// <remarks>
/// A client polls for the mapping 50 times at 100 ms intervals before giving up.
/// </remarks>
/// <exception cref="TimeoutException">The client could not open the mapping within the retry window.</exception>
/// <exception cref="Exception">Any other start-up failure is reported through ErrorOccurred and rethrown.</exception>
public async Task StartAsync()
{
    try
    {
        if (_isServer)
        {
            CleanupExistingMemory();
            _mmf = MemoryMappedFile.CreateNew(_mapName, MemoryLayout.TotalSize);
            // The view accessor has to exist before InitializeMemory runs: that method
            // returns immediately when _accessor is null, so with the accessor created
            // afterwards the initial header was never written.
            // NOTE(review): InitializeMemory stamps the A-side fields, so the server is
            // presumably expected to be the "KEP" endpoint - confirm.
            _accessor = _mmf.CreateViewAccessor();
            InitializeMemory();
            Log($"共享内存已创建 (SessionId: {_sessionId})");
        }
        else
        {
            // Poll until the server has created the mapping (about 5 seconds in total).
            for (int i = 0; i < 50; i++)
            {
                try
                {
                    _mmf = MemoryMappedFile.OpenExisting(_mapName);
                    break;
                }
                catch (FileNotFoundException)
                {
                    await Task.Delay(100);
                }
            }
            if (_mmf == null)
                throw new TimeoutException("等待共享内存创建超时");
            Log($"共享内存已打开 (SessionId: {_sessionId})");
            _accessor = _mmf.CreateViewAccessor();
        }

        // Two independent named mutexes: one for heartbeat/status, one for data.
        _heartbeatMutex = new Mutex(false, _mapName + "_HeartbeatMutex");
        _dataMutex = new Mutex(false, _mapName + "_DataMutex");

        await RegisterSessionIdAsync();

        _isConnected = true;
        _isDisconnected = false;

        _cts = new CancellationTokenSource();
        _heartbeatTask = HeartbeatLoopAsync(_cts.Token);
        _receiveTask = ReceiveLoopAsync(_cts.Token);

        Connected?.Invoke();
        Log("通讯模块已启动");
    }
    catch (Exception ex)
    {
        _isConnected = false;
        _isDisconnected = true;
        ErrorOccurred?.Invoke($"启动失败:{ex.Message}");
        throw;
    }
}
/// <summary>
/// Background loop: every <c>_heartbeatInterval</c> milliseconds writes this endpoint's
/// heartbeat under the heartbeat mutex and then evaluates the peer's heartbeat age.
/// Runs until cancellation is requested or <c>_isConnected</c> becomes false.
/// </summary>
/// <param name="ct">Token that stops the loop.</param>
private async Task HeartbeatLoopAsync(CancellationToken ct)
{
    Log($"❤️ 心跳任务启动");
    for (; ; )
    {
        if (ct.IsCancellationRequested || !_isConnected)
        {
            break;
        }

        try
        {
            await Task.Delay(_heartbeatInterval, ct);

            // Acquire and release happen with no await in between, so both run on
            // the same thread, which a Mutex requires.
            bool lockTaken = _heartbeatMutex?.WaitOne(500) == true;
            if (!lockTaken)
            {
                Log($"⚠️ 心跳锁超时(可忽略)");
            }
            else
            {
                try
                {
                    SendHeartbeatInternal();
                }
                finally
                {
                    try { _heartbeatMutex?.ReleaseMutex(); } catch { }
                }
            }

            // The peer check runs even when our own heartbeat could not be written.
            CheckHeartbeatTimeout();
        }
        catch (OperationCanceledException)
        {
            break;
        }
        catch (Exception ex)
        {
            ErrorOccurred?.Invoke($"心跳错误:{ex.Message}");
        }
    }
    Log($"❤️ 心跳任务退出");
}
/// <summary>
/// Stamps this endpoint's heartbeat time, bumps its heartbeat sequence number and
/// publishes the local connection status in the shared header.
/// Takes no lock itself; the caller is expected to hold the heartbeat mutex.
/// </summary>
private void SendHeartbeatInternal()
{
    if (_accessor == null)
    {
        return;
    }

    try
    {
        var header = ReadHeader();
        long now = DateTime.Now.Ticks;

        if (_identity != kepId)
        {
            // Non-KEP endpoint owns the B-side fields.
            header.B_LastHeartbeat = now;
            header.B_SequenceId++;
        }
        else
        {
            // KEP endpoint owns the A-side fields.
            header.A_LastHeartbeat = now;
            header.A_SequenceId++;
        }

        header.ConnectionStatus = (int)_currentStatus;
        WriteHeader(header);
    }
    catch (Exception ex)
    {
        ErrorOccurred?.Invoke($"发送心跳失败:{ex.Message}");
    }
}
/// <summary>
/// Reads the peer's last heartbeat time under the heartbeat mutex and moves the local
/// connection status between Connected, Warning, SoftDisconnected and Disconnected.
/// </summary>
/// <remarks>
/// Thresholds are 1x (warning), 2x (soft disconnect) and 3x (hard disconnect) of
/// <c>_heartbeatTimeout</c>, which is treated as milliseconds, the same unit in which
/// <c>_heartbeatInterval</c> is handed to Task.Delay. The elapsed time used to be
/// compared in seconds, which pushed the 20000 threshold out to several hours.
/// Nothing is evaluated while the peer's heartbeat field is still 0.
/// </remarks>
private void CheckHeartbeatTimeout()
{
    if (_accessor == null) return;
    try
    {
        if (_heartbeatMutex?.WaitOne(500) != true) return;
        try
        {
            var header = ReadHeader();
            long peerHeartbeatTicks = _identity == kepId ? header.B_LastHeartbeat : header.A_LastHeartbeat;
            if (peerHeartbeatTicks == 0) return; // peer has not written a heartbeat yet

            TimeSpan timeSincePeer = DateTime.Now - new DateTime(peerHeartbeatTicks);
            double elapsedMs = timeSincePeer.TotalMilliseconds; // compared with the thresholds
            double seconds = timeSincePeer.TotalSeconds;        // used only in messages

            if (elapsedMs > _heartbeatTimeout * 3)
            {
                if (_currentStatus != ConnectionStatus.Disconnected)
                {
                    Log($"❌ 硬断开:对端 {seconds:F0} 秒无心跳");
                    // Runs while the heartbeat mutex is held; the status write inside
                    // re-acquires the same mutex on this thread, which Mutex permits.
                    TriggerHardDisconnect($"硬断开:对端 {seconds:F0} 秒无心跳");
                }
            }
            else if (elapsedMs > _heartbeatTimeout * 2)
            {
                if (_currentStatus != ConnectionStatus.SoftDisconnected)
                {
                    _currentStatus = ConnectionStatus.SoftDisconnected;
                    Log($"🔄 软断开:对端 {seconds:F0} 秒无心跳(可恢复)");
                    StatusChanged?.Invoke($"软断开:{seconds:F0}秒");
                }
            }
            else if (elapsedMs > _heartbeatTimeout)
            {
                // A warning is only entered from Connected; other states are left alone.
                if (_currentStatus == ConnectionStatus.Connected)
                {
                    _currentStatus = ConnectionStatus.Warning;
                    Log($"⚠️ 心跳警告:对端 {seconds:F0} 秒无心跳");
                    StatusChanged?.Invoke($"警告:{seconds:F0}秒");
                }
            }
            else if (_currentStatus != ConnectionStatus.Connected)
            {
                Log($"✅ 心跳恢复:对端恢复正常 ({seconds:F1}秒)");
                var previousStatus = _currentStatus;
                _currentStatus = ConnectionStatus.Connected;

                // Recovery events are raised only when coming back from a soft disconnect.
                if (previousStatus == ConnectionStatus.SoftDisconnected)
                {
                    ConnectionRecovered?.Invoke();
                    StatusChanged?.Invoke("连接已恢复");
                }

                // NOTE(review): this rewrites the whole header under the heartbeat mutex,
                // while data flags in the same header are updated under the data mutex.
                header.ConnectionStatus = (int)ConnectionStatus.Connected;
                WriteHeader(header);
            }
        }
        finally
        {
            try { _heartbeatMutex?.ReleaseMutex(); } catch { }
        }
    }
    catch (Exception ex)
    {
        ErrorOccurred?.Invoke($"心跳检查失败:{ex.Message}");
    }
}
/// <summary>
/// Background loop: polls the shared header every 100 ms under the data mutex and, when
/// the peer has flagged a packet, reads it, clears the flag and raises DataReceived.
/// Runs until cancellation is requested or <c>_isConnected</c> becomes false.
/// </summary>
/// <remarks>
/// A flagged packet whose length is outside 1..1000 is discarded and its flag cleared.
/// Previously the flag stayed set, so the same invalid packet was re-examined on every poll.
/// </remarks>
/// <param name="ct">Token that stops the loop.</param>
private async Task ReceiveLoopAsync(CancellationToken ct)
{
    const int MaxPayloadLength = 1000;

    Log($"📥 接收任务启动");
    int lockSuccessCount = 0;
    int lockTimeoutCount = 0;
    DateTime lastLogTime = DateTime.Now;

    while (!ct.IsCancellationRequested && _isConnected)
    {
        try
        {
            await Task.Delay(100, ct);
            if (_accessor == null || _dataMutex == null) continue;

            // Data uses its own mutex so it does not compete with the heartbeat.
            if (_dataMutex.WaitOne(1000))
            {
                lockSuccessCount++;
                try
                {
                    bool isKep = _identity == kepId;
                    var header = ReadHeader();

                    // KEP reads what the B side sent, and vice versa.
                    bool hasData = isKep ? header.B_HasData == 1 : header.A_HasData == 1;
                    if (hasData)
                    {
                        int offset = isKep ? MemoryLayout.BToAOffset : MemoryLayout.AToBOffset;
                        int dataLength = isKep ? header.B_DataLength : header.A_DataLength;
                        bool lengthValid = dataLength > 0 && dataLength <= MaxPayloadLength;

                        string data = lengthValid ? ReadPacketData(offset, dataLength) : null;

                        // Clear the flag in both cases so a bad length cannot wedge the channel.
                        if (isKep)
                            header.B_HasData = 0;
                        else
                            header.A_HasData = 0;
                        WriteHeader(header);

                        if (lengthValid)
                        {
                            // Handlers run while the data mutex is still held.
                            DataReceived?.Invoke(data);
                            Log($"📥 收到数据:{data}");
                        }
                        else
                        {
                            Log($"⚠️ 丢弃无效数据包 (长度:{dataLength})");
                        }
                    }
                }
                catch (Exception ex)
                {
                    // NOTE(review): this handles every exception raised while reading, so the
                    // IOException handler below is not reached for read failures.
                    Log($"❌ 读取数据异常:{ex.Message}");
                }
                finally
                {
                    try { _dataMutex?.ReleaseMutex(); } catch { }
                }
            }
            else
            {
                lockTimeoutCount++;
                // Report lock contention at most once every two seconds.
                if ((DateTime.Now - lastLogTime).TotalSeconds > 2)
                {
                    Log($"⚠️ 数据锁超时 (成功:{lockSuccessCount}, 超时:{lockTimeoutCount})");
                    lastLogTime = DateTime.Now;
                }
                await Task.Delay(50, ct);
            }
        }
        catch (OperationCanceledException) { break; }
        catch (IOException ex)
        {
            Log($"IO 异常:{ex.Message}");
            TriggerDisconnect($"共享内存访问错误:{ex.Message}");
            break;
        }
        catch (Exception ex)
        {
            ErrorOccurred?.Invoke($"接收错误:{ex.Message}");
        }
    }
    Log($"📊 接收任务结束 (成功:{lockSuccessCount}, 超时:{lockTimeoutCount})");
}
/// <summary>
/// Marks the endpoint as disconnected, raises Disconnected once with the given reason
/// and schedules a write of status 0 into the shared header.
/// </summary>
/// <param name="reason">Text passed to Disconnected subscribers.</param>
private void TriggerDisconnect(string reason)
{
    // Avoid raising the event more than once.
    if (_isDisconnected)
    {
        return;
    }

    _isPeerFrozen = false;
    _isConnected = false;
    _isDisconnected = true;

    Log($"触发断开事件:{reason}");
    Disconnected?.Invoke(reason);

    // The shared-memory status update is handed to the thread pool rather than run inline.
    Task.Run(() => UpdateConnectionStatus(0));
}
/// <summary>
/// Writes one text packet into this endpoint's outgoing slot and flags it for the peer,
/// all under the data mutex.
/// </summary>
/// <remarks>
/// <para>The send is refused (reported through ErrorOccurred) while this endpoint's own
/// outgoing flag is still set, i.e. the peer has not consumed the previous packet. The
/// check used to look at the incoming flag instead, which allowed unread outgoing data
/// to be overwritten.</para>
/// <para>Null or empty data is refused up front: a zero-length packet is never accepted
/// by the receive loop, so publishing one would leave the outgoing flag set.</para>
/// <para>The recorded length is capped at 1000; longer text is presumably truncated by
/// SharedMemoryHelper.WriteToFixedBuffer - TODO confirm.</para>
/// </remarks>
/// <param name="data">Text to send.</param>
/// <exception cref="InvalidOperationException">The endpoint is not connected.</exception>
public unsafe void SendData(string data)
{
    if (_accessor == null || _dataMutex == null || !_isConnected)
        throw new InvalidOperationException("未连接");

    if (string.IsNullOrEmpty(data))
    {
        ErrorOccurred?.Invoke("发送失败:数据为空");
        return;
    }

    if (!_dataMutex.WaitOne(2000))
    {
        // Failing to get the data lock within two seconds is treated as a lost connection.
        ErrorOccurred?.Invoke("获取数据锁超时(2 秒)");
        TriggerDisconnect("获取数据锁超时");
        return;
    }
    try
    {
        bool isKep = _identity == kepId;
        var header = ReadHeader();

        // KEP publishes through A_HasData / the A-to-B slot; the other side through B_HasData.
        bool previousConsumed = isKep ? header.A_HasData == 0 : header.B_HasData == 0;
        if (!previousConsumed)
        {
            ErrorOccurred?.Invoke("对端未读取上一条数据");
            return;
        }

        int offset = isKep ? MemoryLayout.AToBOffset : MemoryLayout.BToAOffset;
        DataPacket packet = new DataPacket
        {
            Timestamp = DateTime.Now.Ticks,
            DataType = 2,
            DataLength = Math.Min(data.Length, 1000),
            SequenceId = ++_sequenceId
        };
        SharedMemoryHelper.WriteToFixedBuffer(packet.Data, data, 1000);
        WritePacket(offset, packet);

        // Publish the flag only after the payload itself has been written.
        if (isKep)
        {
            header.A_HasData = 1;
            header.A_DataLength = packet.DataLength;
        }
        else
        {
            header.B_HasData = 1;
            header.B_DataLength = packet.DataLength;
        }
        WriteHeader(header);
        Log($"📤 发送数据:{data}");
    }
    catch (IOException ex)
    {
        TriggerDisconnect($"发送数据失败:{ex.Message}");
    }
    catch (Exception ex)
    {
        ErrorOccurred?.Invoke($"发送失败:{ex.Message}");
    }
    finally
    {
        try { _dataMutex?.ReleaseMutex(); } catch { }
    }
}
/// <summary>
/// Moves the endpoint into the Disconnected status, raises Disconnected with the given
/// reason and writes status 0 into the shared header.
/// </summary>
/// <remarks>
/// TriggerDisconnect sets <c>_isDisconnected</c> without touching <c>_currentStatus</c>, so
/// guarding on the status alone let a later hard timeout raise Disconnected a second
/// time. The status is still updated in that case, but the event is not repeated.
/// </remarks>
/// <param name="reason">Text passed to Disconnected subscribers.</param>
private void TriggerHardDisconnect(string reason)
{
    if (_currentStatus == ConnectionStatus.Disconnected) return;

    bool alreadyReported = _isDisconnected;

    _currentStatus = ConnectionStatus.Disconnected;
    _isConnected = false;
    _isDisconnected = true;
    _isPeerFrozen = false;

    // The disconnect was already announced by another path; do not announce it again.
    if (alreadyReported) return;

    Log($"❌ 硬断开:{reason}");
    Disconnected?.Invoke(reason);
    UpdateConnectionStatus(0);
}
/// <summary>
/// Best-effort write of <paramref name="status"/> into the shared header's
/// ConnectionStatus field under the heartbeat mutex. A lock timeout (500 ms) or any
/// exception is silently ignored.
/// </summary>
/// <param name="status">Raw status value to store.</param>
private void UpdateConnectionStatus(int status)
{
    if (_accessor == null || _heartbeatMutex == null)
    {
        return;
    }

    try
    {
        bool acquired = _heartbeatMutex.WaitOne(500);
        if (!acquired)
        {
            return;
        }

        try
        {
            var header = ReadHeader();
            header.ConnectionStatus = status;
            WriteHeader(header);
        }
        finally
        {
            try { _heartbeatMutex?.ReleaseMutex(); } catch { }
        }
    }
    catch
    {
        // Deliberately ignored: this runs during teardown.
    }
}
/// <summary>
/// Reads the packet stored at <paramref name="offset"/> and decodes
/// <paramref name="dataLength"/> units of its fixed buffer into a string.
/// When no accessor exists, an empty (default) packet is decoded instead.
/// </summary>
/// <param name="offset">Position of the packet inside the shared memory view.</param>
/// <param name="dataLength">Length handed to SharedMemoryHelper.ReadFromFixedBuffer.</param>
/// <returns>The text returned by SharedMemoryHelper.ReadFromFixedBuffer.</returns>
private unsafe string ReadPacketData(int offset, int dataLength)
{
    DataPacket packet = new DataPacket();
    _accessor?.Read(offset, out packet);
    return SharedMemoryHelper.ReadFromFixedBuffer(packet.Data, dataLength);
}
/// <summary>
/// Reads the header structure from offset 0 of the shared memory view.
/// </summary>
/// <returns>The header read, or a default-initialised header when no accessor exists.</returns>
private SharedMemoryHeader ReadHeader()
{
    SharedMemoryHeader result = new SharedMemoryHeader();
    _accessor?.Read(0, out result);
    return result;
}
/// <summary>
/// Writes the whole header structure to offset 0 of the shared memory view.
/// Does nothing when no accessor exists.
/// </summary>
/// <param name="header">Header value to store.</param>
private void WriteHeader(SharedMemoryHeader header)
{
    _accessor?.Write(0, ref header);
}
/// <summary>
/// Writes a packet structure at <paramref name="offset"/> in the shared memory view.
/// Does nothing when no accessor exists.
/// </summary>
/// <param name="offset">Position of the packet slot inside the view.</param>
/// <param name="packet">Packet value to store.</param>
private unsafe void WritePacket(int offset, DataPacket packet)
{
    _accessor?.Write(offset, ref packet);
}
/// <summary>
/// Probes for a mapping that already exists under <c>_mapName</c>. If one is found, its
/// handle is opened and closed again, a garbage collection is forced and the thread
/// sleeps 100 ms. A missing mapping is not an error.
/// </summary>
/// <remarks>
/// NOTE(review): this only releases the handle opened here; whether the mapping actually
/// disappears presumably depends on no other process still holding it - confirm.
/// </remarks>
private void CleanupExistingMemory()
{
    try
    {
        using (MemoryMappedFile stale = MemoryMappedFile.OpenExisting(_mapName))
        {
            Log("⚠️ 发现残留共享内存,已清理");
        }

        GC.Collect();
        GC.WaitForPendingFinalizers();
        Thread.Sleep(100);
    }
    catch (FileNotFoundException)
    {
        // Nothing left over from a previous run.
    }
}
/// <summary>
/// Writes the initial header at offset 0: A-side heartbeat and session id set from this
/// instance, B-side fields and data flags zeroed, ConnectionStatus set to 1.
/// Does nothing when the view accessor has not been created yet.
/// </summary>
private void InitializeMemory()
{
    if (_accessor == null)
    {
        return;
    }

    SharedMemoryHeader header = new SharedMemoryHeader();
    header.IsLocked = 0;
    header.A_LastHeartbeat = DateTime.Now.Ticks;
    header.B_LastHeartbeat = 0;
    header.A_SessionId = _sessionId;
    header.B_SessionId = 0;
    header.A_HasData = 0;
    header.B_HasData = 0;
    header.ConnectionStatus = 1;
    header.Reserved1 = 0;
    header.Reserved2 = 0;
    header.Reserved3 = 0;
    header.Reserved4 = 0;

    _accessor.Write(0, ref header);
}
/// <summary>
/// Stores this endpoint's session id and a fresh heartbeat time in its side of the
/// shared header, under the heartbeat mutex (1 s wait). Failures are reported through
/// ErrorOccurred; a lock timeout is silently skipped.
/// </summary>
/// <remarks>The body contains no await, so it completes synchronously.</remarks>
private async Task RegisterSessionIdAsync()
{
    if (_accessor == null || _heartbeatMutex == null)
    {
        return;
    }

    try
    {
        if (!_heartbeatMutex.WaitOne(1000))
        {
            return;
        }

        try
        {
            var header = ReadHeader();
            if (_identity != kepId)
            {
                header.B_SessionId = _sessionId;
                header.B_LastHeartbeat = DateTime.Now.Ticks;
            }
            else
            {
                header.A_SessionId = _sessionId;
                header.A_LastHeartbeat = DateTime.Now.Ticks;
            }
            WriteHeader(header);
        }
        finally
        {
            try { _heartbeatMutex?.ReleaseMutex(); } catch { }
        }
    }
    catch (Exception ex)
    {
        ErrorOccurred?.Invoke($"注册会话 ID 失败:{ex.Message}");
    }
}
/// <summary>
/// Writes a debug-output line prefixed with this endpoint's identity in square brackets.
/// </summary>
/// <param name="msg">Message text.</param>
private void Log(string msg)
{
    System.Diagnostics.Debug.WriteLine($"[{_identity}] {msg}");
}
/// <summary>
/// Announces a local shutdown if the endpoint is not yet disconnected, stops both
/// background loops (waiting up to one second for each) and releases the accessor,
/// mapping, mutexes and cancellation source.
/// </summary>
/// <remarks>
/// Safe to call more than once: every resource field is cleared after disposal, so a
/// repeated call finds nothing to cancel or dispose. Task failures and an already
/// disposed cancellation source are swallowed so that Dispose itself never throws for them.
/// </remarks>
public void Dispose()
{
    if (!_isDisconnected)
    {
        TriggerHardDisconnect("本地关闭");
    }

    try { _cts?.Cancel(); } catch (ObjectDisposedException) { }

    WaitForTaskExit(_heartbeatTask, 1000);
    WaitForTaskExit(_receiveTask, 1000);
    _heartbeatTask = null;
    _receiveTask = null;

    _accessor?.Dispose();
    _accessor = null;
    _mmf?.Dispose();
    _mmf = null;
    _heartbeatMutex?.Dispose();
    _heartbeatMutex = null;
    _dataMutex?.Dispose();
    _dataMutex = null;
    _cts?.Dispose();
    _cts = null;
}

/// <summary>
/// Waits up to <paramref name="millisecondsTimeout"/> for a background task to finish,
/// ignoring the AggregateException that Task.Wait raises for a faulted or cancelled task.
/// </summary>
private static void WaitForTaskExit(Task task, int millisecondsTimeout)
{
    if (task == null) return;
    try
    {
        task.Wait(millisecondsTimeout);
    }
    catch (AggregateException)
    {
        // The loop ended with an error or cancellation; nothing more to do while disposing.
    }
}
}
}