oronsultan Ответов: 2

Работать с DataTable, используя Parallel.по каждому элементу


Эй ребята,
В нашем офисе мы работаем над большим развитием обработки данных.
Целью проекта является получение большого количества текстовых файлов через FTP серверы,
а затем выполнить на каждом файле определенные проверки и в конце процесса вставить все данные в SQL server.

Каждый файл может содержать миллионы записей.

Каждая запись в файле содержит разделы информации, которые расположены в соответствии с
к заранее определенной позиции, где каждый сегмент проверяется в соответствии с бизнес-моделью.

Если определенный раздел не проверен, мы аккуратно вставляем ошибку в DataTable, и в конце процесса запуска данные сохраняются в SQL с помощью метода Bulking.

Вот структура таблицы DataTable:
dtErrors = new DataTable();
dtErrors.Columns.Add(TblErrors_Consts.IdRow, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.OperatorId, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.FileTypeId, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.FileName, typeof(string));
dtErrors.Columns.Add(TblErrors_Consts.ErrorRow, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.ErrorCodeId, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.AddDate, typeof(DateTime));


И это метод, который отвечает за ввод записи в DataTable:
public void AddError(FileDetailsBE fd, int ErrorCode, int lineCounter = 0)
{
	MethodBase m = MethodBase.GetCurrentMethod();
	try
	{
		DataRow dr = dtErrors.NewRow();
		dr[TblErrors_Consts.OperatorId] = fd.ParentSafe.OperatorId;
		dr[TblErrors_Consts.FileTypeId] = fd.FileTypeId;
		dr[TblErrors_Consts.FileName] = fd.FileName;
		dr[TblErrors_Consts.ErrorRow] = lineCounter;
		dr[TblErrors_Consts.ErrorCodeId] = (int)ErrorCode;
		dr[TblErrors_Consts.AddDate] = CurrentDateTime;
		dtErrors.Rows.Add(dr);
	}
	catch (Exception ex)
	{
		GP.JobBE.Errors.Add(new JobErrorBE(ex, m));
	}
	finally
	{
		Console.ForegroundColor = ConsoleColor.Green;
	}
}


Во время наших тестов QA мы обнаружили, что когда все записи неверны, производительность резко снижается, когда она находится только в точке сохранения данных во временной таблице данных (даже без достижения стадии хранения базы данных).

По этой причине мы решили проанализировать все записи в файле с помощью Parallel.ForEach, а затем, конечно же, мы столкнулись с проблемой вставки новой записи в DataTable, потому что DataTable не является потокобезопасным.

Даже работа с Try Lock не решила эту проблему.

Мой вопрос заключается в следующем: Как все строки могут быть обработаны параллельным методом и что неправильные записи все равно войдут в DataTable?

Это раздел кода, который отвечает за работу со всеми записями (DATA представляет собой еще один DataTable, который в основном содержит все записи, полученные из текстового файла):

Parallel.ForEach(DATA.AsEnumerable(), row =>
{
	LineCounter++;
	if (LineCounter % divider == 0)
	{
		SaveDataTablesToDB(fd);
	}

	try
	{
		line = row[0].ToString();
		if (line.Trim().Length.Equals(0))
		{
			return;
		}
		eventCode = line.Substring(0, 3);
		if (eventCode != ImportManager_Consts.Event999)
		{
			EventBE eventTableBE = GetRelevantTable(fd, eventCode);
			if (eventTableBE != null)
			{
				if (line.TrimStart().TrimEnd().Length != eventTableBE.CharactersQty)
				{
					fd.Errors.AddError(fd, (int)ErrorCodes.IllegalEventLength, LineCounter);
					return;
				}

				ProcessLine(fd, line, eventTableBE, LineCounter);
			}
		}
	}
	catch (Exception ex)
	{
		ConsoleUtils.WriteErrorLine_NoObject(string.Format(ImportManager_Consts.LineCounter, LineCounter.ToString()), GP.Info);
		ConsoleUtils.WriteErrorLine_NoObject(string.Format(ImportManager_Consts.ProcessLine, line), GP.Info);
		GP.JobBE.Errors.Add(new JobErrorBE(ex, m));
		fd.Errors.AddError(fd, (int)ErrorCodes.GeneralError, LineCounter);
	}
});


Что я уже пробовал:

Я заменил DataTable в списке, содержащем бизнес-объект, и добавил ошибки в список.

В конце запуска всех записей все элементы списка передаются в DataTable, а затем я делаю наполнение в SQL.

Я обнаружил, что существует много утечек, и данные хранятся совсем не хорошо.

2 Ответов

Рейтинг:
2

#realJSOP

0) создайте класс модели, представляющий информацию об ошибке (все свойства, которые вы хотите сохранить).

1) Создайте класс, производный от List<errorobject>, чтобы хранить объекты по мере их создания.

2) по мере обработки файлов и обнаружения ошибок создайте новый экземпляр модели ErrorObject и добавьте его в список объектов error.

3) в свой класс списка добавьте метод, который

  а) создает объект datatable
  б) использует отражение (в вашем классе объектов error) для определения нужных столбцов и типов данных, хранящихся в этих столбцах.
  в) просматривает содержимое списка и добавляет новую строку, заполненную свойствами каждого элемента
  d) сохраняет содержимое datatable в базе данных.

4) вызовите метод, описанный в шаге 3, когда файл будет обработан.

На самом деле все очень просто.


Рейтинг:
1

Gerry Schmitz

Вы будете "привязаны к вводу-выводу"; несколько потоков не помогут. Поскольку ваши данные настолько просты, у вас может быть один процесс, один CSV-файл и один "массовый" загрузчик, который устраняет накладные расходы на "построение строк данных".