Как добавить концепции блокировки/асинхронности/ожидания или потоковой передачи в этот код
Я хочу позвонить
(1) GetMessages() это будет продолжать приносить сообщения в отдельном потоке
(2) GetOrderData() это приведет к извлечению деталей заказа и вставке их в datagrid, и его задача будет завершена здесь
(3) UpdateOrderBook() теперь это обновит существующую datagrid (которая уже содержит некоторые данные из шага 2). Здесь, в этом методе, я хочу проверить очередь и должен повторить concurrentqueue, затем выполнить некоторую обработку, а затем вставить эту запись в существующую сетку.
Два процесса (1,3) будут выполняться асинхронно, но в первый раз он должен обрабатываться в указанном выше порядке.
Я застрял в том, чтобы заставить их работать асинхронно.
Это прекрасно работает в первый раз. Но когда я меняю продукт и снова подключаю его, все портится, и данные не обрабатываются должным образом.
Я новичок в многопоточности, поэтому не уверен, что мне нужно использовать
lock
или async/await
или что-то еще. Что я делаю не так в приведенном выше коде?Что я уже пробовал:
void OnReceivingFeedMessage(message) { concurrentQueue.Enqueue(message); if (!messageStreamStarted) // only first time get order book { messageStreamStarted = true; GetOrderBookData(); } } private void GetOrderBookData() { MarketData m = new MarketData(); ProductOrderBook p = m.GetProductOrderBook(productId); bidsList = p.bids; asksList = p.asks; isOrderBookUpdated = true; Task task3 = Task.Run(() => KickStartToProcessQueue()); } private void KickStartToProcessQueue() { while (threadProcessQueueExist) { int recordCountNew = concurrentQueue.Count(); if (recordCountNew != 0) { if (isOrderBookUpdated) { ProcessQueueMessages(); } } } } private void ProcessQueueMessages() { if (!concurrentQueue.IsEmpty) { string jsonString; while (concurrentQueue.TryDequeue(out jsonString)) { // have to insert the record in existing order book } } } // The code written on product selectedindex change private void CloseAndReconnectToGetWebsocketFeed() { w.CloseWebsocketConnection(); messageStreamStarted = false; isOrderBookUpdated = false; ConcurrentQueue<string> wssMessagesQueue = new ConcurrentQueue<string>(); concurrentQueue = wssMessagesQueue; ConnectAndGetWebsocketFeedMessages(); // this calls OnReceivingFeedMessage }