|
|
// TcpManager.cs
|
|
|
using IPCLib;
|
|
|
using System;
|
|
|
using System.IO;
|
|
|
using System.Net.Sockets;
|
|
|
using System.Text;
|
|
|
using System.Text.Json;
|
|
|
using System.Threading;
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
|
|
|
// TcpManager.cs - 修正版
|
|
|
// TcpManager.cs - 完整修复版
|
|
|
|
|
|
using System.Collections.Concurrent;
|
|
|
|
|
|
/// <summary>
/// Single-peer TCP link that exchanges newline-delimited JSON
/// <see cref="HeartbeatPacket"/> messages, either as a loopback server
/// (<see cref="StartServerAsync"/>) or as a client (<see cref="StartClientAsync"/>).
/// A periodic heartbeat is sent while connected, and the link is torn down when
/// nothing has been received for longer than the heartbeat timeout.
/// </summary>
/// <remarks>
/// An instance is one-shot: once the link is torn down (disconnect, timeout or
/// <see cref="Dispose"/>) the internal cancellation source is cancelled, so the
/// same instance cannot be started again.
/// </remarks>
public class TcpManager : IDisposable
{
    // Per-read timeout; a timed-out read is simply retried by the receive loop.
    private const int ReadTimeoutMilliseconds = 5000;

    // Synchronous send/receive timeouts applied to outgoing client sockets.
    private const int SocketTimeoutMilliseconds = 5000;

    // Kernel send/receive buffer size requested for connected sockets.
    private const int SocketBufferSize = 8192;

    // Size of the managed buffer used for each stream read.
    private const int ReceiveChunkSize = 4096;

    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    // Captured once so that code running after Dispose can still observe
    // cancellation without touching the disposed CancellationTokenSource.
    private readonly CancellationToken _shutdownToken;

    // Guards _lastHeartbeatTime and the connected -> disconnected transition.
    private readonly object _lockObj = new object();

    // Payloads waiting to be piggy-backed on the next heartbeat packet.
    private readonly ConcurrentQueue<string> _dataQueue = new ConcurrentQueue<string>();

    private readonly int _heartbeatTimeoutSeconds = 5;

    private TcpListener _listener;
    private TcpClient _client;
    private NetworkStream _stream;
    private Task _receiveTask;
    private Task _acceptTask;
    private System.Threading.Timer _heartbeatTimer;
    private System.Threading.Timer _timeoutCheckTimer;

    private long _sequenceId = 0;

    // UTC so that DST or wall-clock adjustments cannot fake or hide a timeout.
    private DateTime _lastHeartbeatTime = DateTime.UtcNow;

    // volatile: written and read from timer, receive-loop and caller threads.
    private volatile bool _isConnected = false;

    // 1 while a heartbeat send is running; prevents overlapping timer ticks
    // from writing to the stream concurrently.
    private int _sendInProgress;

    // 1 once Dispose has run; makes Dispose idempotent.
    private int _disposed;

    /// <summary>Creates a manager that is neither listening nor connected.</summary>
    public TcpManager()
    {
        _shutdownToken = _cts.Token;
    }

    /// <summary>True after <see cref="StartServerAsync"/>, false after <see cref="StartClientAsync"/>.</summary>
    public bool IsServer { get; set; }

    /// <summary>Period, in milliseconds, of both the heartbeat timer and the timeout-check timer.</summary>
    public int HeartbeatInterval { get; set; } = 1000;

    /// <summary>Name written to the <c>Source</c> field of outgoing packets and used as the log prefix.</summary>
    public string LocalIdentity { get; set; } = "App";

    /// <summary>Maximum number of failed connection attempts before <see cref="StartClientAsync"/> gives up.</summary>
    public int MaxReconnectAttempts { get; set; } = 5;

    /// <summary>Delay, in milliseconds, between two connection attempts.</summary>
    public int ReconnectDelay { get; set; } = 2000;

    /// <summary>Raised for every packet successfully parsed from the peer.</summary>
    public event Action<HeartbeatPacket> PacketReceived;

    /// <summary>Raised once the connection is established, before the receive loop and timers start.</summary>
    public event Action Connected;

    /// <summary>Raised with a reason string when an established connection is torn down.</summary>
    public event Action<string> Disconnected;

    /// <summary>Raised with a description and the originating exception when an error is detected.</summary>
    public event Action<string, Exception> ErrorOccurred;

    /// <summary>
    /// Starts listening on the loopback interface and begins accepting a single client in the background.
    /// </summary>
    /// <param name="port">TCP port to listen on.</param>
    /// <exception cref="SocketException">The listener could not be started; reported via <see cref="ErrorOccurred"/> and rethrown.</exception>
    public async Task StartServerAsync(int port)
    {
        IsServer = true;

        try
        {
            _listener = new TcpListener(System.Net.IPAddress.Loopback, port);
            // Allow the port to be re-bound right after a previous instance exits.
            _listener.Server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
            _listener.Start();

            Log($"服务端已启动,监听端口:{port}");
            _acceptTask = AcceptClientAsync();
            await Task.CompletedTask;
        }
        catch (SocketException ex)
        {
            HandleSocketError("启动监听失败", ex);
            throw;
        }
        catch (Exception ex)
        {
            ErrorOccurred?.Invoke("启动监听失败", ex);
            throw;
        }
    }

    /// <summary>
    /// Waits for one client to connect and then activates the connection.
    /// Failures are reported through logging and <see cref="ErrorOccurred"/>; nothing is rethrown.
    /// </summary>
    private async Task AcceptClientAsync()
    {
        try
        {
            Log("等待客户端连接...");
            _client = await _listener.AcceptTcpClientAsync(_shutdownToken);
            CompleteConnection(_client, "客户端已连接");
        }
        catch (OperationCanceledException)
        {
            Log("接受连接已取消");
        }
        catch (SocketException ex)
        {
            HandleSocketError("接受连接失败", ex);
        }
        catch (Exception ex)
        {
            ErrorOccurred?.Invoke("接受连接失败", ex);
        }
    }

    /// <summary>
    /// Shared post-connect sequence for both roles: configure the socket, publish the stream,
    /// mark the link as connected, raise <see cref="Connected"/>, then start the receive loop and timers.
    /// </summary>
    /// <param name="client">The freshly connected client.</param>
    /// <param name="connectedLogMessage">Message logged once the link is marked connected.</param>
    private void CompleteConnection(TcpClient client, string connectedLogMessage)
    {
        ConfigureSocket(client);

        _stream = client.GetStream();
        lock (_lockObj)
        {
            _lastHeartbeatTime = DateTime.UtcNow;
        }
        _isConnected = true;

        Log(connectedLogMessage);
        Connected?.Invoke();

        _receiveTask = ReceiveLoopAsync();
        StartHeartbeatTimer();
        StartTimeoutCheckTimer();
    }

    /// <summary>Starts the periodic heartbeat sender; the first heartbeat is sent immediately.</summary>
    private void StartHeartbeatTimer()
    {
        _heartbeatTimer?.Dispose();
        _heartbeatTimer = new System.Threading.Timer(_ => OnHeartbeatTick(), null, 0, HeartbeatInterval);
    }

    /// <summary>
    /// Timer callback for the heartbeat. It is <c>async void</c> because the timer delegate
    /// returns void, so every exception is caught here instead of escaping to the thread pool.
    /// </summary>
    private async void OnHeartbeatTick()
    {
        // Timer ticks may overlap when a send is slow; skip this tick rather than
        // letting two writers interleave bytes on the same stream.
        if (Interlocked.CompareExchange(ref _sendInProgress, 1, 0) != 0)
        {
            return;
        }

        try
        {
            await SendHeartbeatAsync();
        }
        catch (Exception ex)
        {
            Log($"发送失败:{ex.Message}");
        }
        finally
        {
            Interlocked.Exchange(ref _sendInProgress, 0);
        }
    }

    /// <summary>Starts the periodic check that detects a silent peer.</summary>
    private void StartTimeoutCheckTimer()
    {
        _timeoutCheckTimer?.Dispose();
        _timeoutCheckTimer = new System.Threading.Timer(
            _ => CheckConnectionTimeout(), null, HeartbeatInterval, HeartbeatInterval);
    }

    /// <summary>
    /// Connects to a server, retrying up to <see cref="MaxReconnectAttempts"/> times with
    /// <see cref="ReconnectDelay"/> milliseconds between attempts.
    /// Returns when connected, when the attempts are exhausted, or when the manager is shut down.
    /// </summary>
    /// <param name="host">Host name or address of the server.</param>
    /// <param name="port">TCP port of the server.</param>
    public async Task StartClientAsync(string host, int port)
    {
        IsServer = false;
        int reconnectCount = 0;

        while (!_shutdownToken.IsCancellationRequested)
        {
            TcpClient attempt = null;
            bool established = false;

            try
            {
                Log($"尝试连接 {host}:{port} (尝试 {reconnectCount + 1}/{MaxReconnectAttempts})...");

                attempt = new TcpClient();
                attempt.ReceiveTimeout = SocketTimeoutMilliseconds;
                attempt.SendTimeout = SocketTimeoutMilliseconds;
                _client = attempt;

                await attempt.ConnectAsync(host, port, _shutdownToken);
                established = true;

                CompleteConnection(attempt, $"已连接到服务端 {host}:{port}");
                return; // Connected: leave the retry loop.
            }
            catch (OperationCanceledException)
            {
                if (!established)
                {
                    attempt?.Dispose();
                }
                Log("连接已取消");
                return;
            }
            catch (SocketException ex)
            {
                // A client that never connected is useless; release its socket now
                // instead of leaking one handle per failed attempt.
                if (!established)
                {
                    attempt?.Dispose();
                }

                reconnectCount++;
                HandleSocketError($"连接失败 (尝试 {reconnectCount}/{MaxReconnectAttempts})", ex);

                if (reconnectCount >= MaxReconnectAttempts)
                {
                    ErrorOccurred?.Invoke($"达到最大重连次数 {MaxReconnectAttempts}", ex);
                    // NOTE(review): HandleDisconnect returns immediately unless the link was
                    // marked connected, so no Disconnected event fires on this path - confirm intent.
                    HandleDisconnect($"达到最大重连次数:{ex.Message}");
                    return;
                }

                Log($"{ReconnectDelay}ms 后重试...");
                if (!await WaitBeforeRetryAsync())
                {
                    return;
                }
            }
            catch (Exception ex)
            {
                if (!established)
                {
                    attempt?.Dispose();
                }

                reconnectCount++;
                ErrorOccurred?.Invoke($"连接异常 (尝试 {reconnectCount}/{MaxReconnectAttempts})", ex);

                if (reconnectCount >= MaxReconnectAttempts)
                {
                    HandleDisconnect($"连接异常:{ex.Message}");
                    return;
                }

                if (!await WaitBeforeRetryAsync())
                {
                    return;
                }
            }
        }
    }

    /// <summary>
    /// Waits <see cref="ReconnectDelay"/> milliseconds before the next connection attempt.
    /// </summary>
    /// <returns><c>true</c> if the delay elapsed; <c>false</c> if shutdown was requested meanwhile.</returns>
    private async Task<bool> WaitBeforeRetryAsync()
    {
        try
        {
            await Task.Delay(ReconnectDelay, _shutdownToken);
            return true;
        }
        catch (OperationCanceledException)
        {
            // Cancellation during the delay is a normal shutdown, not an error to surface.
            Log("连接已取消");
            return false;
        }
    }

    /// <summary>
    /// Applies keep-alive, no-delay and buffer-size options to a connected client.
    /// Best effort: a failure is logged and the connection proceeds with default options.
    /// </summary>
    /// <param name="client">The client whose socket is configured.</param>
    private void ConfigureSocket(TcpClient client)
    {
        try
        {
            client.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);

            // Disable Nagle's algorithm so small heartbeat packets are not delayed.
            client.NoDelay = true;

            client.ReceiveBufferSize = SocketBufferSize;
            client.SendBufferSize = SocketBufferSize;
        }
        catch (Exception ex)
        {
            Log($"配置 Socket 失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Logs a socket error, raises <see cref="ErrorOccurred"/>, and for errors that mean the peer
    /// closed the connection schedules a disconnect on the thread pool.
    /// </summary>
    /// <param name="context">Short description of the operation that failed.</param>
    /// <param name="ex">The socket exception that was caught.</param>
    private void HandleSocketError(string context, SocketException ex)
    {
        string errorMessage = $"{context} - Socket 错误 {ex.SocketErrorCode} ({ex.NativeErrorCode}): {ex.Message}";
        Log($"❌ {errorMessage}");
        ErrorOccurred?.Invoke(errorMessage, ex);

        switch (ex.SocketErrorCode)
        {
            case SocketError.ConnectionReset:   // 10054
            case SocketError.ConnectionAborted: // 10053
            case SocketError.Shutdown:          // 10058
                Log("连接被对端关闭,触发断开事件");
                Task.Run(() => HandleDisconnect("连接被对端关闭"));
                break;

            case SocketError.AddressAlreadyInUse: // 10048
                Log("端口被占用,尝试使用其他端口");
                break;

            case SocketError.ConnectionRefused: // 10061
                Log("连接被拒绝,检查服务端是否运行");
                break;

            case SocketError.TimedOut: // 10060
                Log("连接超时");
                break;

            default:
                Log($"未处理的 Socket 错误:{ex.SocketErrorCode}");
                break;
        }
    }

    /// <summary>
    /// Schedules a disconnect when nothing has been received for longer than the heartbeat timeout.
    /// </summary>
    private void CheckConnectionTimeout()
    {
        try
        {
            lock (_lockObj)
            {
                TimeSpan silence = DateTime.UtcNow - _lastHeartbeatTime;
                if (silence.TotalSeconds > _heartbeatTimeoutSeconds)
                {
                    Log($"⚠️ 心跳超时!最后心跳:{silence.TotalSeconds:F1} 秒前");
                    // Run outside the lock: HandleDisconnect takes the same lock and raises events.
                    Task.Run(() => HandleDisconnect("心跳超时"));
                }
            }
        }
        catch (Exception ex)
        {
            Log($"超时检测错误:{ex.Message}");
        }
    }

    /// <summary>
    /// Performs the connected -> disconnected transition exactly once: raises
    /// <see cref="Disconnected"/> and releases all resources. Calls made while the link is not
    /// marked connected return immediately.
    /// </summary>
    /// <param name="reason">Reason passed to <see cref="Disconnected"/> subscribers.</param>
    private void HandleDisconnect(string reason)
    {
        lock (_lockObj)
        {
            if (!_isConnected)
            {
                return; // Already handled (or never connected).
            }
            _isConnected = false;
        }

        Log($"处理断开:{reason}");
        try
        {
            Disconnected?.Invoke(reason);
        }
        finally
        {
            // Release resources even when a subscriber throws.
            Cleanup();
        }
    }

    /// <summary>
    /// Reads from the stream until the link goes down, splitting the incoming text on '\n'
    /// and raising <see cref="PacketReceived"/> for each line that parses as a packet.
    /// </summary>
    private async Task ReceiveLoopAsync()
    {
        NetworkStream stream = _stream;
        if (stream == null)
        {
            return;
        }

        var buffer = new byte[ReceiveChunkSize];
        // A stateful decoder keeps the trailing bytes of a multi-byte UTF-8 character
        // that was split across two reads, instead of turning them into U+FFFD.
        Decoder decoder = Encoding.UTF8.GetDecoder();
        var chars = new char[Encoding.UTF8.GetMaxCharCount(buffer.Length)];
        var pending = new StringBuilder();

        try
        {
            while (!_shutdownToken.IsCancellationRequested && _isConnected)
            {
                // Disposed at the end of every iteration; otherwise each linked source
                // would stay registered on the shutdown token.
                using var readCts = CancellationTokenSource.CreateLinkedTokenSource(_shutdownToken);
                readCts.CancelAfter(ReadTimeoutMilliseconds);

                try
                {
                    int read = await stream.ReadAsync(buffer, 0, buffer.Length, readCts.Token);

                    if (read == 0)
                    {
                        Log("连接已关闭(Read 返回 0)");
                        HandleDisconnect("连接已关闭");
                        break;
                    }

                    // Any received bytes count as a sign of life from the peer.
                    lock (_lockObj)
                    {
                        _lastHeartbeatTime = DateTime.UtcNow;
                    }

                    int charCount = decoder.GetChars(buffer, 0, read, chars, 0, false);
                    pending.Append(chars, 0, charCount);
                    DispatchCompleteLines(pending);
                }
                catch (OperationCanceledException)
                {
                    // Read timeout or shutdown; the loop condition decides which.
                    continue;
                }
                catch (IOException ex) when (ex.InnerException is SocketException sockEx)
                {
                    HandleSocketError("接收数据时 Socket 错误", sockEx);
                    HandleDisconnect($"Socket 错误:{sockEx.SocketErrorCode}");
                    break;
                }
                catch (IOException ex)
                {
                    Log($"IO 错误:{ex.Message}");
                    HandleDisconnect($"IO 错误:{ex.Message}");
                    break;
                }
                catch (ObjectDisposedException)
                {
                    Log("Stream 已释放");
                    HandleDisconnect("Stream 已释放");
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            Log($"接收循环错误:{ex.Message}");
            HandleDisconnect($"接收错误:{ex.Message}");
        }
    }

    /// <summary>
    /// Extracts every complete '\n'-terminated line from <paramref name="pending"/>, parses it as a
    /// packet and raises <see cref="PacketReceived"/>; the unterminated remainder stays buffered.
    /// </summary>
    /// <param name="pending">Accumulated text received so far.</param>
    private void DispatchCompleteLines(StringBuilder pending)
    {
        string content = pending.ToString();
        int start = 0;
        int newline;

        while ((newline = content.IndexOf('\n', start)) >= 0)
        {
            string line = content.Substring(start, newline - start).Trim('\r');
            start = newline + 1;

            if (line.Length == 0)
            {
                continue;
            }

            try
            {
                var packet = JsonSerializer.Deserialize<HeartbeatPacket>(line);
                if (packet != null)
                {
                    PacketReceived?.Invoke(packet);
                }
            }
            catch (JsonException ex)
            {
                // A malformed line is dropped; the connection stays up.
                Log($"JSON 解析错误:{ex.Message}");
            }
        }

        if (start > 0)
        {
            pending.Clear();
            pending.Append(content, start, content.Length - start);
        }
    }

    /// <summary>
    /// Sends one heartbeat packet, carrying the next queued payload if there is one.
    /// Any send failure tears the connection down.
    /// </summary>
    private async Task SendHeartbeatAsync()
    {
        NetworkStream stream = _stream;
        if (!_isConnected || stream == null || _client?.Connected != true)
        {
            return;
        }

        string payload = "";
        if (_dataQueue.TryDequeue(out var data))
        {
            payload = data;
        }

        var packet = new HeartbeatPacket
        {
            Source = LocalIdentity,
            Timestamp = DateTime.Now.Ticks,
            SequenceId = Interlocked.Increment(ref _sequenceId),
            Payload = payload
        };

        try
        {
            // One packet per line: the receiver splits on '\n'.
            string json = JsonSerializer.Serialize(packet) + "\n";
            byte[] dataBytes = Encoding.UTF8.GetBytes(json);
            await stream.WriteAsync(dataBytes, 0, dataBytes.Length, _shutdownToken);
            await stream.FlushAsync(_shutdownToken);
        }
        catch (IOException ex) when (ex.InnerException is SocketException sockEx)
        {
            HandleSocketError("发送心跳时 Socket 错误", sockEx);
            HandleDisconnect($"发送失败:{sockEx.SocketErrorCode}");
        }
        catch (ObjectDisposedException)
        {
            Log("Stream 已释放,无法发送");
            HandleDisconnect("Stream 已释放");
        }
        catch (Exception ex)
        {
            Log($"发送失败:{ex.Message}");
            HandleDisconnect($"发送失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Queues a payload to be sent with the next heartbeat. The payload is dropped (and a warning
    /// logged) when the link is not connected.
    /// </summary>
    /// <param name="jsonData">Payload text placed in the packet's <c>Payload</c> field.</param>
    public void PushData(string jsonData)
    {
        if (!_isConnected)
        {
            Log("⚠️ 未连接,数据无法发送");
            return;
        }

        _dataQueue.Enqueue(jsonData);
    }

    /// <summary>Writes a debug-output line prefixed with <see cref="LocalIdentity"/>.</summary>
    private void Log(string msg) => System.Diagnostics.Debug.WriteLine($"[{LocalIdentity}] {msg}");

    /// <summary>
    /// Stops the timers, releases the stream, client and listener, and cancels all pending
    /// operations. Safe to call more than once; individual release failures are logged.
    /// </summary>
    private void Cleanup()
    {
        lock (_lockObj)
        {
            _isConnected = false;
        }

        _heartbeatTimer?.Dispose();
        _timeoutCheckTimer?.Dispose();

        ReleaseQuietly(() => _stream?.Dispose(), "释放 Stream 失败");
        ReleaseQuietly(() => _client?.Dispose(), "释放 Client 失败");
        ReleaseQuietly(() => _listener?.Stop(), "停止 Listener 失败");

        try
        {
            _cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Dispose already ran on another thread; nothing is left to cancel.
        }
    }

    /// <summary>Runs a release action and logs, rather than propagates, any failure.</summary>
    /// <param name="release">The release action to run.</param>
    /// <param name="failureLabel">Log prefix used when the action throws.</param>
    private void ReleaseQuietly(Action release, string failureLabel)
    {
        try
        {
            release();
        }
        catch (Exception ex)
        {
            Log($"{failureLabel}:{ex.Message}");
        }
    }

    /// <summary>
    /// Cancels pending operations and releases every resource. Subsequent calls are no-ops.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        // Cancel first so in-flight reads, writes and delays unwind before their
        // underlying resources are released.
        _cts.Cancel();
        Cleanup();
        _cts.Dispose();
        GC.SuppressFinalize(this);
    }
}