Работать с DataTable, используя Parallel.по каждому элементу
Эй ребята,
В нашем офисе мы работаем над большим развитием обработки данных.
Целью проекта является получение большого количества текстовых файлов через FTP серверы,
а затем выполнить на каждом файле определенные проверки и в конце процесса вставить все данные в SQL server.
Каждый файл может содержать миллионы записей.
Каждая запись в файле содержит разделы информации, которые расположены в соответствии с
к заранее определенной позиции, где каждый сегмент проверяется в соответствии с бизнес-моделью.
Если определенный раздел не проверен, мы аккуратно вставляем ошибку в DataTable, и в конце процесса запуска данные сохраняются в SQL с помощью метода Bulking.
Вот структура таблицы DataTable:
dtErrors = new DataTable(); dtErrors.Columns.Add(TblErrors_Consts.IdRow, typeof(int)); dtErrors.Columns.Add(TblErrors_Consts.OperatorId, typeof(int)); dtErrors.Columns.Add(TblErrors_Consts.FileTypeId, typeof(int)); dtErrors.Columns.Add(TblErrors_Consts.FileName, typeof(string)); dtErrors.Columns.Add(TblErrors_Consts.ErrorRow, typeof(int)); dtErrors.Columns.Add(TblErrors_Consts.ErrorCodeId, typeof(int)); dtErrors.Columns.Add(TblErrors_Consts.AddDate, typeof(DateTime));
И это метод, который отвечает за ввод записи в DataTable:
public void AddError(FileDetailsBE fd, int ErrorCode, int lineCounter = 0) { MethodBase m = MethodBase.GetCurrentMethod(); try { DataRow dr = dtErrors.NewRow(); dr[TblErrors_Consts.OperatorId] = fd.ParentSafe.OperatorId; dr[TblErrors_Consts.FileTypeId] = fd.FileTypeId; dr[TblErrors_Consts.FileName] = fd.FileName; dr[TblErrors_Consts.ErrorRow] = lineCounter; dr[TblErrors_Consts.ErrorCodeId] = (int)ErrorCode; dr[TblErrors_Consts.AddDate] = CurrentDateTime; dtErrors.Rows.Add(dr); } catch (Exception ex) { GP.JobBE.Errors.Add(new JobErrorBE(ex, m)); } finally { Console.ForegroundColor = ConsoleColor.Green; } }
Во время наших тестов QA мы обнаружили, что когда все записи неверны, производительность резко снижается, когда она находится только в точке сохранения данных во временной таблице данных (даже без достижения стадии хранения базы данных).
По этой причине мы решили проанализировать все записи в файле с помощью Parallel.ForEach, а затем, конечно же, мы столкнулись с проблемой вставки новой записи в DataTable, потому что DataTable не является потокобезопасным.
Даже работа с Try Lock не решила эту проблему.
Мой вопрос заключается в следующем: Как все строки могут быть обработаны параллельным методом и что неправильные записи все равно войдут в DataTable?
Это раздел кода, который отвечает за работу со всеми записями (DATA представляет собой еще один DataTable, который в основном содержит все записи, полученные из текстового файла):
Parallel.ForEach(DATA.AsEnumerable(), row => { LineCounter++; if (LineCounter % divider == 0) { SaveDataTablesToDB(fd); } try { line = row[0].ToString(); if (line.Trim().Length.Equals(0)) { return; } eventCode = line.Substring(0, 3); if (eventCode != ImportManager_Consts.Event999) { EventBE eventTableBE = GetRelevantTable(fd, eventCode); if (eventTableBE != null) { if (line.TrimStart().TrimEnd().Length != eventTableBE.CharactersQty) { fd.Errors.AddError(fd, (int)ErrorCodes.IllegalEventLength, LineCounter); return; } ProcessLine(fd, line, eventTableBE, LineCounter); } } } catch (Exception ex) { ConsoleUtils.WriteErrorLine_NoObject(string.Format(ImportManager_Consts.LineCounter, LineCounter.ToString()), GP.Info); ConsoleUtils.WriteErrorLine_NoObject(string.Format(ImportManager_Consts.ProcessLine, line), GP.Info); GP.JobBE.Errors.Add(new JobErrorBE(ex, m)); fd.Errors.AddError(fd, (int)ErrorCodes.GeneralError, LineCounter); } });
Что я уже пробовал:
Я заменил DataTable в списке, содержащем бизнес-объект, и добавил ошибки в список.
В конце запуска всех записей все элементы списка передаются в DataTable, а затем я делаю наполнение в SQL.
Я обнаружил, что существует много утечек, и данные хранятся совсем не хорошо.