You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kev/Drawer/IPCLib/MemoryCommunication.cs

608 lines
20 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.IO.MemoryMappedFiles;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace IPCLib
{
// MmfCommunication.cs
// MmfCommunication.cs - complete fixed version (includes raising the Disconnected event)
/// <summary>
/// Two-party communication channel over a named memory-mapped file.
/// A header at offset 0 carries heartbeats, session ids, per-direction
/// "has data" flags and a connection status; two packet slots
/// (MemoryLayout.AToBOffset / MemoryLayout.BToAOffset) carry the payloads.
/// </summary>
/// <remarks>
/// NOTE(review): the heartbeat path and the data path each read and rewrite the
/// whole header at offset 0, but under different mutexes, so an update made under
/// one mutex may be overwritten by a concurrent update under the other — confirm
/// whether this is acceptable.
/// </remarks>
public class MmfCommunication : IDisposable
{
// Identity of side "A": an instance whose identity equals this value uses the
// A_* header fields and writes to the A-to-B slot; any other identity uses B_*.
const string kepId = "KEP";
// The shared memory map and the view used for all reads and writes.
private MemoryMappedFile _mmf;
private MemoryMappedViewAccessor _accessor;
// Fix: use two independent named mutexes
private Mutex _heartbeatMutex; // lock dedicated to heartbeat/status header updates
private Mutex _dataMutex; // lock dedicated to sending and receiving data
// Cancels the two background loops below.
private CancellationTokenSource _cts;
private Task _heartbeatTask;
private Task _receiveTask;
// Name of the memory-mapped file; also the prefix of both mutex names.
private readonly string _mapName;
// True when this instance creates the map, false when it opens an existing one.
private readonly bool _isServer;
// Compared against kepId to choose the A or B side of the header and slots.
private readonly string _identity;
// Delay between heartbeats, in milliseconds (passed to Task.Delay).
private int _heartbeatInterval = 1000;
// Staleness threshold for the peer heartbeat: CheckHeartbeatTimeout warns above 1x,
// soft-disconnects above 2x and hard-disconnects above 3x this value.
// NOTE(review): presumably milliseconds like _heartbeatInterval — confirm that the
// comparison in CheckHeartbeatTimeout uses the same unit.
private int _heartbeatTimeout = 20000;
private bool _isConnected = false;
private bool _isDisconnected = false;
// Only ever reset to false by the disconnect helpers; never set to true in this class.
private bool _isPeerFrozen = false;
// Identifier of this instance, taken from the clock ticks at construction time.
private long _sessionId;
// Incremented for every packet built by SendData.
private long _sequenceId = 0;
// Local view of the connection state; also published into the shared header.
private ConnectionStatus _currentStatus = ConnectionStatus.Connecting;
// Raised by the receive loop with the payload read from the peer's slot.
public event Action<string> DataReceived;
// Raised at the end of a successful StartAsync.
public event Action Connected;
// Raised with a reason string when the connection is declared lost or closed.
public event Action<string> Disconnected;
// Declared but not raised anywhere in this class.
public event Action<string> PeerFrozen;
// Declared but not raised anywhere in this class.
public event Action<string> PeerRecovered;
// Raised with a message when an operation fails without throwing to the caller.
public event Action<string> ErrorOccurred;
// Raised by the heartbeat check on warning, soft disconnect and recovery.
public event Action<string> StatusChanged;
// Raised when the peer heartbeat returns after a soft disconnect.
public event Action ConnectionRecovered;
public bool IsConnected => _isConnected;
public bool IsDisconnected => _isDisconnected;
public long SessionId => _sessionId;
/// <summary>
/// Stores the channel configuration and stamps this instance with a session id
/// derived from the current clock ticks. No shared memory is touched here.
/// </summary>
/// <param name="mapName">Name of the memory-mapped file shared by both parties.</param>
/// <param name="isServer">True to create the map in StartAsync, false to open it.</param>
/// <param name="identity">Identity string; compared with "KEP" to pick the A or B side.</param>
public MmfCommunication(string mapName, bool isServer, string identity)
{
    this._mapName = mapName;
    this._isServer = isServer;
    this._identity = identity;

    // The tick count doubles as a per-instance session identifier.
    long createdAtTicks = DateTime.Now.Ticks;
    this._sessionId = createdAtTicks;
}
/// <summary>
/// Creates (server) or opens (client) the shared memory map, creates the view
/// accessor and both named mutexes, registers this side's session id, starts the
/// heartbeat and receive loops and finally raises <see cref="Connected"/>.
/// </summary>
/// <remarks>
/// A client polls for the map up to 50 times, 100 ms apart, before giving up.
/// </remarks>
/// <exception cref="TimeoutException">The client could not open the map in time.</exception>
/// <exception cref="Exception">
/// Any other start-up failure is reported through <see cref="ErrorOccurred"/> and rethrown.
/// </exception>
public async Task StartAsync()
{
    try
    {
        if (_isServer)
        {
            CleanupExistingMemory();
            _mmf = MemoryMappedFile.CreateNew(_mapName, MemoryLayout.TotalSize);
        }
        else
        {
            for (int i = 0; i < 50; i++)
            {
                try
                {
                    _mmf = MemoryMappedFile.OpenExisting(_mapName);
                    break;
                }
                catch (FileNotFoundException)
                {
                    await Task.Delay(100);
                }
            }
            if (_mmf == null)
                throw new TimeoutException("等待共享内存创建超时");
        }

        // The accessor must exist before InitializeMemory runs: that method
        // returns immediately while _accessor is null, so calling it earlier
        // left the header uninitialised.
        _accessor = _mmf.CreateViewAccessor();

        if (_isServer)
        {
            // NOTE(review): InitializeMemory fills the A_* fields, i.e. it assumes
            // the server is the "KEP" side — confirm against callers.
            InitializeMemory();
            Log($"共享内存已创建 (SessionId: {_sessionId})");
        }
        else
        {
            Log($"共享内存已打开 (SessionId: {_sessionId})");
        }

        // Two independent named mutexes: one for heartbeat/status, one for data.
        _heartbeatMutex = new Mutex(false, _mapName + "_HeartbeatMutex");
        _dataMutex = new Mutex(false, _mapName + "_DataMutex");
        await RegisterSessionIdAsync();
        _isConnected = true;
        _isDisconnected = false;
        _cts = new CancellationTokenSource();
        _heartbeatTask = HeartbeatLoopAsync(_cts.Token);
        _receiveTask = ReceiveLoopAsync(_cts.Token);
        Connected?.Invoke();
        Log("通讯模块已启动");
    }
    catch (Exception ex)
    {
        _isConnected = false;
        _isDisconnected = true;
        ErrorOccurred?.Invoke($"启动失败：{ex.Message}");
        throw;
    }
}
/// <summary>
/// Background loop: every <c>_heartbeatInterval</c> milliseconds publishes this
/// side's heartbeat under the heartbeat mutex and then evaluates the peer's
/// heartbeat age. Ends on cancellation or once the instance is no longer connected.
/// </summary>
/// <param name="ct">Token that stops the loop.</param>
private async Task HeartbeatLoopAsync(CancellationToken ct)
{
    Log($"❤️ 心跳任务启动");
    while (true)
    {
        if (ct.IsCancellationRequested || !_isConnected)
        {
            break;
        }

        try
        {
            await Task.Delay(_heartbeatInterval, ct);

            // Acquire and release happen without an await in between, so the
            // mutex is released on the thread that acquired it.
            bool lockTaken = _heartbeatMutex?.WaitOne(500) ?? false;
            if (!lockTaken)
            {
                Log($"⚠️ 心跳锁超时（可忽略）");
            }
            else
            {
                try
                {
                    SendHeartbeatInternal();
                }
                finally
                {
                    try { _heartbeatMutex?.ReleaseMutex(); } catch { }
                }
            }

            CheckHeartbeatTimeout();
        }
        catch (OperationCanceledException)
        {
            break;
        }
        catch (Exception ex)
        {
            ErrorOccurred?.Invoke($"心跳错误：{ex.Message}");
        }
    }
    Log($"❤️ 心跳任务退出");
}
/// <summary>
/// Writes this side's heartbeat timestamp, bumps its heartbeat sequence counter
/// and publishes the local connection status into the shared header.
/// Takes no lock itself; the caller is expected to hold the heartbeat mutex.
/// Failures are reported through <see cref="ErrorOccurred"/>.
/// </summary>
private void SendHeartbeatInternal()
{
    if (_accessor == null)
    {
        return;
    }

    try
    {
        SharedMemoryHeader snapshot = ReadHeader();
        long stamp = DateTime.Now.Ticks;
        bool isSideA = _identity == kepId;

        if (!isSideA)
        {
            snapshot.B_LastHeartbeat = stamp;
            snapshot.B_SequenceId++;
        }
        else
        {
            snapshot.A_LastHeartbeat = stamp;
            snapshot.A_SequenceId++;
        }

        snapshot.ConnectionStatus = (int)_currentStatus;
        WriteHeader(snapshot);
    }
    catch (Exception ex)
    {
        ErrorOccurred?.Invoke($"发送心跳失败：{ex.Message}");
    }
}
/// <summary>
/// Reads the peer's last heartbeat from the shared header (under the heartbeat
/// mutex) and classifies how long the peer has been silent relative to
/// <c>_heartbeatTimeout</c>: above 1x a warning, above 2x a soft disconnect,
/// above 3x a hard disconnect; otherwise the connection is (re)marked as connected.
/// </summary>
/// <remarks>
/// <c>_heartbeatTimeout</c> is treated as milliseconds, the same unit as
/// <c>_heartbeatInterval</c>. The previous code compared it against elapsed
/// <em>seconds</em>, which pushed every threshold out by a factor of 1000.
/// Does nothing when the lock cannot be taken within 500 ms or when the peer has
/// never written a heartbeat. Failures are reported through <see cref="ErrorOccurred"/>.
/// </remarks>
private void CheckHeartbeatTimeout()
{
    if (_accessor == null) return;
    try
    {
        if (_heartbeatMutex?.WaitOne(500) != true) return;
        try
        {
            var header = ReadHeader();
            long peerHeartbeatTicks = _identity == kepId ? header.B_LastHeartbeat : header.A_LastHeartbeat;

            // Zero means the peer has not registered yet; nothing to judge.
            if (peerHeartbeatTicks == 0) return;

            DateTime peerHeartbeat = new DateTime(peerHeartbeatTicks);
            var timeSincePeer = DateTime.Now - peerHeartbeat;

            // Thresholds are compared in milliseconds; seconds are for messages only.
            double elapsedMs = timeSincePeer.TotalMilliseconds;
            double seconds = timeSincePeer.TotalSeconds;
            double timeoutMs = _heartbeatTimeout;

            if (elapsedMs > timeoutMs * 3)
            {
                if (_currentStatus != ConnectionStatus.Disconnected)
                {
                    Log($"❌ 硬断开：对端 {seconds:F0} 秒无心跳");
                    TriggerHardDisconnect($"硬断开：对端 {seconds:F0} 秒无心跳");
                }
            }
            else if (elapsedMs > timeoutMs * 2)
            {
                if (_currentStatus != ConnectionStatus.SoftDisconnected)
                {
                    _currentStatus = ConnectionStatus.SoftDisconnected;
                    Log($"🔄 软断开：对端 {seconds:F0} 秒无心跳（可恢复）");
                    StatusChanged?.Invoke($"软断开：{seconds:F0}秒");
                }
            }
            else if (elapsedMs > timeoutMs)
            {
                // Only a healthy connection is downgraded to a warning.
                if (_currentStatus == ConnectionStatus.Connected)
                {
                    _currentStatus = ConnectionStatus.Warning;
                    Log($"⚠️ 心跳警告：对端 {seconds:F0} 秒无心跳");
                    StatusChanged?.Invoke($"警告：{seconds:F0}秒");
                }
            }
            else
            {
                if (_currentStatus != ConnectionStatus.Connected)
                {
                    Log($"✅ 心跳恢复：对端恢复正常 ({seconds:F1}秒)");
                    var previousStatus = _currentStatus;
                    _currentStatus = ConnectionStatus.Connected;

                    // Recovery events are raised only when coming back from a soft disconnect.
                    if (previousStatus == ConnectionStatus.SoftDisconnected)
                    {
                        ConnectionRecovered?.Invoke();
                        StatusChanged?.Invoke("连接已恢复");
                    }
                    header.ConnectionStatus = (int)ConnectionStatus.Connected;
                    WriteHeader(header);
                }
            }
        }
        finally
        {
            try { _heartbeatMutex?.ReleaseMutex(); } catch { }
        }
    }
    catch (Exception ex)
    {
        ErrorOccurred?.Invoke($"心跳检查失败：{ex.Message}");
    }
}
/// <summary>
/// Background loop: roughly every 100 ms takes the data mutex, checks the peer's
/// "has data" flag and, when set, reads the payload from the peer's slot, clears
/// the flag and raises <see cref="DataReceived"/>. Ends on cancellation, when the
/// instance is no longer connected, or after an I/O failure on the shared memory.
/// </summary>
/// <remarks>
/// Changes from the previous version:
/// an <see cref="IOException"/> raised while accessing shared memory now reaches
/// the disconnect handler instead of being swallowed by the inner catch-all; and
/// a pending packet whose length is outside 1..1000 is discarded (flag cleared)
/// instead of leaving the slot marked as occupied forever.
/// Exceptions thrown by <see cref="DataReceived"/> subscribers are still only logged.
/// </remarks>
/// <param name="ct">Token that stops the loop.</param>
private async Task ReceiveLoopAsync(CancellationToken ct)
{
    // Largest payload length accepted from the header.
    const int MaxPayloadLength = 1000;

    Log($"📥 接收任务启动");
    int lockSuccessCount = 0;
    int lockTimeoutCount = 0;
    DateTime lastLogTime = DateTime.Now;
    while (!ct.IsCancellationRequested && _isConnected)
    {
        try
        {
            await Task.Delay(100, ct);
            if (_accessor == null || _dataMutex == null) continue;

            // Data has its own lock so it does not compete with heartbeats.
            // No await occurs between WaitOne and ReleaseMutex, so the mutex is
            // released on the thread that acquired it.
            if (_dataMutex.WaitOne(1000))
            {
                lockSuccessCount++;
                try
                {
                    var header = ReadHeader();
                    bool isKep = _identity == kepId;
                    bool hasData = isKep ? header.B_HasData == 1 : header.A_HasData == 1;
                    if (hasData)
                    {
                        int offset = isKep ? MemoryLayout.BToAOffset : MemoryLayout.AToBOffset;
                        int dataLength = isKep ? header.B_DataLength : header.A_DataLength;
                        bool lengthValid = dataLength > 0 && dataLength <= MaxPayloadLength;
                        string data = lengthValid ? ReadPacketData(offset, dataLength) : null;

                        // Always free the slot, even for an unusable packet, so
                        // the peer can send again.
                        if (isKep)
                            header.B_HasData = 0;
                        else
                            header.A_HasData = 0;
                        WriteHeader(header);

                        if (lengthValid)
                        {
                            try
                            {
                                DataReceived?.Invoke(data);
                                Log($"📥 收到数据：{data}");
                            }
                            catch (Exception ex)
                            {
                                // Subscriber failures must not be mistaken for
                                // shared-memory failures.
                                Log($"❌ 读取数据异常：{ex.Message}");
                            }
                        }
                        else
                        {
                            Log($"❌ 丢弃无效数据包：长度 {dataLength}");
                        }
                    }
                }
                catch (Exception ex) when (!(ex is IOException))
                {
                    Log($"❌ 读取数据异常：{ex.Message}");
                }
                finally
                {
                    try { _dataMutex?.ReleaseMutex(); } catch { }
                }
            }
            else
            {
                lockTimeoutCount++;

                // Rate-limit the timeout log to one entry per two seconds.
                if ((DateTime.Now - lastLogTime).TotalSeconds > 2)
                {
                    Log($"⚠️ 数据锁超时 (成功：{lockSuccessCount}, 超时：{lockTimeoutCount})");
                    lastLogTime = DateTime.Now;
                }
                await Task.Delay(50, ct);
            }
        }
        catch (OperationCanceledException) { break; }
        catch (IOException ex)
        {
            Log($"IO 异常：{ex.Message}");
            TriggerDisconnect($"共享内存访问错误：{ex.Message}");
            break;
        }
        catch (Exception ex)
        {
            ErrorOccurred?.Invoke($"接收错误：{ex.Message}");
        }
    }
    Log($"📊 接收任务结束 (成功：{lockSuccessCount}, 超时：{lockTimeoutCount})");
}
/// <summary>
/// Marks the connection as lost, raises <see cref="Disconnected"/> once with the
/// given reason and publishes status 0 into the shared header in the background.
/// </summary>
/// <remarks>
/// The local status is now also set to <c>ConnectionStatus.Disconnected</c>.
/// Without that, the hard-disconnect path (which is guarded by the local status)
/// could raise <see cref="Disconnected"/> a second time for the same loss.
/// </remarks>
/// <param name="reason">Human-readable reason passed to subscribers.</param>
private void TriggerDisconnect(string reason)
{
    if (_isDisconnected) return; // avoid raising the event twice

    _isDisconnected = true;
    _isConnected = false;
    _isPeerFrozen = false;
    _currentStatus = ConnectionStatus.Disconnected;

    Log($"触发断开事件：{reason}");
    Disconnected?.Invoke(reason);

    // Publish the status on a thread-pool thread so the caller does not wait
    // on the heartbeat mutex.
    Task.Run(() => UpdateConnectionStatus(0));
}
/// <summary>
/// Sends one text message to the peer: writes a packet into this side's outgoing
/// slot and sets this side's "has data" flag, all under the data mutex.
/// </summary>
/// <remarks>
/// Changes from the previous version:
/// the "previous message not yet read" check now inspects this side's own
/// outgoing flag (the one the peer clears after reading) rather than the peer's
/// flag, so an unread message is no longer overwritten; and null or empty
/// payloads are rejected up front, because a zero-length packet is never
/// consumed by the receiver.
/// Apart from the not-connected case, failures are reported through
/// <see cref="ErrorOccurred"/> rather than thrown. The recorded length is capped at 1000.
/// NOTE(review): the header is rewritten as a whole here under the data mutex,
/// while heartbeat code rewrites it under the heartbeat mutex — confirm that
/// concurrent updates cannot overwrite each other.
/// </remarks>
/// <param name="data">Text to send; must not be null or empty.</param>
/// <exception cref="InvalidOperationException">The channel is not connected.</exception>
public unsafe void SendData(string data)
{
    if (_accessor == null || _dataMutex == null || !_isConnected)
        throw new InvalidOperationException("未连接");

    if (string.IsNullOrEmpty(data))
    {
        ErrorOccurred?.Invoke("发送失败：数据为空");
        return;
    }

    if (!_dataMutex.WaitOne(2000))
    {
        ErrorOccurred?.Invoke("获取数据锁超时2 秒）");
        TriggerDisconnect("获取数据锁超时");
        return;
    }
    try
    {
        var header = ReadHeader();
        bool isKep = _identity == kepId;

        // Our own flag stays set until the peer has consumed the last message.
        bool previousConsumed = isKep ? header.A_HasData == 0 : header.B_HasData == 0;
        if (!previousConsumed)
        {
            ErrorOccurred?.Invoke("对端未读取上一条数据");
            return;
        }

        int offset = isKep ? MemoryLayout.AToBOffset : MemoryLayout.BToAOffset;
        DataPacket packet = new DataPacket
        {
            Timestamp = DateTime.Now.Ticks,
            DataType = 2,
            DataLength = Math.Min(data.Length, 1000),
            SequenceId = ++_sequenceId
        };
        SharedMemoryHelper.WriteToFixedBuffer(packet.Data, data, 1000);
        WritePacket(offset, packet);

        // Publish the packet only after its contents are in place.
        if (isKep)
        {
            header.A_HasData = 1;
            header.A_DataLength = packet.DataLength;
        }
        else
        {
            header.B_HasData = 1;
            header.B_DataLength = packet.DataLength;
        }
        WriteHeader(header);
        Log($"📤 发送数据：{data}");
    }
    catch (IOException ex)
    {
        TriggerDisconnect($"发送数据失败：{ex.Message}");
    }
    catch (Exception ex)
    {
        ErrorOccurred?.Invoke($"发送失败：{ex.Message}");
    }
    finally
    {
        try { _dataMutex?.ReleaseMutex(); } catch { }
    }
}
/// <summary>
/// Declares the connection definitively lost: updates the local state flags,
/// raises <see cref="Disconnected"/> with the reason and writes status 0 into
/// the shared header. Does nothing when the local status is already Disconnected.
/// </summary>
/// <param name="reason">Human-readable reason passed to subscribers.</param>
private void TriggerHardDisconnect(string reason)
{
    bool alreadyDisconnected = _currentStatus == ConnectionStatus.Disconnected;
    if (!alreadyDisconnected)
    {
        _currentStatus = ConnectionStatus.Disconnected;
        _isConnected = false;
        _isDisconnected = true;
        _isPeerFrozen = false;

        Log($"❌ 硬断开：{reason}");
        Disconnected?.Invoke(reason);

        UpdateConnectionStatus(0);
    }
}
/// <summary>
/// Best-effort write of the given status value into the shared header under the
/// heartbeat mutex. Silently gives up when the accessor or mutex is missing, when
/// the mutex cannot be taken within 500 ms, or when any exception occurs.
/// </summary>
/// <param name="status">Raw status value stored in the header.</param>
private void UpdateConnectionStatus(int status)
{
    if (_accessor == null || _heartbeatMutex == null)
    {
        return;
    }

    try
    {
        bool lockTaken = _heartbeatMutex.WaitOne(500);
        if (!lockTaken)
        {
            return;
        }

        try
        {
            SharedMemoryHeader current = ReadHeader();
            current.ConnectionStatus = status;
            WriteHeader(current);
        }
        finally
        {
            try { _heartbeatMutex?.ReleaseMutex(); } catch { }
        }
    }
    catch
    {
        // Deliberately ignored: this update is best-effort.
    }
}
/// <summary>
/// Reads the packet stored at <paramref name="offset"/> and decodes
/// <paramref name="dataLength"/> units of its fixed data buffer into a string.
/// When no accessor exists, an empty packet is decoded instead.
/// </summary>
/// <param name="offset">Position of the packet slot in the shared memory view.</param>
/// <param name="dataLength">Length passed to the buffer decoder.</param>
/// <returns>The decoded text.</returns>
private unsafe string ReadPacketData(int offset, int dataLength)
{
    var slot = new DataPacket();
    _accessor?.Read(offset, out slot);
    string text = SharedMemoryHelper.ReadFromFixedBuffer(slot.Data, dataLength);
    return text;
}
/// <summary>
/// Reads the shared header from offset 0 of the view.
/// </summary>
/// <returns>
/// The header currently in shared memory, or a freshly constructed header when
/// no accessor exists.
/// </returns>
private SharedMemoryHeader ReadHeader()
{
    var result = new SharedMemoryHeader();
    _accessor?.Read(0, out result);
    return result;
}
/// <summary>
/// Writes the given header to offset 0 of the view; does nothing when no accessor exists.
/// </summary>
/// <param name="header">Header value to store.</param>
private void WriteHeader(SharedMemoryHeader header)
{
    _accessor?.Write(0, ref header);
}
/// <summary>
/// Writes the given packet at <paramref name="offset"/> in the view; does nothing
/// when no accessor exists.
/// </summary>
/// <param name="offset">Position of the packet slot in the shared memory view.</param>
/// <param name="packet">Packet value to store.</param>
private unsafe void WritePacket(int offset, DataPacket packet)
{
    _accessor?.Write(offset, ref packet);
}
/// <summary>
/// Checks whether a map with this name already exists. If so, closes the handle
/// just opened, forces a garbage collection with finalizers and pauses 100 ms.
/// Does nothing when no such map exists.
/// </summary>
/// <remarks>
/// NOTE(review): this only closes the handle opened here; whether the old map
/// actually disappears presumably depends on other handles being released —
/// confirm against the server start-up path.
/// </remarks>
private void CleanupExistingMemory()
{
    try
    {
        using (MemoryMappedFile.OpenExisting(_mapName))
        {
            Log("⚠️ 发现残留共享内存，已清理");
        }

        GC.Collect();
        GC.WaitForPendingFinalizers();
        Thread.Sleep(100);
    }
    catch (FileNotFoundException)
    {
        // No leftover map: nothing to do.
    }
}
/// <summary>
/// Writes a fresh header to offset 0: side A gets the current time as heartbeat
/// and this instance's session id, side B is zeroed, both data flags are cleared
/// and the connection status is set to 1. Returns without writing when the view
/// accessor has not been created yet.
/// </summary>
private void InitializeMemory()
{
    if (_accessor == null)
    {
        return;
    }

    var fresh = new SharedMemoryHeader();
    fresh.IsLocked = 0;
    fresh.A_LastHeartbeat = DateTime.Now.Ticks;
    fresh.B_LastHeartbeat = 0;
    fresh.A_SessionId = _sessionId;
    fresh.B_SessionId = 0;
    fresh.A_HasData = 0;
    fresh.B_HasData = 0;
    fresh.ConnectionStatus = 1;
    fresh.Reserved1 = 0;
    fresh.Reserved2 = 0;
    fresh.Reserved3 = 0;
    fresh.Reserved4 = 0;

    _accessor.Write(0, ref fresh);
}
/// <summary>
/// Records this side's session id and an initial heartbeat timestamp in the
/// shared header, under the heartbeat mutex. Skips the update when the accessor
/// or mutex is missing or the mutex cannot be taken within one second. Failures
/// are reported through <see cref="ErrorOccurred"/>.
/// </summary>
/// <remarks>The body contains no await, so it runs synchronously.</remarks>
private async Task RegisterSessionIdAsync()
{
    if (_accessor == null || _heartbeatMutex == null)
    {
        return;
    }

    try
    {
        if (!_heartbeatMutex.WaitOne(1000))
        {
            return;
        }

        try
        {
            SharedMemoryHeader current = ReadHeader();
            bool isSideA = _identity == kepId;
            if (!isSideA)
            {
                current.B_SessionId = _sessionId;
                current.B_LastHeartbeat = DateTime.Now.Ticks;
            }
            else
            {
                current.A_SessionId = _sessionId;
                current.A_LastHeartbeat = DateTime.Now.Ticks;
            }
            WriteHeader(current);
        }
        finally
        {
            try { _heartbeatMutex?.ReleaseMutex(); } catch { }
        }
    }
    catch (Exception ex)
    {
        ErrorOccurred?.Invoke($"注册会话 ID 失败：{ex.Message}");
    }
}
/// <summary>
/// Writes a debug trace line prefixed with this instance's identity in brackets.
/// </summary>
/// <param name="msg">Message text.</param>
private void Log(string msg)
{
    System.Diagnostics.Debug.WriteLine($"[{_identity}] {msg}");
}
/// <summary>
/// Shuts the channel down: announces a local disconnect if none was raised yet,
/// cancels the background loops, waits up to one second for each and releases the
/// view, the map, both mutexes and the cancellation source.
/// </summary>
/// <remarks>
/// Safe to call more than once. Every resource field is swapped to null before
/// it is disposed, so a repeated call finds nothing left to cancel or dispose.
/// Previously a second call invoked <c>Cancel()</c> on an already disposed
/// <see cref="CancellationTokenSource"/>, which throws. A faulted background loop
/// no longer makes this method throw; its failure is only logged.
/// </remarks>
public void Dispose()
{
    if (!_isDisconnected)
    {
        TriggerHardDisconnect("本地关闭");
    }

    CancellationTokenSource cts = Interlocked.Exchange(ref _cts, null);
    try
    {
        cts?.Cancel();
    }
    catch (AggregateException ex)
    {
        // Raised when callbacks registered on the token throw.
        Log($"取消后台任务时出错：{ex.Message}");
    }

    // Give each loop up to one second to observe the cancellation.
    Task heartbeatLoop = Interlocked.Exchange(ref _heartbeatTask, null);
    Task receiveLoop = Interlocked.Exchange(ref _receiveTask, null);
    foreach (Task loop in new[] { heartbeatLoop, receiveLoop })
    {
        try
        {
            loop?.Wait(1000);
        }
        catch (AggregateException ex)
        {
            Log($"后台任务异常结束：{ex.InnerException?.Message ?? ex.Message}");
        }
    }

    Interlocked.Exchange(ref _accessor, null)?.Dispose();
    Interlocked.Exchange(ref _mmf, null)?.Dispose();
    Interlocked.Exchange(ref _heartbeatMutex, null)?.Dispose();
    Interlocked.Exchange(ref _dataMutex, null)?.Dispose();
    cts?.Dispose();
}
}
}