using System.Threading.Channels; using Validation.Core; namespace Validation.Validator { /// /// CSV数据校验类,使用Channel提升并发效率 /// public class CsvDataValidator { private readonly SimpleDataValidator validator; private readonly Channel channel; private readonly List rules; /// /// 构造函数 /// /// 数据校验器 /// 校验规则列表 public CsvDataValidator(SimpleDataValidator validator, List rules) { this.validator = validator; this.rules = rules; // 创建一个有界通道,用于控制内存使用 var options = new BoundedChannelOptions(100) { FullMode = BoundedChannelFullMode.Wait }; channel = Channel.CreateBounded(options); } /// /// 读取CSV文件并发送数据到通道 /// /// CSV文件路径 /// 取消令牌 /// 表示异步操作的任务 public async Task ReadCsvFileAsync(string filePath, CancellationToken cancellationToken = default) { var writer = channel.Writer; try { using (var reader = new StreamReader(filePath)) { string? line; bool isFirstLine = true; while ((line = await reader.ReadLineAsync(cancellationToken)) != null && !cancellationToken.IsCancellationRequested) { if (isFirstLine) { // 第一行作为标题行处理 isFirstLine = false; continue; } var values = ParseCsvLine(line); await writer.WriteAsync(values, cancellationToken); } } } finally { writer.Complete(); } } /// /// 处理通道中的数据并进行校验 /// /// CSV标题行 /// 取消令牌 /// 校验结果 public async Task ProcessDataAsync(string[] headers, CancellationToken cancellationToken = default) { var reader = channel.Reader; var result = new ValidationResult { IsValid = true }; // 创建多个消费者任务来处理数据 var consumerTasks = new List(); for (int i = 0; i < Environment.ProcessorCount; i++) { consumerTasks.Add(ProcessDataConsumerAsync(headers, result, cancellationToken)); } // 等待所有消费者任务完成 await Task.WhenAll(consumerTasks); return result; } /// /// 数据消费者任务 /// /// CSV标题行 /// 校验结果 /// 取消令牌 /// 表示异步操作的任务 private async Task ProcessDataConsumerAsync(string[] headers, ValidationResult result, CancellationToken cancellationToken) { var reader = channel.Reader; int rowIndex = 1; // 每个消费者从第二行开始(第一行是标题) await foreach (var values in reader.ReadAllAsync(cancellationToken)) { var rowResult = validator.Validate(headers, rowIndex, values, rules); lock (result) { if (!rowResult.IsValid) { result.IsValid = false; result.Errors.AddRange(rowResult.Errors); } result.Warnings.AddRange(rowResult.Warnings); } rowIndex++; } } /// /// 简单的CSV行解析方法(实际项目中可能需要更复杂的处理) /// /// CSV行数据 /// 解析后的字符串数组 private string[] ParseCsvLine(string line) { return line.Split(','); } } }