Member 11320435 Ответов: 0

Неблокирующая и реентерабельная очередь отправки с порядком выполнения


Я пытаюсь реализовать свою собственную очередь отправки для отправки задач в том же порядке, в каком они добавляются в очередь. Мотивация для этого связана с отсутствием поддержки C++/CX для очередей отправки компонентов среды выполнения Windows. (https://msdn.microsoft.com/en-us/library/hh771042.aspx) - это документация, которую я рассматривал. Я понимаю, что это связано с проблемой [производитель-потребитель](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem), но мне еще не удалось решить ее с помощью C++/CX.

Что я уже пробовал:

Учитывая это, я попробовал несколько подходов к этой проблеме.

Согласно нескольким примерам и документации, которые я нашел в Интернете, я попытался использовать семафоры для решения этой проблемы. Для этого я создал класс DispatchQueue, который содержит несколько атрибутов:

namespace Threading
    {
    	ref class DispatchQueue
    	{
    
    	private:
    		typedef IMap<String^, DispatchQueue^> MapType;
    		typedef std::function<void()> TaskType;
    		typedef std::queue<TaskType> QueueType;
    
    	internal:
    
    		static DispatchQueue^ Get();
    
    		void Add(TaskType task);
    
    	private:
    		TaskType DequeueTask();
    		void Runner();
    
    		QueueType & GetQueue();
    		const QueueType & GetQueue() const;
    
    	private:
    		QueueType _queue;
    		std::mutex _queue_mux;
    		std::mutex _add_mux;
    		HANDLE _emptySemaphore;
    		HANDLE _fullSemaphore;
    	};
    }


(Примечание: Я не публикую фактический код, потому что класс был в значительной степени повторен, и теперь он довольно грязный. Дайте мне знать, если вам понадобятся другие подробности.)

Вот как инициализируются семафоры; по крайней мере, это соответствующая часть конструктора:

Threading::DispatchQueue::DispatchQueue(String^ identifier)
    	: _queue(),_emptySemaphore(CreateSemaphore(NULL, 1, 1, NULL)), _fullSemaphore(CreateSemaphore(NULL, 0, 1, NULL))
    {
    }


Вот мой продюсер:

void Threading::DispatchQueue::Add(TaskType task)
    {
    	// Prevent different threads from entering this method
        // Disregard the mutex being static
    	static std::mutex mux;
    	std::lock_guard<std::mutex> lock(mux);
    
    	WaitForSingleObject(_emptySemaphore, INFINITE);
    	{
    		// Prevent concorrent access to the queue
    		std::lock_guard<std::mutex> lock(_queue_mux);
    		GetQueue().push(task);
    	}
    	ReleaseSemaphore(_fullSemaphore, 1, NULL);
    }


Вот потребитель:

void Threading::DispatchQueue::Runner()
    {
    	while (true)
    	{
    		WaitForSingleObject(_fullSemaphore, INFINITE);
    
    		TaskType task = DequeueTask();
    
    		task();
    
    		if (!ReleaseSemaphore(
    			_emptySemaphore,
    			1,
    			NULL)) {
    		}
    	}
    }


Я также попытался отправить задачу ('task ();`) _after_ семафор был освобожден, но это тоже не сработало. Также заметно, что " Бегун` (потребитель) запускается в точке инициализации:

create_task([dispatchQueue]() {

			dispatchQueue->Runner();
		});


Проблема с этим подходом заключается в том, что вызов task(); также может создавать новые задачи (TaskType), но это не похоже на реентерабельность. Таким образом, реализация блоков. Блокировка происходит потому, что задача входит в метод "добавить", не освободив семафор.

Вместо этого я повторил и попробовал этого продюсера:

void Threading::DispatchQueue::Add(TaskType task)
    {
        create_task([this, task]()
        {
        	WaitForSingleObject(_emptySemaphore, INFINITE);
        	{
        		std::lock_guard<std::mutex> lock(_queue_mux);
        		GetQueue().push(task);
        	}
        	ReleaseSemaphore(_fullSemaphore, 1, NULL);
        
        }).wait();   // Notice we're waiting
    }


Мотивация, стоящая за этим wait (), заключается в том, чтобы гарантировать, что задачи вставляются в очередь в правильном порядке. Это действительно работает, но аварийно завершает работу, если этот метод вызывается из потока пользовательского интерфейса. Это не вариант. Кроме того, не ждать-это не вариант, потому что, опять же, задачи отправляются не в правильном порядке.

Я тоже пытался (https://code.msdn.microsoft.com/windowsdesktop/Producer-Consumer-inter-cd30193b#content), который также блокирует.

Короче говоря, мне нужна очередь отправки, которая уважает порядок вставки, не блокируя производящий поток, поскольку он может быть основным потоком. Есть ли исправление для любого из этих подходов или любого другого подхода, который вы бы рекомендовали?

KarstenK

Я бы создал все задачи в режиме ожидания. А затем другая задача извлекает их и, когда она закончена, запускает следующие задачи.

Member 11320435

Что значит "создать все задачи в режиме ожидания"? Как мне это сделать?

0 Ответов