Как запланировать параллельное выполнение моей задачи
Я запускаю свои задачи для maxdegreeofparallelism с помощью parallel.foreach, и если есть задача, требующая слишком много времени для завершения, то все должны ждать следующего итерационного процесса.
Теперь сценарий таков: у меня есть обработка нескольких файловых заказов, выполняемая моей службой windows, и на первом этапе я извлекаю свои файловые заказы из БД, затем делаю некоторые манипуляции над этими записями, а затем передаю этот список файловых заказов с maxdegreeofparallelism в свою параллель для каждой задачи.
Есть несколько файлов в моих записях заказов файлов, которые имеют много документов, таких как 100 или 200, и в этих документах также есть много страниц. Таким образом, выполнение такой задачи занимает слишком много времени, потому что если заказ обработан для файла короткого размера, имеющего меньше документов или страниц, то он будет ждать, пока другие закончат работу с большими документами или страницами.
Теперь я хочу, чтобы эти конкретные свободные / завершенные задачи, выполняемые параллельно, получили новую запись БД для следующего порядка файлов и начали обрабатывать ее параллельно.
Два исключения заключаются в том, что следующие извлеченные файловые заказы не должны быть теми, которые уже обрабатываются другими задачами параллельно, и количество параллельных задач обработки также не должно превышать заданную максимальную степень параллелизма / максимальное количество запланированных задач для этой службы для обработки всех файловых заказов.
Пожалуйста, предложите лучшее решение для этого.
Что я уже пробовал:
/// <summary> /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> /// </summary> /// <param name="tasksToRun">The tasks to run.</param> /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> /// <param name="cancellationToken">The cancellation token.</param> public static Task StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) { return StartAndWaitAllThrottled(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); } /// <summary> /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> /// </summary> /// <param name="tasksToRun">The tasks to run.</param> /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param> /// <param name="cancellationToken">The cancellation token.</param> public static Task StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) { // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. var tasks = tasksToRun.ToList(); using (var throttler = new SemaphoreSlim(1,maxTasksToRunInParallel)) { var postTaskTasks = new List<Task>(); // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); // Start running each task. foreach (var task in tasks) { // Increment the number of tasks currently running and wait if too many are running. throttler.Wait(timeoutInMilliseconds, cancellationToken); cancellationToken.ThrowIfCancellationRequested(); task.Start(); } // Wait for all of the provided tasks to complete. // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. Task.WaitAll(postTaskTasks.ToArray(), cancellationToken); } }