You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

133 lines
4.8 KiB
C#

1 month ago
using System.Threading.Channels;
using Validation.Core;
namespace Validation.Validator
{
/// <summary>
/// CSV数据校验类使用Channel提升并发效率
/// </summary>
public class CsvDataValidator
{
private readonly SimpleDataValidator validator;
private readonly Channel<string[]> channel;
private readonly List<IValidationRule> rules;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="validator">数据校验器</param>
/// <param name="rules">校验规则列表</param>
public CsvDataValidator(SimpleDataValidator validator, List<IValidationRule> rules)
{
this.validator = validator;
this.rules = rules;
// 创建一个有界通道,用于控制内存使用
var options = new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait
};
channel = Channel.CreateBounded<string[]>(options);
}
/// <summary>
/// 读取CSV文件并发送数据到通道
/// </summary>
/// <param name="filePath">CSV文件路径</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>表示异步操作的任务</returns>
public async Task ReadCsvFileAsync(string filePath, CancellationToken cancellationToken = default)
{
var writer = channel.Writer;
try
{
using (var reader = new StreamReader(filePath))
{
string? line;
bool isFirstLine = true;
while ((line = await reader.ReadLineAsync(cancellationToken)) != null && !cancellationToken.IsCancellationRequested)
{
if (isFirstLine)
{
// 第一行作为标题行处理
isFirstLine = false;
continue;
}
var values = ParseCsvLine(line);
await writer.WriteAsync(values, cancellationToken);
}
}
}
finally
{
writer.Complete();
}
}
/// <summary>
/// 处理通道中的数据并进行校验
/// </summary>
/// <param name="headers">CSV标题行</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>校验结果</returns>
public async Task<ValidationResult> ProcessDataAsync(string[] headers, CancellationToken cancellationToken = default)
{
var reader = channel.Reader;
var result = new ValidationResult { IsValid = true };
// 创建多个消费者任务来处理数据
var consumerTasks = new List<Task>();
for (int i = 0; i < Environment.ProcessorCount; i++)
{
consumerTasks.Add(ProcessDataConsumerAsync(headers, result, cancellationToken));
}
// 等待所有消费者任务完成
await Task.WhenAll(consumerTasks);
return result;
}
/// <summary>
/// 数据消费者任务
/// </summary>
/// <param name="headers">CSV标题行</param>
/// <param name="result">校验结果</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>表示异步操作的任务</returns>
private async Task ProcessDataConsumerAsync(string[] headers, ValidationResult result, CancellationToken cancellationToken)
{
var reader = channel.Reader;
int rowIndex = 1; // 每个消费者从第二行开始(第一行是标题)
await foreach (var values in reader.ReadAllAsync(cancellationToken))
{
var rowResult = validator.Validate(headers, rowIndex, values, rules);
lock (result)
{
if (!rowResult.IsValid)
{
result.IsValid = false;
result.Errors.AddRange(rowResult.Errors);
}
result.Warnings.AddRange(rowResult.Warnings);
}
rowIndex++;
}
}
/// <summary>
/// 简单的CSV行解析方法实际项目中可能需要更复杂的处理
/// </summary>
/// <param name="line">CSV行数据</param>
/// <returns>解析后的字符串数组</returns>
private string[] ParseCsvLine(string line)
{
return line.Split(',');
}
}
}