You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kev/Drawer/IPCLib/TcpCommunication.cs

485 lines
15 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// TcpManager.cs
using IPCLib;
using System;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
// TcpManager.cs - 修正版
// TcpManager.cs - 完整修复版
using System.Collections.Concurrent;
public class TcpManager : IDisposable
{
private TcpListener _listener;
private TcpClient _client;
private NetworkStream _stream;
private CancellationTokenSource _cts = new CancellationTokenSource();
private Task _receiveTask;
private Task _acceptTask;
private System.Threading.Timer _heartbeatTimer;
private System.Threading.Timer _timeoutCheckTimer;
private long _sequenceId = 0;
private DateTime _lastHeartbeatTime = DateTime.Now;
private int _heartbeatTimeoutSeconds = 5;
private bool _isConnected = false;
private readonly object _lockObj = new object(); // 线程锁
public bool IsServer { get; set; }
public int HeartbeatInterval { get; set; } = 1000;
public string LocalIdentity { get; set; } = "App";
public int MaxReconnectAttempts { get; set; } = 5; // 最大重连次数
public int ReconnectDelay { get; set; } = 2000; // 重连间隔
// 事件
public event Action<HeartbeatPacket> PacketReceived;
public event Action Connected;
public event Action<string> Disconnected;
public event Action<string, Exception> ErrorOccurred; // 添加 Exception 参数
private readonly ConcurrentQueue<string> _dataQueue = new ConcurrentQueue<string>();
/// <summary>
/// 服务端启动监听
/// </summary>
public async Task StartServerAsync(int port)
{
IsServer = true;
try
{
// ✅ 设置 SO_REUSEADDR允许端口重用
_listener = new TcpListener(System.Net.IPAddress.Loopback, port);
_listener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
_listener.Start();
Log($"服务端已启动,监听端口:{port}");
_acceptTask = AcceptClientAsync();
await Task.CompletedTask;
}
catch (SocketException ex)
{
HandleSocketError("启动监听失败", ex);
throw;
}
catch (Exception ex)
{
ErrorOccurred?.Invoke("启动监听失败", ex);
throw;
}
}
/// <summary>
/// 后台接受客户端连接
/// </summary>
private async Task AcceptClientAsync()
{
try
{
Log("等待客户端连接...");
_client = await _listener.AcceptTcpClientAsync(_cts.Token);
// ✅ 配置 Socket 选项
ConfigureSocket(_client);
_stream = _client.GetStream();
_lastHeartbeatTime = DateTime.Now;
_isConnected = true;
Log("客户端已连接");
Connected?.Invoke();
_receiveTask = ReceiveLoopAsync();
StartHeartbeatTimer();
StartTimeoutCheckTimer();
}
catch (OperationCanceledException)
{
Log("接受连接已取消");
}
catch (SocketException ex)
{
HandleSocketError("接受连接失败", ex);
}
catch (Exception ex)
{
ErrorOccurred?.Invoke("接受连接失败", ex);
}
}
private void StartHeartbeatTimer()
{
_heartbeatTimer = new System.Threading.Timer(async _ =>
{
await SendHeartbeatAsync();
}, null, 0, HeartbeatInterval);
}
/// <summary>
/// ✅ 启动超时检测定时器(核心修复)
/// </summary>
private void StartTimeoutCheckTimer()
{
_timeoutCheckTimer = new System.Threading.Timer(_ =>
{
CheckConnectionTimeout();
}, null, HeartbeatInterval, HeartbeatInterval);
}
/// <summary>
/// 客户端启动连接(带重连)
/// </summary>
public async Task StartClientAsync(string host, int port)
{
IsServer = false;
int reconnectCount = 0;
while (!_cts.Token.IsCancellationRequested)
{
try
{
Log($"尝试连接 {host}:{port} (尝试 {reconnectCount + 1}/{MaxReconnectAttempts})...");
_client = new TcpClient();
_client.ReceiveTimeout = 5000; // ✅ 设置接收超时
_client.SendTimeout = 5000; // ✅ 设置发送超时
await _client.ConnectAsync(host, port, _cts.Token);
ConfigureSocket(_client);
_stream = _client.GetStream();
_lastHeartbeatTime = DateTime.Now;
_isConnected = true;
Log($"已连接到服务端 {host}:{port}");
Connected?.Invoke();
_receiveTask = ReceiveLoopAsync();
StartHeartbeatTimer();
StartTimeoutCheckTimer();
return; // 连接成功,退出重连循环
}
catch (SocketException ex)
{
reconnectCount++;
HandleSocketError($"连接失败 (尝试 {reconnectCount}/{MaxReconnectAttempts})", ex);
if (reconnectCount >= MaxReconnectAttempts)
{
ErrorOccurred?.Invoke($"达到最大重连次数 {MaxReconnectAttempts}", ex);
await HandleDisconnect($"达到最大重连次数:{ex.Message}");
return;
}
Log($"{ReconnectDelay}ms 后重试...");
await Task.Delay(ReconnectDelay, _cts.Token);
}
catch (OperationCanceledException)
{
Log("连接已取消");
return;
}
catch (Exception ex)
{
reconnectCount++;
ErrorOccurred?.Invoke($"连接异常 (尝试 {reconnectCount}/{MaxReconnectAttempts})", ex);
if (reconnectCount >= MaxReconnectAttempts)
{
await HandleDisconnect($"连接异常:{ex.Message}");
return;
}
await Task.Delay(ReconnectDelay, _cts.Token);
}
}
}
/// <summary>
/// ✅ 配置 Socket 选项
/// </summary>
private void ConfigureSocket(TcpClient client)
{
try
{
// 启用 KeepAlive
client.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
// 禁用 Nagle 算法(减少延迟)
client.NoDelay = true;
// 设置缓冲区大小
client.ReceiveBufferSize = 8192;
client.SendBufferSize = 8192;
}
catch (Exception ex)
{
Log($"配置 Socket 失败:{ex.Message}");
}
}
/// <summary>
/// ✅ 处理 Socket 错误
/// </summary>
private void HandleSocketError(string context, SocketException ex)
{
string errorMessage = $"{context} - Socket 错误 {ex.SocketErrorCode} ({ex.NativeErrorCode}): {ex.Message}";
Log($"❌ {errorMessage}");
ErrorOccurred?.Invoke(errorMessage, ex);
// 根据错误码采取不同措施
switch (ex.SocketErrorCode)
{
case SocketError.ConnectionReset: // 10054
case SocketError.ConnectionAborted: // 10053
case SocketError.Shutdown: // 10058
Log("连接被对端关闭,触发断开事件");
Task.Run(() => HandleDisconnect("连接被对端关闭"));
break;
case SocketError.AddressAlreadyInUse: // 10048
Log("端口被占用,尝试使用其他端口");
break;
case SocketError.ConnectionRefused: // 10061
Log("连接被拒绝,检查服务端是否运行");
break;
case SocketError.TimedOut: // 10060
Log("连接超时");
break;
default:
Log($"未处理的 Socket 错误:{ex.SocketErrorCode}");
break;
}
}
/// <summary>
/// 检查连接是否超时
/// </summary>
private void CheckConnectionTimeout()
{
try
{
lock (_lockObj)
{
var timeSinceLastHeartbeat = DateTime.Now - _lastHeartbeatTime;
if (timeSinceLastHeartbeat.TotalSeconds > _heartbeatTimeoutSeconds)
{
Log($"⚠️ 心跳超时!最后心跳:{timeSinceLastHeartbeat.TotalSeconds:F1} 秒前");
Task.Run(() => HandleDisconnect("心跳超时"));
}
}
}
catch (Exception ex)
{
Log($"超时检测错误:{ex.Message}");
}
}
/// <summary>
/// 处理断开连接
/// </summary>
private async Task HandleDisconnect(string reason)
{
lock (_lockObj)
{
if (!_isConnected) return; // 避免重复触发
_isConnected = false;
}
Log($"处理断开:{reason}");
Disconnected?.Invoke(reason);
await CleanupAsync();
}
/// <summary>
/// 接收消息循环(增强错误处理)
/// </summary>
private async Task ReceiveLoopAsync()
{
if (_stream == null) return;
var buffer = new byte[4096];
var sb = new StringBuilder();
try
{
while (!_cts.Token.IsCancellationRequested && _isConnected)
{
var readCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token);
readCts.CancelAfter(5000);
try
{
int read = await _stream.ReadAsync(buffer, 0, buffer.Length, readCts.Token);
if (read == 0)
{
Log("连接已关闭Read 返回 0");
await HandleDisconnect("连接已关闭");
break;
}
lock (_lockObj) { _lastHeartbeatTime = DateTime.Now; }
string text = Encoding.UTF8.GetString(buffer, 0, read);
sb.Append(text);
string content = sb.ToString();
int idx;
while ((idx = content.IndexOf('\n')) >= 0)
{
string line = content.Substring(0, idx).Trim('\r');
if (!string.IsNullOrEmpty(line))
{
try
{
var packet = JsonSerializer.Deserialize<HeartbeatPacket>(line);
if(packet !=null)
PacketReceived?.Invoke(packet);
}
catch (JsonException ex)
{
Log($"JSON 解析错误:{ex.Message}");
}
}
content = content.Substring(idx + 1);
}
sb.Clear();
sb.Append(content);
}
catch (OperationCanceledException)
{
continue;
}
catch (IOException ex) when (ex.InnerException is SocketException sockEx)
{
HandleSocketError("接收数据时 Socket 错误", sockEx);
await HandleDisconnect($"Socket 错误:{sockEx.SocketErrorCode}");
break;
}
catch (IOException ex)
{
Log($"IO 错误:{ex.Message}");
await HandleDisconnect($"IO 错误:{ex.Message}");
break;
}
catch (ObjectDisposedException)
{
Log("Stream 已释放");
await HandleDisconnect("Stream 已释放");
break;
}
}
}
catch (Exception ex)
{
Log($"接收循环错误:{ex.Message}");
await HandleDisconnect($"接收错误:{ex.Message}");
}
}
/// <summary>
/// 发送心跳(增强错误处理)
/// </summary>
private async Task SendHeartbeatAsync()
{
if (!_isConnected || _stream == null || _client?.Connected != true) return;
string payload = "";
if (_dataQueue.TryDequeue(out var data))
{
payload = data;
}
var packet = new HeartbeatPacket
{
Source = LocalIdentity,
Timestamp = DateTime.Now.Ticks,
SequenceId = Interlocked.Increment(ref _sequenceId),
Payload = payload
};
try
{
string json = JsonSerializer.Serialize(packet) + "\n";
byte[] dataBytes = Encoding.UTF8.GetBytes(json);
await _stream.WriteAsync(dataBytes, 0, dataBytes.Length, _cts.Token);
await _stream.FlushAsync(_cts.Token);
}
catch (IOException ex) when (ex.InnerException is SocketException sockEx)
{
HandleSocketError("发送心跳时 Socket 错误", sockEx);
await HandleDisconnect($"发送失败:{sockEx.SocketErrorCode}");
}
catch (ObjectDisposedException)
{
Log("Stream 已释放,无法发送");
await HandleDisconnect("Stream 已释放");
}
catch (Exception ex)
{
Log($"发送失败:{ex.Message}");
await HandleDisconnect($"发送失败:{ex.Message}");
}
}
public void PushData(string jsonData)
{
if (!_isConnected)
{
Log("⚠️ 未连接,数据无法发送");
return;
}
_dataQueue.Enqueue(jsonData);
}
private void Log(string msg) => System.Diagnostics.Debug.WriteLine($"[{LocalIdentity}] {msg}");
private async Task CleanupAsync()
{
lock (_lockObj)
{
_isConnected = false;
}
_heartbeatTimer?.Dispose();
_timeoutCheckTimer?.Dispose();
try
{
_stream?.Dispose();
}
catch (Exception ex) { Log($"释放 Stream 失败:{ex.Message}"); }
try
{
_client?.Dispose();
}
catch (Exception ex) { Log($"释放 Client 失败:{ex.Message}"); }
try
{
_listener?.Stop();
}
catch (Exception ex) { Log($"停止 Listener 失败:{ex.Message}"); }
_cts.Cancel();
}
public void Dispose()
{
_cts.Cancel();
CleanupAsync().Wait();
_cts.Dispose();
}
}