Happy Singh Ответов: 2

Как добавить концепции блокировки/асинхронности/ожидания или потоковой передачи в этот код


Я хочу позвонить

(1) GetMessages() это будет продолжать приносить сообщения в отдельном потоке

(2) GetOrderData() это приведет к извлечению деталей заказа и вставке их в datagrid, и его задача будет завершена здесь

(3) UpdateOrderBook() теперь это обновит существующую datagrid (которая уже содержит некоторые данные из шага 2). Здесь, в этом методе, я хочу проверить очередь и должен повторить concurrentqueue, затем выполнить некоторую обработку, а затем вставить эту запись в существующую сетку.

Два процесса (1,3) будут выполняться асинхронно, но в первый раз он должен обрабатываться в указанном выше порядке.

Я застрял в том, чтобы заставить их работать асинхронно.


Это прекрасно работает в первый раз. Но когда я меняю продукт и снова подключаю его, все портится, и данные не обрабатываются должным образом.

Я новичок в многопоточности, поэтому не уверен, что мне нужно использовать lock или async/await или что-то еще. Что я делаю не так в приведенном выше коде?

Что я уже пробовал:

void OnReceivingFeedMessage(message)
    {
         concurrentQueue.Enqueue(message);
      
         if (!messageStreamStarted)   // only first time get order book
         {
             messageStreamStarted = true;
             GetOrderBookData();
         }
    }

    private void GetOrderBookData()
    {
        MarketData m = new MarketData();
        ProductOrderBook p = m.GetProductOrderBook(productId);           
        bidsList = p.bids;
        asksList = p.asks;
        isOrderBookUpdated = true;

        Task task3 = Task.Run(() => KickStartToProcessQueue());         
    }

    private void KickStartToProcessQueue()
    {
        while (threadProcessQueueExist)
        {
            int recordCountNew = concurrentQueue.Count();
            if (recordCountNew != 0)
            {
                if (isOrderBookUpdated)
                {
                    ProcessQueueMessages();
                }
            }
        }
    }

    private void ProcessQueueMessages()
    {
        if (!concurrentQueue.IsEmpty)
        {
            string jsonString;
            while (concurrentQueue.TryDequeue(out jsonString))
            {
              // have to insert the record in existing order book
            }
         }
    }

// The code written on product selectedindex change

    private void CloseAndReconnectToGetWebsocketFeed()
    { 
        w.CloseWebsocketConnection();                 
        messageStreamStarted = false;            
        isOrderBookUpdated = false;
        ConcurrentQueue<string> wssMessagesQueue = new ConcurrentQueue<string>();
        concurrentQueue = wssMessagesQueue;

        ConnectAndGetWebsocketFeedMessages(); // this calls OnReceivingFeedMessage
    }

2 Ответов

Рейтинг:
1

Richard Deeming

Воскрешение вопроса, который был задан и на который был дан ответ шесть месяцев назад только для того, чтобы продвигать свои собственные статьи?

Да, это жестокое обращение.