You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kev/Drawer/IPCLib/TcpCommunication.cs

485 lines
15 KiB
C#

1 month ago
// TcpManager.cs
using IPCLib;
using System;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
// TcpManager.cs - revised version
// TcpManager.cs - complete fixed version
using System.Collections.Concurrent;
/// <summary>
/// Single-connection TCP endpoint (server or client role) that exchanges newline-delimited
/// JSON <see cref="HeartbeatPacket"/> messages. Once connected it sends one packet per
/// <see cref="HeartbeatInterval"/> and reports a disconnect when nothing has been received
/// for longer than the internal heartbeat timeout.
/// </summary>
/// <remarks>
/// Cleanup cancels the internal <see cref="CancellationTokenSource"/>, and both the accept and
/// the connect paths observe that token, so an instance is not restarted after a disconnect.
/// </remarks>
public class TcpManager : IDisposable
{
    private TcpListener _listener;
    private TcpClient _client;
    private NetworkStream _stream;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private Task _receiveTask;
    private Task _acceptTask;
    private System.Threading.Timer _heartbeatTimer;
    private System.Threading.Timer _timeoutCheckTimer;
    private long _sequenceId = 0;

    // Time (UTC) at which data was last received; guarded by _lockObj.
    // UTC is used so that DST / time-zone changes cannot fake or hide a timeout.
    private DateTime _lastHeartbeatTime = DateTime.UtcNow;
    private readonly int _heartbeatTimeoutSeconds = 5;

    // Read from timer and receive-loop threads without the lock, hence volatile.
    private volatile bool _isConnected = false;

    // 1 while a heartbeat send is in flight; stops overlapping timer ticks from
    // writing to the same NetworkStream concurrently.
    private int _sendInProgress;

    // 1 once Dispose has run; makes Dispose idempotent.
    private int _disposed;

    private readonly object _lockObj = new object();

    /// <summary>True after <see cref="StartServerAsync"/>, false after <see cref="StartClientAsync"/>.</summary>
    public bool IsServer { get; set; }

    /// <summary>Period, in milliseconds, of both the heartbeat timer and the timeout-check timer.</summary>
    public int HeartbeatInterval { get; set; } = 1000;

    /// <summary>Name written to the <c>Source</c> field of outgoing packets and used as the log prefix.</summary>
    public string LocalIdentity { get; set; } = "App";

    /// <summary>Number of failed connection attempts after which <see cref="StartClientAsync"/> gives up.</summary>
    public int MaxReconnectAttempts { get; set; } = 5;

    /// <summary>Delay, in milliseconds, between two connection attempts.</summary>
    public int ReconnectDelay { get; set; } = 2000;

    /// <summary>Raised for every received line that deserializes to a non-null packet.</summary>
    public event Action<HeartbeatPacket> PacketReceived;

    /// <summary>Raised once a connection has been established (either role).</summary>
    public event Action Connected;

    /// <summary>Raised with a reason text when an established connection is torn down.</summary>
    public event Action<string> Disconnected;

    /// <summary>Raised with a description and the causing exception when an operation fails.</summary>
    public event Action<string, Exception> ErrorOccurred;

    private readonly ConcurrentQueue<string> _dataQueue = new ConcurrentQueue<string>();

    /// <summary>
    /// Starts listening on the loopback interface and begins accepting one client in the background.
    /// </summary>
    /// <param name="port">TCP port to listen on.</param>
    /// <exception cref="SocketException">The listener could not be started (rethrown after reporting).</exception>
    public async Task StartServerAsync(int port)
    {
        IsServer = true;
        try
        {
            _listener = new TcpListener(System.Net.IPAddress.Loopback, port);
            // SO_REUSEADDR: allow the port to be bound again.
            _listener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
            _listener.Start();
            Log($"服务端已启动,监听端口:{port}");
            _acceptTask = AcceptClientAsync();
            await Task.CompletedTask;
        }
        catch (SocketException ex)
        {
            HandleSocketError("启动监听失败", ex);
            throw;
        }
        catch (Exception ex)
        {
            ErrorOccurred?.Invoke("启动监听失败", ex);
            throw;
        }
    }

    /// <summary>
    /// Waits for a single client and starts the session; failures are reported, not thrown.
    /// </summary>
    private async Task AcceptClientAsync()
    {
        try
        {
            Log("等待客户端连接...");
            TcpClient accepted = await _listener.AcceptTcpClientAsync(_cts.Token);
            BeginSession(accepted, "客户端已连接");
        }
        catch (OperationCanceledException)
        {
            Log("接受连接已取消");
        }
        catch (SocketException ex)
        {
            HandleSocketError("接受连接失败", ex);
        }
        catch (Exception ex)
        {
            ErrorOccurred?.Invoke("接受连接失败", ex);
        }
    }

    /// <summary>
    /// Common post-connect setup for both roles: configures the socket, marks the manager
    /// connected, raises <see cref="Connected"/>, then starts the receive loop and both timers.
    /// </summary>
    private void BeginSession(TcpClient client, string connectedMessage)
    {
        ConfigureSocket(client);
        _client = client;
        _stream = client.GetStream();
        lock (_lockObj)
        {
            _lastHeartbeatTime = DateTime.UtcNow;
            _isConnected = true;
        }
        Log(connectedMessage);
        Connected?.Invoke();
        _receiveTask = ReceiveLoopAsync();
        StartHeartbeatTimer();
        StartTimeoutCheckTimer();
    }

    /// <summary>
    /// Starts the periodic heartbeat sender (first tick fires immediately).
    /// </summary>
    private void StartHeartbeatTimer()
    {
        var timer = new System.Threading.Timer(async _ =>
        {
            // Timer ticks may overlap when a send is slow; skip this tick rather than
            // issue a second concurrent write on the same stream.
            if (Interlocked.CompareExchange(ref _sendInProgress, 1, 0) != 0)
            {
                return;
            }

            try
            {
                await SendHeartbeatAsync();
            }
            catch (Exception ex)
            {
                // This lambda is async void: an escaping exception (e.g. from a throwing
                // Disconnected subscriber) would otherwise terminate the process.
                Log($"发送失败:{ex.Message}");
            }
            finally
            {
                Interlocked.Exchange(ref _sendInProgress, 0);
            }
        }, null, 0, HeartbeatInterval);

        // Replace (and dispose) any timer left over from an earlier start.
        Interlocked.Exchange(ref _heartbeatTimer, timer)?.Dispose();
    }

    /// <summary>
    /// Starts the periodic check that detects a silent peer.
    /// </summary>
    private void StartTimeoutCheckTimer()
    {
        var timer = new System.Threading.Timer(_ =>
        {
            CheckConnectionTimeout();
        }, null, HeartbeatInterval, HeartbeatInterval);

        Interlocked.Exchange(ref _timeoutCheckTimer, timer)?.Dispose();
    }

    /// <summary>
    /// Connects to a server, retrying up to <see cref="MaxReconnectAttempts"/> times with
    /// <see cref="ReconnectDelay"/> between attempts. Returns when connected, when the attempts
    /// are exhausted, or when the manager is cancelled; cancellation never throws.
    /// </summary>
    /// <param name="host">Host name or address of the server.</param>
    /// <param name="port">TCP port of the server.</param>
    public async Task StartClientAsync(string host, int port)
    {
        IsServer = false;
        int reconnectCount = 0;
        while (!_cts.Token.IsCancellationRequested)
        {
            try
            {
                Log($"尝试连接 {host}:{port} (尝试 {reconnectCount + 1}/{MaxReconnectAttempts})...");
                _client = new TcpClient();
                _client.ReceiveTimeout = 5000;
                _client.SendTimeout = 5000;
                await _client.ConnectAsync(host, port, _cts.Token);
                BeginSession(_client, $"已连接到服务端 {host}:{port}");
                return; // connected: leave the retry loop
            }
            catch (SocketException ex)
            {
                reconnectCount++;
                DiscardFailedClient();
                HandleSocketError($"连接失败 (尝试 {reconnectCount}/{MaxReconnectAttempts})", ex);
                if (reconnectCount >= MaxReconnectAttempts)
                {
                    ErrorOccurred?.Invoke($"达到最大重连次数 {MaxReconnectAttempts}", ex);
                    // HandleDisconnect only acts when a connection had been established.
                    await HandleDisconnect($"达到最大重连次数:{ex.Message}");
                    return;
                }
                Log($"{ReconnectDelay}ms 后重试...");
                if (!await WaitBeforeRetryAsync())
                {
                    return;
                }
            }
            catch (OperationCanceledException)
            {
                DiscardFailedClient();
                Log("连接已取消");
                return;
            }
            catch (Exception ex)
            {
                reconnectCount++;
                DiscardFailedClient();
                ErrorOccurred?.Invoke($"连接异常 (尝试 {reconnectCount}/{MaxReconnectAttempts})", ex);
                if (reconnectCount >= MaxReconnectAttempts)
                {
                    await HandleDisconnect($"连接异常:{ex.Message}");
                    return;
                }
                if (!await WaitBeforeRetryAsync())
                {
                    return;
                }
            }
        }
    }

    /// <summary>
    /// Disposes the client of a failed connection attempt so retries do not leak sockets.
    /// Does nothing once a session is active.
    /// </summary>
    private void DiscardFailedClient()
    {
        if (_isConnected)
        {
            return;
        }

        try
        {
            _client?.Dispose();
        }
        catch (Exception ex)
        {
            Log($"释放 Client 失败:{ex.Message}");
        }
        _client = null;
    }

    /// <summary>
    /// Waits <see cref="ReconnectDelay"/> before the next attempt.
    /// </summary>
    /// <returns><c>false</c> when the wait was cancelled and the caller should stop retrying.</returns>
    private async Task<bool> WaitBeforeRetryAsync()
    {
        try
        {
            await Task.Delay(ReconnectDelay, _cts.Token);
            return true;
        }
        catch (OperationCanceledException)
        {
            Log("连接已取消");
            return false;
        }
    }

    /// <summary>
    /// Applies keep-alive, no-delay and buffer-size options; failures are only logged.
    /// </summary>
    private void ConfigureSocket(TcpClient client)
    {
        try
        {
            client.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
            // Disable Nagle's algorithm to reduce latency.
            client.NoDelay = true;
            client.ReceiveBufferSize = 8192;
            client.SendBufferSize = 8192;
        }
        catch (Exception ex)
        {
            Log($"配置 Socket 失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Logs and reports a socket error; for errors that mean the peer closed the connection
    /// it also schedules <see cref="HandleDisconnect"/> on the thread pool.
    /// </summary>
    /// <param name="context">Short description of the operation that failed.</param>
    /// <param name="ex">The socket exception.</param>
    private void HandleSocketError(string context, SocketException ex)
    {
        string errorMessage = $"{context} - Socket 错误 {ex.SocketErrorCode} ({ex.NativeErrorCode}): {ex.Message}";
        Log($"❌ {errorMessage}");
        ErrorOccurred?.Invoke(errorMessage, ex);

        switch (ex.SocketErrorCode)
        {
            case SocketError.ConnectionReset:   // 10054
            case SocketError.ConnectionAborted: // 10053
            case SocketError.Shutdown:          // 10058
                Log("连接被对端关闭,触发断开事件");
                _ = Task.Run(() => HandleDisconnect("连接被对端关闭"));
                break;
            case SocketError.AddressAlreadyInUse: // 10048
                Log("端口被占用,尝试使用其他端口");
                break;
            case SocketError.ConnectionRefused: // 10061
                Log("连接被拒绝,检查服务端是否运行");
                break;
            case SocketError.TimedOut: // 10060
                Log("连接超时");
                break;
            default:
                Log($"未处理的 Socket 错误:{ex.SocketErrorCode}");
                break;
        }
    }

    /// <summary>
    /// Timer callback: triggers a disconnect when no data has arrived within the timeout.
    /// </summary>
    private void CheckConnectionTimeout()
    {
        try
        {
            if (!_isConnected)
            {
                return; // nothing to time out; HandleDisconnect would be a no-op anyway
            }

            TimeSpan sinceLastData;
            lock (_lockObj)
            {
                sinceLastData = DateTime.UtcNow - _lastHeartbeatTime;
            }

            if (sinceLastData.TotalSeconds > _heartbeatTimeoutSeconds)
            {
                Log($"⚠️ 心跳超时!最后心跳:{sinceLastData.TotalSeconds:F1} 秒前");
                _ = Task.Run(() => HandleDisconnect("心跳超时"));
            }
        }
        catch (Exception ex)
        {
            Log($"超时检测错误:{ex.Message}");
        }
    }

    /// <summary>
    /// Transitions from connected to disconnected exactly once, raises
    /// <see cref="Disconnected"/> and releases resources. Does nothing when not connected.
    /// </summary>
    /// <param name="reason">Text passed to <see cref="Disconnected"/> subscribers.</param>
    private async Task HandleDisconnect(string reason)
    {
        lock (_lockObj)
        {
            if (!_isConnected)
            {
                return; // already handled
            }
            _isConnected = false;
        }

        Log($"处理断开:{reason}");
        try
        {
            Disconnected?.Invoke(reason);
        }
        finally
        {
            // Release sockets and timers even when a subscriber throws.
            await CleanupAsync();
        }
    }

    /// <summary>
    /// Reads from the stream until cancelled or disconnected, splitting the UTF-8 text on
    /// '\n' and dispatching each complete line as a packet.
    /// </summary>
    private async Task ReceiveLoopAsync()
    {
        NetworkStream stream = _stream;
        if (stream == null)
        {
            return;
        }

        var buffer = new byte[4096];
        // A stateful decoder keeps the leading bytes of a multi-byte character that was
        // split across two reads instead of turning them into replacement characters.
        Decoder decoder = Encoding.UTF8.GetDecoder();
        var chars = new char[Encoding.UTF8.GetMaxCharCount(buffer.Length)];
        var pending = new StringBuilder();
        try
        {
            while (!_cts.Token.IsCancellationRequested && _isConnected)
            {
                // Per-read timeout; disposed every iteration so its timer is not leaked.
                using (var readCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token))
                {
                    readCts.CancelAfter(5000);
                    try
                    {
                        int read = await stream.ReadAsync(buffer, 0, buffer.Length, readCts.Token);
                        if (read == 0)
                        {
                            Log("连接已关闭Read 返回 0");
                            await HandleDisconnect("连接已关闭");
                            break;
                        }

                        lock (_lockObj)
                        {
                            _lastHeartbeatTime = DateTime.UtcNow;
                        }

                        int charCount = decoder.GetChars(buffer, 0, read, chars, 0);
                        pending.Append(chars, 0, charCount);
                        DispatchCompleteLines(pending);
                    }
                    catch (OperationCanceledException)
                    {
                        // Read timeout or shutdown: the loop condition decides which.
                        continue;
                    }
                    catch (IOException ex) when (ex.InnerException is SocketException sockEx)
                    {
                        HandleSocketError("接收数据时 Socket 错误", sockEx);
                        await HandleDisconnect($"Socket 错误:{sockEx.SocketErrorCode}");
                        break;
                    }
                    catch (IOException ex)
                    {
                        Log($"IO 错误:{ex.Message}");
                        await HandleDisconnect($"IO 错误:{ex.Message}");
                        break;
                    }
                    catch (ObjectDisposedException)
                    {
                        Log("Stream 已释放");
                        await HandleDisconnect("Stream 已释放");
                        break;
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Log($"接收循环错误:{ex.Message}");
            await HandleDisconnect($"接收错误:{ex.Message}");
        }
    }

    /// <summary>
    /// Dispatches every complete ('\n'-terminated) line in <paramref name="pending"/> and
    /// leaves only the trailing partial line in the builder.
    /// </summary>
    private void DispatchCompleteLines(StringBuilder pending)
    {
        string content = pending.ToString();
        int start = 0;
        int newline;
        while ((newline = content.IndexOf('\n', start)) >= 0)
        {
            string line = content.Substring(start, newline - start).Trim('\r');
            start = newline + 1;
            if (string.IsNullOrEmpty(line))
            {
                continue;
            }

            try
            {
                var packet = JsonSerializer.Deserialize<HeartbeatPacket>(line);
                if (packet != null)
                {
                    PacketReceived?.Invoke(packet);
                }
            }
            catch (JsonException ex)
            {
                // A malformed line is dropped; later lines are still processed.
                Log($"JSON 解析错误:{ex.Message}");
            }
        }

        if (start > 0)
        {
            pending.Remove(0, start);
        }
    }

    /// <summary>
    /// Sends one packet carrying the next queued payload (or an empty payload).
    /// Send failures trigger <see cref="HandleDisconnect"/>.
    /// </summary>
    private async Task SendHeartbeatAsync()
    {
        NetworkStream stream = _stream;
        if (!_isConnected || stream == null || _client?.Connected != true)
        {
            return;
        }

        string payload = "";
        if (_dataQueue.TryDequeue(out var data))
        {
            payload = data;
        }

        var packet = new HeartbeatPacket
        {
            Source = LocalIdentity,
            Timestamp = DateTime.Now.Ticks,
            SequenceId = Interlocked.Increment(ref _sequenceId),
            Payload = payload
        };

        try
        {
            string json = JsonSerializer.Serialize(packet) + "\n";
            byte[] dataBytes = Encoding.UTF8.GetBytes(json);
            await stream.WriteAsync(dataBytes, 0, dataBytes.Length, _cts.Token);
            await stream.FlushAsync(_cts.Token);
        }
        catch (OperationCanceledException)
        {
            // The manager is shutting down; this is not a send failure.
        }
        catch (IOException ex) when (ex.InnerException is SocketException sockEx)
        {
            HandleSocketError("发送心跳时 Socket 错误", sockEx);
            await HandleDisconnect($"发送失败:{sockEx.SocketErrorCode}");
        }
        catch (ObjectDisposedException)
        {
            Log("Stream 已释放,无法发送");
            await HandleDisconnect("Stream 已释放");
        }
        catch (Exception ex)
        {
            Log($"发送失败:{ex.Message}");
            await HandleDisconnect($"发送失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Queues a payload to be carried by a later heartbeat packet; each heartbeat sends at
    /// most one queued item. The payload is dropped (with a log entry) when not connected.
    /// </summary>
    /// <param name="jsonData">Text placed in the packet's <c>Payload</c> field.</param>
    public void PushData(string jsonData)
    {
        if (!_isConnected)
        {
            Log("⚠️ 未连接,数据无法发送");
            return;
        }
        _dataQueue.Enqueue(jsonData);
    }

    private void Log(string msg) => System.Diagnostics.Debug.WriteLine($"[{LocalIdentity}] {msg}");

    /// <summary>
    /// Marks the manager disconnected, disposes timers, stream and client, stops the listener
    /// and cancels the shared token. Runs synchronously; each step is best-effort.
    /// </summary>
    private Task CleanupAsync()
    {
        lock (_lockObj)
        {
            _isConnected = false;
        }

        _heartbeatTimer?.Dispose();
        _timeoutCheckTimer?.Dispose();

        try
        {
            _stream?.Dispose();
        }
        catch (Exception ex) { Log($"释放 Stream 失败:{ex.Message}"); }

        try
        {
            _client?.Dispose();
        }
        catch (Exception ex) { Log($"释放 Client 失败:{ex.Message}"); }

        try
        {
            _listener?.Stop();
        }
        catch (Exception ex) { Log($"停止 Listener 失败:{ex.Message}"); }

        try
        {
            _cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Dispose already released the token source; nothing left to cancel.
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Cancels background work and releases all resources. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        // Mark disconnected first so in-flight operations cancelled below do not
        // report the shutdown as a connection failure.
        lock (_lockObj)
        {
            _isConnected = false;
        }

        _cts.Cancel();
        CleanupAsync().GetAwaiter().GetResult();
        _cts.Dispose();
    }
}